43  using boost::asio::buffer;
 
   48  unsigned int handled_packet_count = 0;
 
   51  unsigned int recvd_packet_count = 0;
 
   87    size_t packet_size = 0;
 
   95      if ((!packet_data.zero()) && (packet_data.capacity() < max_packet_size))
 
   97        packet_data.make_zero(); 
 
   99      packet_data.resize(max_packet_size);
 
  103      packet_size = 
m_low_lvl_sock.receive_from(buffer(packet_data.data(), packet_data.size()),
 
  104                                                low_lvl_remote_endpoint, 0, sys_err_code);
 
  107        assert(packet_size <= packet_data.size());
 
  108        packet_data.resize(packet_size); 
 
  111                       "[UDP " << low_lvl_remote_endpoint << 
"]:");
 
  114        ++recvd_packet_count;
 
  123      else if (sys_err_code != boost::asio::error::would_block) 
 
  144        assert(packet_size == 0); 
 
  148        assert(packet_size == 0);
 
  152    while ((packet_size != 0)
 
  153           && ((recvd_packet_count_limit == 0) || (recvd_packet_count < recvd_packet_count_limit)));
 
  161    FLOW_LOG_TRACE(
"Handled a total of [" << handled_packet_count << 
"] incoming packets " 
  162                   "out of [" << recvd_packet_count << 
"] received (limit [" << recvd_packet_count_limit << 
"]) " 
  163                   "in this boost.asio handler.");
 
  183                                                   bool is_sim_duplicate_packet)
 
  186  using boost::chrono::milliseconds; 
 
  187  using boost::chrono::round;
 
  191  unsigned int handled = 0; 
 
  207      packet_data_copy = *(
static_cast<const Blob*
>(packet_data)); 
 
  211    if (latency == Fine_duration::zero())
 
  222      FLOW_LOG_TRACE(
"SIMULATION: Delaying reception of packet by simulated latency " 
  223                     "[" << round<milliseconds>(latency) << 
"].");
 
  236                                                 low_lvl_remote_endpoint, 
true);
 
  255  using boost::chrono::milliseconds;
 
  256  using boost::chrono::round;
 
  257  using boost::shared_ptr;
 
  266  shared_ptr<Blob> packet_data_moved_ptr(
new Blob(std::move(*packet_data)));
 
  272                         [
this, packet_data_moved_ptr, low_lvl_remote_endpoint, latency, started_at]
 
  278      (
"SIMULATOR: Handling low-level packet after " 
  279       "simulated latency [" << round<milliseconds>(latency) << 
"]; " 
  280       "actual simulated latency was " 
  281       "[" << round<milliseconds>(Fine_clock::now() - started_at) << 
"]; " 
  282       "from [UDP " << low_lvl_remote_endpoint << 
"].");
 
  285    Blob packet_data_moved_again(std::move(*packet_data_moved_ptr));
 
  297                   "out of a simulated [1] received in this boost.asio handler.");
 
  302                                          bool delayed_by_pacing)
 
  319  using boost::asio::buffer;
 
  322  const auto& packet_ref = *packet;
 
  323  const auto& packet_type_id = 
typeid(packet_ref);
 
  383  const size_t bytes_to_send = packet->serialize_to_raw_data_and_log(&raw_bufs);
 
  384  assert(bytes_to_send != 0);
 
  387  sock->m_snd_stats.low_lvl_packet_xfer_called(packet_type_id, delayed_by_pacing, bytes_to_send);
 
  390  if (bytes_to_send > limit)
 
  393    FLOW_LOG_WARNING(
"Tried to send low-level packet but before doing so detected " 
  394                     "serialized size [" << bytes_to_send << 
"] exceeds limit [" << limit << 
"]; " 
  395                     "check max-block-size and low-lvl-max-packet-size options!  Serialized packet: " 
  396                     "[\n" << packet->m_concise_ostream_manip << 
"].");
 
  398    FLOW_LOG_DATA(
"Detailed serialized packet details from preceding warning: " 
  399                  "[\n" << packet->m_verbose_ostream_manip << 
"].");
 
  402    sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_to_send, 0);
 
  409                               low_lvl_remote_endpoint,
 
  410                               [
this, sock, packet, bytes_to_send](
const Error_code& sys_err_code, 
size_t n_sent)
 
  418  using boost::chrono::microseconds;
 
  419  using boost::chrono::round;
 
  424  if (sent_pkt_it == sock->m_snd_flying_pkts_by_sent_when.past_oldest())
 
  428                     "sequence number [" << seq_num << 
"] but cannot find corresponding Sent_packet.  " 
  429                     "Cannot deal with some of the related data structures; still sending.  Bug?");
 
  449  const Fine_time_pt& now = sock->m_snd_last_data_sent_when = Fine_clock::now();
 
  450  const size_t cwnd = sock->m_snd_cong_ctl->congestion_window_bytes();
 
  458    last_send_attempt.m_sent_time = now;
 
  459    last_send_attempt.m_sent_cwnd_bytes = cwnd;
 
  460    const microseconds diff = round<microseconds>(now - prev_sent_when);
 
  463      (
"Sending/sent [DATA] packet over [" << sock << 
"] with " 
  464       "sequence number [" << seq_num << 
"] order_num [" << order_num << 
"].  Send timestamp changed from " 
  465       "[" << prev_sent_when << 
"] -> [" << now << 
"]; difference [" << diff << 
"].");
 
  470    last_send_attempt.m_sent_time = now;
 
  471    last_send_attempt.m_sent_cwnd_bytes = cwnd;
 
  477  drop_timer->start_contemporaneous_events();
 
  478  drop_timer->on_packet_in_flight(order_num);
 
  479  drop_timer->end_contemporaneous_events();
 
  492                               size_t bytes_expected_transferred, 
const Error_code& sys_err_code,
 
  493                               size_t bytes_transferred)
 
  495  using std::numeric_limits;
 
  499  const auto& packet_ref = *packet;
 
  500  const auto& packet_type_id = 
typeid(packet_ref);
 
  511                                     "[\n" << packet->m_verbose_ostream_manip << 
"].");
 
  516                                      "[\n" << packet->m_concise_ostream_manip << 
"].");
 
  551      sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id);
 
  560    sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_expected_transferred, bytes_transferred);
 
  563  if (bytes_transferred != bytes_expected_transferred)
 
  571    FLOW_LOG_WARNING(
"Low-level packet sent, but only [" << bytes_transferred << 
"] of " 
  572                     "[" << bytes_expected_transferred << 
"] bytes " 
  573                     "were sent.  Internal error with packet size calculations?  More likely, did stack truncate?");
 
  583  using boost::asio::buffer;
 
  584  using boost::shared_ptr;
 
  589  auto rst_base = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
 
  591  rst_base->m_packed.m_src_port = causing_packet->m_packed.m_dst_port; 
 
  592  rst_base->m_packed.m_dst_port = causing_packet->m_packed.m_src_port;
 
  593  rst_base->m_opt_rexmit_on = 
false; 
 
  606  const auto& packet_ref = *packet;
 
  607  const auto& packet_type_id = 
typeid(packet_ref);
 
  609  sock->m_snd_stats.low_lvl_packet_xfer_requested(packet_type_id);
 
  612  packet->m_packed.m_src_port = sock->m_local_port;
 
  613  packet->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
 
  614  packet->m_opt_rexmit_on = sock->rexmit_on();
 
  619  if ((!sock->opt(sock->m_opts.m_st_snd_pacing_enabled)) ||
 
  620      (sock->m_snd_smoothed_round_trip_time == Fine_duration::zero()) ||
 
  644  using boost::chrono::duration_cast;
 
  645  using boost::chrono::microseconds;
 
  646  using boost::static_pointer_cast;
 
  647  using boost::dynamic_pointer_cast;
 
  648  using boost::shared_ptr;
 
  652  const auto& packet_ref = *packet;
 
  653  const auto& packet_type_id = 
typeid(packet_ref);
 
  658  const bool is_data_packet = packet_type_id == 
typeid(
Data_packet);
 
  661  shared_ptr<const Data_packet> data;
 
  664    init_seq_num = static_pointer_cast<const Data_packet>(packet)->m_seq_num;
 
  668    const auto& acked_packets = static_pointer_cast<const Ack_packet>(packet)->m_rcv_acked_packets;
 
  669    if (!acked_packets.empty())
 
  671      init_seq_num = acked_packets.front()->m_seq_num;
 
  675  const bool q_was_empty = pacing.
m_packet_q.empty();
 
  682  FLOW_LOG_TRACE(
"Pacing: On [" << sock << 
"] packet of type [" << packet->m_type_ostream_manip << 
"] " 
  683                 "is newly available for sending; pushed onto queue; queue size [" << pacing.
m_packet_q.size() << 
"]; " 
  684                 "initial sequence number [" << init_seq_num << 
"].");
 
  688    const auto data = dynamic_pointer_cast<const Data_packet>(pacing.
m_packet_q.front());
 
  693    FLOW_LOG_TRACE(
"Pacing: On [" << sock << 
"]: was already in progress; queued and done.");
 
  709    FLOW_LOG_TRACE(
"Pacing: On [" << sock << 
"]: due to packet type, sending immediately since at head of queue; " 
  710                   "queue empty again.");
 
  734                   "slice [epoch+" << duration_cast<microseconds>(pacing.
m_slice_start.time_since_epoch()) << 
" " 
  735                   "over " << duration_cast<microseconds>(pacing.
m_slice_period) << 
"] is over.");
 
  759  using boost::chrono::duration_cast;
 
  760  using boost::chrono::microseconds;
 
  761  using boost::chrono::milliseconds;
 
  768  const Fine_duration& srtt = sock->m_snd_smoothed_round_trip_time;
 
  770  assert(srtt != Fine_duration::zero());
 
  780    = srtt * sock->max_block_size() / sock->m_snd_cong_ctl->congestion_window_bytes();
 
  781  if (slice_ideal_period == Fine_duration::zero())
 
  788  if (timer_min_period == Fine_duration::zero())
 
  796    const Fine_duration TIMER_MIN_PERIOD_DEFAULT = milliseconds(15);
 
  797    timer_min_period = TIMER_MIN_PERIOD_DEFAULT;
 
  802  pacing.
m_slice_period = max(slice_ideal_period, timer_min_period);
 
  813    = 
static_cast<size_t>(pacing.
m_slice_period * sock->max_block_size() / slice_ideal_period);
 
  821  const size_t QUEUE_SIZE_DRIFT_PREVENTION_PCT = 110; 
 
  829                 "slice [epoch+" << duration_cast<microseconds>(pacing.
m_slice_start.time_since_epoch()) << 
" " 
  830                 "over " << duration_cast<microseconds>(pacing.
m_slice_period) << 
"]; " 
  831                 "ideal slice period = [SRTT " << duration_cast<microseconds>(srtt) << 
"] / " 
  832                 "([cong_wnd " << sock->m_snd_cong_ctl->congestion_window_bytes() << 
"] / " 
  833                 "[max-block-size " << sock->max_block_size() << 
"]) = " 
  834                 "[" << duration_cast<microseconds>(slice_ideal_period) << 
"]; " 
  835                 "timer_min_period = [" << duration_cast<microseconds>(timer_min_period) << 
"]; " 
  836                 "bytes_allowed = max(ideal, min) / ideal * max-block-size * " 
  837                 "[" << QUEUE_SIZE_DRIFT_PREVENTION_PCT << 
"%] = " 
  843  using boost::chrono::milliseconds;
 
  844  using boost::chrono::round;
 
  845  using boost::shared_ptr;
 
  846  using boost::weak_ptr;
 
  847  using boost::static_pointer_cast;
 
  848  using boost::dynamic_pointer_cast;
 
  860  shared_ptr<const Data_packet> head_packet;
 
  870             >= (head_packet = static_pointer_cast<const Data_packet>(pacing.
m_packet_q.front()))->m_data.size()))
 
  879    FLOW_LOG_TRACE(
"Will send [" << head_packet->m_data.size() << 
"] bytes of data; budget now " 
  881                   "queue size now [" << (pacing.
m_packet_q.size() - 1) << 
"].");
 
  896           && (!(dynamic_pointer_cast<const Data_packet>(head_packet_base = pacing.
m_packet_q.front()))))
 
  898      FLOW_LOG_TRACE(
"Pacing: On [" << sock << 
"]: due to packet type, sending immediately since at head of queue; " 
  899                     "queue size now [" << (pacing.
m_packet_q.size() - 1) << 
"].");
 
  929                 "scheduling next processing at end of time slice " 
  930                 "in [" << round<milliseconds>(slice_end - Fine_clock::now()) << 
"].");
 
  933  pacing.
m_slice_timer.async_wait([
this, sock_observer = weak_ptr<Peer_socket>(sock)]
 
  936    auto sock = sock_observer.lock();
 
  964  assert(sys_err_code != boost::asio::error::operation_aborted);
 
  969  assert(!sock->m_snd_pacing_data.m_packet_q.empty());
 
  971  FLOW_LOG_TRACE(
"Pacing: On [" << sock << 
"]: slice end timer fired; creating new slice and processing queue.");
 
  993  auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
 
  999  using boost::asio::buffer;
 
 1004  auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
 
 1005  rst->m_packed.m_src_port = sock->m_local_port;
 
 1006  rst->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
 
 1007  rst->m_opt_rexmit_on = 
false; 
 
 1011  const size_t size = rst->serialize_to_raw_data_and_log(&raw_bufs); 
 
 1014  const auto& rst_type_id = 
typeid(
Rst_packet); 
 
 1015  sock->m_snd_stats.low_lvl_packet_xfer_requested(rst_type_id);
 
 1016  sock->m_snd_stats.low_lvl_packet_xfer_called(rst_type_id, 
false, size);
 
 1024                     "serialized size [" << size << 
"] exceeds limit [" << limit << 
"]; " 
 1025                     "check max-block-size and low-lvl-max-packet-size options!  Serialized packet: " 
 1026                     "[\n" << rst->m_concise_ostream_manip << 
"].");
 
 1028    sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, 0); 
 
 1035                                                    sock->remote_endpoint().m_udp_endpoint, 0, sys_err_code);
 
 1039      sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id);
 
 1043      sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, size_sent);
 
 1050  m_bytes_allowed_this_slice(0),
 
 1051  m_slice_timer(*task_engine)
 
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
void low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet, size_t bytes_expected_transferred, const Error_code &sys_err_code, size_t bytes_transferred)
Completion handler for async_low_lvl_packet_send_impl(); called when the packet is either successfull...
void sock_pacing_new_packet_ready(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet)
async_sock_low_lvl_packet_send_paced() pacing helper: Handles a DATA or ACK packet that was just pass...
void async_no_sock_low_lvl_packet_send(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
void async_sock_low_lvl_packet_send(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr &&packet, bool delayed_by_pacing)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
void perform_accumulated_on_recv_tasks()
Performs all tasks to be performed at the end of low_lvl_recv_and_handle() or async part of async_wai...
void handle_incoming(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-received, not-yet-deserialized low-level packet.
void sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket, synchronously.
void mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number &seq_num)
Performs important book-keeping based on the event "DATA packet was sent to destination....
void sock_pacing_time_slice_end(Peer_socket::Ptr sock, const Error_code &sys_err_code)
async_sock_low_lvl_packet_send_paced() pacing helper: If sock_pacing_process_q() ran out of the last ...
boost::shared_ptr< Net_env_simulator > m_net_env_sim
The object used to simulate stuff like packet loss and latency via local means directly in the code.
void async_wait_latency_then_handle_incoming(const Fine_duration &latency, util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sets up handle_incoming(packet_data, low_lvl_remote_endpoint) to be called asynchronously after a spe...
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
void sock_pacing_process_q(Peer_socket::Ptr sock, bool executing_after_delay)
async_sock_low_lvl_packet_send_paced() pacing helper: Given that we are currently in the pacing time ...
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
unsigned int handle_incoming_with_simulation(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint, bool is_sim_duplicate_packet=false)
Helper for low_lvl_recv_and_handle() that calls handle_incoming() on the not-yet-deserialized low-lev...
void low_lvl_recv_and_handle(Error_code sys_err_code)
Handles the pre-condition that m_low_lvl_sock has a UDP packet available for reading,...
Udp_socket m_low_lvl_sock
The UDP socket used to receive low-level packets (to assemble into application layer data) and send t...
void async_low_lvl_packet_send_impl(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet, bool delayed_by_pacing, Peer_socket::Ptr sock)
Takes given low-level packet structure, serializes it, and initiates asynchronous send of these data ...
util::Blob m_packet_data
Stores incoming raw packet data; re-used repeatedly for possible performance gains.
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
Node_options m_opts
This Node's global set of options.
void async_low_lvl_recv()
Registers so that during the current or next m_task_engine.run(), the latter will wait for a receivab...
void async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
void sock_pacing_new_time_slice(Peer_socket::Ptr sock, const Fine_time_pt &now)
async_sock_low_lvl_packet_send_paced() pacing helper: Resets the socket's Send_pacing_data structure ...
@ S_CLOSED
Neither future reads nor writes are possible, AND Node has disowned the Peer_socket.
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
An internal net_flow sequence number identifying a piece of data.
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_LOG_DATA(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
boost::asio::io_context Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Internal net_flow struct that encapsulates the Flow-protocol low-level ACK packet.
Internal net_flow struct that encapsulates the Flow-protocol low-level DATA packet.
std::vector< Const_buffer > Const_buffer_sequence
Short-hand for sequence of immutable buffers; i.e., a sequence of 1 or more scattered areas in memory...
size_t m_dyn_low_lvl_max_packet_size
Any incoming low-level (UDP) packet will be truncated to this size.
unsigned int m_dyn_max_packets_per_main_loop_iteration
The UDP net-stack may deliver 2 or more datagrams to the Flow Node at the same time.
Fine_duration m_st_timer_min_period
A time period such that the boost.asio timer implementation for this platform is able to accurately a...
Metadata (and data, if retransmission is on) for a packet that has been sent one (if retransmission i...
std::vector< Sent_when > m_sent_when
Time stamps, order numbers, and other info at the times when the different attempts (including origin...
Internal net_flow struct that encapsulates the Flow-protocol low-level RST packet.
The current outgoing packet pacing state, including queue of low-level packets to be sent,...
util::Timer m_slice_timer
When running, m_packet_q is non-empty, m_bytes_allowed_this_slice < data size of m_packet_q....
size_t m_bytes_allowed_this_slice
This many bytes worth of DATA packets may still be sent, at this time, within the time slice defined ...
Send_pacing_data(util::Task_engine *task_engine)
Initializes data to initial state (no active time slice).
Packet_q m_packet_q
Queue of low-level packets to be sent to the remote endpoint, in order in which they are to be sent,...
Fine_duration m_slice_period
The length of the current pacing time slice period; this depends on congestion window and SRTT on the...
Fine_time_pt m_slice_start
The time point at which the last pacing time slice began; or epoch if no packets sent so far (i....