43 using boost::asio::buffer;
48 unsigned int handled_packet_count = 0;
51 unsigned int recvd_packet_count = 0;
87 size_t packet_size = 0;
95 if ((!packet_data.zero()) && (packet_data.capacity() < max_packet_size))
97 packet_data.make_zero();
99 packet_data.resize(max_packet_size);
103 packet_size =
m_low_lvl_sock.receive_from(buffer(packet_data.data(), packet_data.size()),
104 low_lvl_remote_endpoint, 0, sys_err_code);
107 assert(packet_size <= packet_data.size());
108 packet_data.resize(packet_size);
111 "[UDP " << low_lvl_remote_endpoint <<
"]:");
114 ++recvd_packet_count;
123 else if (sys_err_code != boost::asio::error::would_block)
144 assert(packet_size == 0);
148 assert(packet_size == 0);
152 while ((packet_size != 0)
153 && ((recvd_packet_count_limit == 0) || (recvd_packet_count < recvd_packet_count_limit)));
161 FLOW_LOG_TRACE(
"Handled a total of [" << handled_packet_count <<
"] incoming packets "
162 "out of [" << recvd_packet_count <<
"] received (limit [" << recvd_packet_count_limit <<
"]) "
163 "in this boost.asio handler.");
183 bool is_sim_duplicate_packet)
186 using boost::chrono::milliseconds;
187 using boost::chrono::round;
191 unsigned int handled = 0;
207 packet_data_copy = *(
static_cast<const Blob*
>(packet_data));
211 if (latency == Fine_duration::zero())
222 FLOW_LOG_TRACE(
"SIMULATION: Delaying reception of packet by simulated latency "
223 "[" << round<milliseconds>(latency) <<
"].");
236 low_lvl_remote_endpoint,
true);
255 using boost::chrono::milliseconds;
256 using boost::chrono::round;
257 using boost::shared_ptr;
266 shared_ptr<Blob> packet_data_moved_ptr{
new Blob{std::move(*packet_data)}};
272 [
this, packet_data_moved_ptr, low_lvl_remote_endpoint, latency, started_at]
278 (
"SIMULATOR: Handling low-level packet after "
279 "simulated latency [" << round<milliseconds>(latency) <<
"]; "
280 "actual simulated latency was "
281 "[" << round<milliseconds>(Fine_clock::now() - started_at) <<
"]; "
282 "from [UDP " << low_lvl_remote_endpoint <<
"].");
285 Blob packet_data_moved_again(std::move(*packet_data_moved_ptr));
297 "out of a simulated [1] received in this boost.asio handler.");
302 bool delayed_by_pacing)
319 using boost::asio::buffer;
322 const auto& packet_ref = *packet;
323 const auto& packet_type_id =
typeid(packet_ref);
383 const size_t bytes_to_send = packet->serialize_to_raw_data_and_log(&raw_bufs);
384 assert(bytes_to_send != 0);
389 sock->m_snd_stats.low_lvl_packet_xfer_called(packet_type_id, delayed_by_pacing, bytes_to_send);
393 if (bytes_to_send > limit)
396 FLOW_LOG_WARNING(
"Tried to send low-level packet but before doing so detected "
397 "serialized size [" << bytes_to_send <<
"] exceeds limit [" << limit <<
"]; "
398 "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
399 "[\n" << packet->m_concise_ostream_manip <<
"].");
401 FLOW_LOG_DATA(
"Detailed serialized packet details from preceding warning: "
402 "[\n" << packet->m_verbose_ostream_manip <<
"].");
405 assert(sock &&
"Really? A giant low-level packet that is not even DATA? Bug.");
406 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_to_send, 0);
413 low_lvl_remote_endpoint,
414 [
this, sock, packet, bytes_to_send](
const Error_code& sys_err_code,
size_t n_sent)
422 using boost::chrono::microseconds;
423 using boost::chrono::round;
428 if (sent_pkt_it == sock->m_snd_flying_pkts_by_sent_when.past_oldest())
432 "sequence number [" << seq_num <<
"] but cannot find corresponding Sent_packet. "
433 "Cannot deal with some of the related data structures; still sending. Bug?");
453 const Fine_time_pt& now = sock->m_snd_last_data_sent_when = Fine_clock::now();
454 const size_t cwnd = sock->m_snd_cong_ctl->congestion_window_bytes();
462 last_send_attempt.m_sent_time = now;
463 last_send_attempt.m_sent_cwnd_bytes = cwnd;
464 const microseconds diff = round<microseconds>(now - prev_sent_when);
467 (
"Sending/sent [DATA] packet over [" << sock <<
"] with "
468 "sequence number [" << seq_num <<
"] order_num [" << order_num <<
"]. Send timestamp changed from "
469 "[" << prev_sent_when <<
"] -> [" << now <<
"]; difference [" << diff <<
"].");
474 last_send_attempt.m_sent_time = now;
475 last_send_attempt.m_sent_cwnd_bytes = cwnd;
481 drop_timer->start_contemporaneous_events();
482 drop_timer->on_packet_in_flight(order_num);
483 drop_timer->end_contemporaneous_events();
496 size_t bytes_expected_transferred,
const Error_code& sys_err_code,
497 size_t bytes_transferred)
499 using std::numeric_limits;
503 const auto& packet_ref = *packet;
504 const auto& packet_type_id =
typeid(packet_ref);
515 "[\n" << packet->m_verbose_ostream_manip <<
"].");
520 "[\n" << packet->m_concise_ostream_manip <<
"].");
555 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id);
564 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_expected_transferred, bytes_transferred);
567 if (bytes_transferred != bytes_expected_transferred)
575 FLOW_LOG_WARNING(
"Low-level packet sent, but only [" << bytes_transferred <<
"] of "
576 "[" << bytes_expected_transferred <<
"] bytes "
577 "were sent. Internal error with packet size calculations? More likely, did stack truncate?");
587 using boost::asio::buffer;
588 using boost::shared_ptr;
593 auto rst_base = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
595 rst_base->m_packed.m_src_port = causing_packet->m_packed.m_dst_port;
596 rst_base->m_packed.m_dst_port = causing_packet->m_packed.m_src_port;
597 rst_base->m_opt_rexmit_on =
false;
610 const auto& packet_ref = *packet;
611 const auto& packet_type_id =
typeid(packet_ref);
613 sock->m_snd_stats.low_lvl_packet_xfer_requested(packet_type_id);
616 packet->m_packed.m_src_port = sock->m_local_port;
617 packet->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
618 packet->m_opt_rexmit_on = sock->rexmit_on();
623 if ((!sock->opt(sock->m_opts.m_st_snd_pacing_enabled)) ||
624 (sock->m_snd_smoothed_round_trip_time == Fine_duration::zero()) ||
648 using boost::chrono::duration_cast;
649 using boost::chrono::microseconds;
650 using boost::static_pointer_cast;
651 using boost::dynamic_pointer_cast;
652 using boost::shared_ptr;
656 const auto& packet_ref = *packet;
657 const auto& packet_type_id =
typeid(packet_ref);
662 const bool is_data_packet = packet_type_id ==
typeid(
Data_packet);
665 shared_ptr<const Data_packet> data;
668 init_seq_num = static_pointer_cast<const Data_packet>(packet)->m_seq_num;
672 const auto& acked_packets = static_pointer_cast<const Ack_packet>(packet)->m_rcv_acked_packets;
673 if (!acked_packets.empty())
675 init_seq_num = acked_packets.front()->m_seq_num;
679 const bool q_was_empty = pacing.
m_packet_q.empty();
686 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"] packet of type [" << packet->m_type_ostream_manip <<
"] "
687 "is newly available for sending; pushed onto queue; queue size [" << pacing.
m_packet_q.size() <<
"]; "
688 "initial sequence number [" << init_seq_num <<
"].");
692 const auto data = dynamic_pointer_cast<const Data_packet>(pacing.
m_packet_q.front());
697 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: was already in progress; queued and done.");
713 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: due to packet type, sending immediately since at head of queue; "
714 "queue empty again.");
738 "slice [epoch+" << duration_cast<microseconds>(pacing.
m_slice_start.time_since_epoch()) <<
" "
739 "over " << duration_cast<microseconds>(pacing.
m_slice_period) <<
"] is over.");
763 using boost::chrono::duration_cast;
764 using boost::chrono::microseconds;
765 using boost::chrono::milliseconds;
772 const Fine_duration& srtt = sock->m_snd_smoothed_round_trip_time;
774 assert(srtt != Fine_duration::zero());
784 = srtt * sock->max_block_size() / sock->m_snd_cong_ctl->congestion_window_bytes();
785 if (slice_ideal_period == Fine_duration::zero())
792 if (timer_min_period == Fine_duration::zero())
800 const Fine_duration TIMER_MIN_PERIOD_DEFAULT = milliseconds{15};
801 timer_min_period = TIMER_MIN_PERIOD_DEFAULT;
806 pacing.
m_slice_period = max(slice_ideal_period, timer_min_period);
817 =
static_cast<size_t>(pacing.
m_slice_period * sock->max_block_size() / slice_ideal_period);
825 const size_t QUEUE_SIZE_DRIFT_PREVENTION_PCT = 110;
833 "slice [epoch+" << duration_cast<microseconds>(pacing.
m_slice_start.time_since_epoch()) <<
" "
834 "over " << duration_cast<microseconds>(pacing.
m_slice_period) <<
"]; "
835 "ideal slice period = [SRTT " << duration_cast<microseconds>(srtt) <<
"] / "
836 "([cong_wnd " << sock->m_snd_cong_ctl->congestion_window_bytes() <<
"] / "
837 "[max-block-size " << sock->max_block_size() <<
"]) = "
838 "[" << duration_cast<microseconds>(slice_ideal_period) <<
"]; "
839 "timer_min_period = [" << duration_cast<microseconds>(timer_min_period) <<
"]; "
840 "bytes_allowed = max(ideal, min) / ideal * max-block-size * "
841 "[" << QUEUE_SIZE_DRIFT_PREVENTION_PCT <<
"%] = "
847 using boost::chrono::milliseconds;
848 using boost::chrono::round;
849 using boost::shared_ptr;
850 using boost::weak_ptr;
851 using boost::static_pointer_cast;
852 using boost::dynamic_pointer_cast;
864 shared_ptr<const Data_packet> head_packet;
874 >= (head_packet = static_pointer_cast<const Data_packet>(pacing.
m_packet_q.front()))->m_data.size()))
883 FLOW_LOG_TRACE(
"Will send [" << head_packet->m_data.size() <<
"] bytes of data; budget now "
885 "queue size now [" << (pacing.
m_packet_q.size() - 1) <<
"].");
900 && (!(dynamic_pointer_cast<const Data_packet>(head_packet_base = pacing.
m_packet_q.front()))))
902 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: due to packet type, sending immediately since at head of queue; "
903 "queue size now [" << (pacing.
m_packet_q.size() - 1) <<
"].");
933 "scheduling next processing at end of time slice "
934 "in [" << round<milliseconds>(slice_end - Fine_clock::now()) <<
"].");
937 pacing.
m_slice_timer.async_wait([
this, sock_observer = weak_ptr<Peer_socket>(sock)]
940 auto sock = sock_observer.lock();
968 assert(sys_err_code != boost::asio::error::operation_aborted);
973 assert(!sock->m_snd_pacing_data.m_packet_q.empty());
975 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: slice end timer fired; creating new slice and processing queue.");
997 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
1003 using boost::asio::buffer;
1008 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
1009 rst->m_packed.m_src_port = sock->m_local_port;
1010 rst->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
1011 rst->m_opt_rexmit_on =
false;
1015 const size_t size = rst->serialize_to_raw_data_and_log(&raw_bufs);
1018 const auto& rst_type_id =
typeid(
Rst_packet);
1019 sock->m_snd_stats.low_lvl_packet_xfer_requested(rst_type_id);
1020 sock->m_snd_stats.low_lvl_packet_xfer_called(rst_type_id,
false, size);
1028 "serialized size [" << size <<
"] exceeds limit [" << limit <<
"]; "
1029 "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
1030 "[\n" << rst->m_concise_ostream_manip <<
"].");
1032 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, 0);
1039 sock->remote_endpoint().m_udp_endpoint, 0, sys_err_code);
1043 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id);
1047 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, size_sent);
1054 m_bytes_allowed_this_slice(0),
1055 m_slice_timer(*task_engine)
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
void low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet, size_t bytes_expected_transferred, const Error_code &sys_err_code, size_t bytes_transferred)
Completion handler for async_low_lvl_packet_send_impl(); called when the packet is either successfull...
void sock_pacing_new_packet_ready(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet)
async_sock_low_lvl_packet_send_paced() pacing helper: Handles a DATA or ACK packet that was just pass...
void async_no_sock_low_lvl_packet_send(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
void async_sock_low_lvl_packet_send(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr &&packet, bool delayed_by_pacing)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
void perform_accumulated_on_recv_tasks()
Performs all tasks to be performed at the end of low_lvl_recv_and_handle() or async part of async_wai...
void handle_incoming(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-received, not-yet-deserialized low-level packet.
void sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket, synchronously.
void mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number &seq_num)
Performs important book-keeping based on the event "DATA packet was sent to destination....
void sock_pacing_time_slice_end(Peer_socket::Ptr sock, const Error_code &sys_err_code)
async_sock_low_lvl_packet_send_paced() pacing helper: If sock_pacing_process_q() ran out of the last ...
boost::shared_ptr< Net_env_simulator > m_net_env_sim
The object used to simulate stuff like packet loss and latency via local means directly in the code.
void async_wait_latency_then_handle_incoming(const Fine_duration &latency, util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sets up handle_incoming(packet_data, low_lvl_remote_endpoint) to be called asynchronously after a spe...
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
void sock_pacing_process_q(Peer_socket::Ptr sock, bool executing_after_delay)
async_sock_low_lvl_packet_send_paced() pacing helper: Given that we are currently in the pacing time ...
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
unsigned int handle_incoming_with_simulation(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint, bool is_sim_duplicate_packet=false)
Helper for low_lvl_recv_and_handle() that calls handle_incoming() on the not-yet-deserialized low-lev...
void low_lvl_recv_and_handle(Error_code sys_err_code)
Handles the pre-condition that m_low_lvl_sock has a UDP packet available for reading,...
Udp_socket m_low_lvl_sock
The UDP socket used to receive low-level packets (to assemble into application layer data) and send t...
void async_low_lvl_packet_send_impl(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet, bool delayed_by_pacing, Peer_socket::Ptr sock)
Takes given low-level packet structure, serializes it, and initiates asynchronous send of these data ...
util::Blob m_packet_data
Stores incoming raw packet data; re-used repeatedly for possible performance gains.
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
Node_options m_opts
This Node's global set of options.
void async_low_lvl_recv()
Registers so that during the current or next m_task_engine.run(), the latter will wait for a receivab...
void async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
void sock_pacing_new_time_slice(Peer_socket::Ptr sock, const Fine_time_pt &now)
async_sock_low_lvl_packet_send_paced() pacing helper: Resets the socket's Send_pacing_data structure ...
@ S_CLOSED
Neither future reads nor writes are possible, AND Node has disowned the Peer_socket.
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
An internal net_flow sequence number identifying a piece of data.
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_LOG_DATA(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
boost::asio::io_context Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Internal net_flow struct that encapsulates the Flow-protocol low-level ACK packet.
Internal net_flow struct that encapsulates the Flow-protocol low-level DATA packet.
std::vector< Const_buffer > Const_buffer_sequence
Short-hand for sequence of immutable buffers; i.e., a sequence of 1 or more scattered areas in memory...
size_t m_dyn_low_lvl_max_packet_size
Any incoming low-level (UDP) packet will be truncated to this size.
unsigned int m_dyn_max_packets_per_main_loop_iteration
The UDP net-stack may deliver 2 or more datagrams to the Flow Node at the same time.
Fine_duration m_st_timer_min_period
A time period such that the boost.asio timer implementation for this platform is able to accurately a...
Metadata (and data, if retransmission is on) for a packet that has been sent one (if retransmission i...
std::vector< Sent_when > m_sent_when
Time stamps, order numbers, and other info at the times when the different attempts (including origin...
Internal net_flow struct that encapsulates the Flow-protocol low-level RST packet.
The current outgoing packet pacing state, including queue of low-level packets to be sent,...
util::Timer m_slice_timer
When running, m_packet_q is non-empty, m_bytes_allowed_this_slice < data size of m_packet_q....
size_t m_bytes_allowed_this_slice
This many bytes worth of DATA packets may still be sent, at this time, within the time slice defined ...
Send_pacing_data(util::Task_engine *task_engine)
Initializes data to initial state (no active time slice).
Packet_q m_packet_q
Queue of low-level packets to be sent to the remote endpoint, in order in which they are to be sent,...
Fine_duration m_slice_period
The length of the current pacing time slice period; this depends on congestion window and SRTT on the...
Fine_time_pt m_slice_start
The time point at which the last pacing time slice began; or epoch if no packets sent so far (i....