43 using boost::asio::buffer;
48 unsigned int handled_packet_count = 0;
51 unsigned int recvd_packet_count = 0;
87 size_t packet_size = 0;
95 if ((!packet_data.zero()) && (packet_data.capacity() < max_packet_size))
97 packet_data.make_zero();
99 packet_data.resize(max_packet_size);
103 packet_size =
m_low_lvl_sock.receive_from(buffer(packet_data.data(), packet_data.size()),
104 low_lvl_remote_endpoint, 0, sys_err_code);
107 assert(packet_size <= packet_data.size());
108 packet_data.resize(packet_size);
111 "[UDP " << low_lvl_remote_endpoint <<
"]:");
114 ++recvd_packet_count;
123 else if (sys_err_code != boost::asio::error::would_block)
144 assert(packet_size == 0);
148 assert(packet_size == 0);
152 while ((packet_size != 0)
153 && ((recvd_packet_count_limit == 0) || (recvd_packet_count < recvd_packet_count_limit)));
161 FLOW_LOG_TRACE(
"Handled a total of [" << handled_packet_count <<
"] incoming packets "
162 "out of [" << recvd_packet_count <<
"] received (limit [" << recvd_packet_count_limit <<
"]) "
163 "in this boost.asio handler.");
183 bool is_sim_duplicate_packet)
186 using boost::chrono::milliseconds;
187 using boost::chrono::round;
191 unsigned int handled = 0;
207 packet_data_copy = *(
static_cast<const Blob*
>(packet_data));
211 if (latency == Fine_duration::zero())
222 FLOW_LOG_TRACE(
"SIMULATION: Delaying reception of packet by simulated latency "
223 "[" << round<milliseconds>(latency) <<
"].");
236 low_lvl_remote_endpoint,
true);
255 using boost::chrono::milliseconds;
256 using boost::chrono::round;
257 using boost::shared_ptr;
266 shared_ptr<Blob> packet_data_moved_ptr(
new Blob(std::move(*packet_data)));
272 [
this, packet_data_moved_ptr, low_lvl_remote_endpoint, latency, started_at]
278 (
"SIMULATOR: Handling low-level packet after "
279 "simulated latency [" << round<milliseconds>(latency) <<
"]; "
280 "actual simulated latency was "
281 "[" << round<milliseconds>(Fine_clock::now() - started_at) <<
"]; "
282 "from [UDP " << low_lvl_remote_endpoint <<
"].");
285 Blob packet_data_moved_again(std::move(*packet_data_moved_ptr));
297 "out of a simulated [1] received in this boost.asio handler.");
302 bool delayed_by_pacing)
319 using boost::asio::buffer;
322 const auto& packet_ref = *packet;
323 const auto& packet_type_id =
typeid(packet_ref);
383 const size_t bytes_to_send = packet->serialize_to_raw_data_and_log(&raw_bufs);
384 assert(bytes_to_send != 0);
387 sock->m_snd_stats.low_lvl_packet_xfer_called(packet_type_id, delayed_by_pacing, bytes_to_send);
390 if (bytes_to_send > limit)
393 FLOW_LOG_WARNING(
"Tried to send low-level packet but before doing so detected "
394 "serialized size [" << bytes_to_send <<
"] exceeds limit [" << limit <<
"]; "
395 "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
396 "[\n" << packet->m_concise_ostream_manip <<
"].");
398 FLOW_LOG_DATA(
"Detailed serialized packet details from preceding warning: "
399 "[\n" << packet->m_verbose_ostream_manip <<
"].");
402 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_to_send, 0);
409 low_lvl_remote_endpoint,
410 [
this, sock, packet, bytes_to_send](
const Error_code& sys_err_code,
size_t n_sent)
418 using boost::chrono::microseconds;
419 using boost::chrono::round;
424 if (sent_pkt_it == sock->m_snd_flying_pkts_by_sent_when.past_oldest())
428 "sequence number [" << seq_num <<
"] but cannot find corresponding Sent_packet. "
429 "Cannot deal with some of the related data structures; still sending. Bug?");
449 const Fine_time_pt& now = sock->m_snd_last_data_sent_when = Fine_clock::now();
450 const size_t cwnd = sock->m_snd_cong_ctl->congestion_window_bytes();
458 last_send_attempt.m_sent_time = now;
459 last_send_attempt.m_sent_cwnd_bytes = cwnd;
460 const microseconds diff = round<microseconds>(now - prev_sent_when);
463 (
"Sending/sent [DATA] packet over [" << sock <<
"] with "
464 "sequence number [" << seq_num <<
"] order_num [" << order_num <<
"]. Send timestamp changed from "
465 "[" << prev_sent_when <<
"] -> [" << now <<
"]; difference [" << diff <<
"].");
470 last_send_attempt.m_sent_time = now;
471 last_send_attempt.m_sent_cwnd_bytes = cwnd;
477 drop_timer->start_contemporaneous_events();
478 drop_timer->on_packet_in_flight(order_num);
479 drop_timer->end_contemporaneous_events();
492 size_t bytes_expected_transferred,
const Error_code& sys_err_code,
493 size_t bytes_transferred)
495 using std::numeric_limits;
499 const auto& packet_ref = *packet;
500 const auto& packet_type_id =
typeid(packet_ref);
511 "[\n" << packet->m_verbose_ostream_manip <<
"].");
516 "[\n" << packet->m_concise_ostream_manip <<
"].");
551 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id);
560 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_expected_transferred, bytes_transferred);
563 if (bytes_transferred != bytes_expected_transferred)
571 FLOW_LOG_WARNING(
"Low-level packet sent, but only [" << bytes_transferred <<
"] of "
572 "[" << bytes_expected_transferred <<
"] bytes "
573 "were sent. Internal error with packet size calculations? More likely, did stack truncate?");
583 using boost::asio::buffer;
584 using boost::shared_ptr;
589 auto rst_base = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
591 rst_base->m_packed.m_src_port = causing_packet->m_packed.m_dst_port;
592 rst_base->m_packed.m_dst_port = causing_packet->m_packed.m_src_port;
593 rst_base->m_opt_rexmit_on =
false;
606 const auto& packet_ref = *packet;
607 const auto& packet_type_id =
typeid(packet_ref);
609 sock->m_snd_stats.low_lvl_packet_xfer_requested(packet_type_id);
612 packet->m_packed.m_src_port = sock->m_local_port;
613 packet->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
614 packet->m_opt_rexmit_on = sock->rexmit_on();
619 if ((!sock->opt(sock->m_opts.m_st_snd_pacing_enabled)) ||
620 (sock->m_snd_smoothed_round_trip_time == Fine_duration::zero()) ||
644 using boost::chrono::duration_cast;
645 using boost::chrono::microseconds;
646 using boost::static_pointer_cast;
647 using boost::dynamic_pointer_cast;
648 using boost::shared_ptr;
652 const auto& packet_ref = *packet;
653 const auto& packet_type_id =
typeid(packet_ref);
658 const bool is_data_packet = packet_type_id ==
typeid(
Data_packet);
661 shared_ptr<const Data_packet> data;
664 init_seq_num = static_pointer_cast<const Data_packet>(packet)->m_seq_num;
668 const auto& acked_packets = static_pointer_cast<const Ack_packet>(packet)->m_rcv_acked_packets;
669 if (!acked_packets.empty())
671 init_seq_num = acked_packets.front()->m_seq_num;
675 const bool q_was_empty = pacing.
m_packet_q.empty();
682 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"] packet of type [" << packet->m_type_ostream_manip <<
"] "
683 "is newly available for sending; pushed onto queue; queue size [" << pacing.
m_packet_q.size() <<
"]; "
684 "initial sequence number [" << init_seq_num <<
"].");
688 const auto data = dynamic_pointer_cast<const Data_packet>(pacing.
m_packet_q.front());
693 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: was already in progress; queued and done.");
709 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: due to packet type, sending immediately since at head of queue; "
710 "queue empty again.");
734 "slice [epoch+" << duration_cast<microseconds>(pacing.
m_slice_start.time_since_epoch()) <<
" "
735 "over " << duration_cast<microseconds>(pacing.
m_slice_period) <<
"] is over.");
759 using boost::chrono::duration_cast;
760 using boost::chrono::microseconds;
761 using boost::chrono::milliseconds;
768 const Fine_duration& srtt = sock->m_snd_smoothed_round_trip_time;
770 assert(srtt != Fine_duration::zero());
780 = srtt * sock->max_block_size() / sock->m_snd_cong_ctl->congestion_window_bytes();
781 if (slice_ideal_period == Fine_duration::zero())
788 if (timer_min_period == Fine_duration::zero())
796 const Fine_duration TIMER_MIN_PERIOD_DEFAULT = milliseconds(15);
797 timer_min_period = TIMER_MIN_PERIOD_DEFAULT;
802 pacing.
m_slice_period = max(slice_ideal_period, timer_min_period);
813 =
static_cast<size_t>(pacing.
m_slice_period * sock->max_block_size() / slice_ideal_period);
821 const size_t QUEUE_SIZE_DRIFT_PREVENTION_PCT = 110;
829 "slice [epoch+" << duration_cast<microseconds>(pacing.
m_slice_start.time_since_epoch()) <<
" "
830 "over " << duration_cast<microseconds>(pacing.
m_slice_period) <<
"]; "
831 "ideal slice period = [SRTT " << duration_cast<microseconds>(srtt) <<
"] / "
832 "([cong_wnd " << sock->m_snd_cong_ctl->congestion_window_bytes() <<
"] / "
833 "[max-block-size " << sock->max_block_size() <<
"]) = "
834 "[" << duration_cast<microseconds>(slice_ideal_period) <<
"]; "
835 "timer_min_period = [" << duration_cast<microseconds>(timer_min_period) <<
"]; "
836 "bytes_allowed = max(ideal, min) / ideal * max-block-size * "
837 "[" << QUEUE_SIZE_DRIFT_PREVENTION_PCT <<
"%] = "
843 using boost::chrono::milliseconds;
844 using boost::chrono::round;
845 using boost::shared_ptr;
846 using boost::weak_ptr;
847 using boost::static_pointer_cast;
848 using boost::dynamic_pointer_cast;
860 shared_ptr<const Data_packet> head_packet;
870 >= (head_packet = static_pointer_cast<const Data_packet>(pacing.
m_packet_q.front()))->m_data.size()))
879 FLOW_LOG_TRACE(
"Will send [" << head_packet->m_data.size() <<
"] bytes of data; budget now "
881 "queue size now [" << (pacing.
m_packet_q.size() - 1) <<
"].");
896 && (!(dynamic_pointer_cast<const Data_packet>(head_packet_base = pacing.
m_packet_q.front()))))
898 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: due to packet type, sending immediately since at head of queue; "
899 "queue size now [" << (pacing.
m_packet_q.size() - 1) <<
"].");
929 "scheduling next processing at end of time slice "
930 "in [" << round<milliseconds>(slice_end - Fine_clock::now()) <<
"].");
933 pacing.
m_slice_timer.async_wait([
this, sock_observer = weak_ptr<Peer_socket>(sock)]
936 auto sock = sock_observer.lock();
964 assert(sys_err_code != boost::asio::error::operation_aborted);
969 assert(!sock->m_snd_pacing_data.m_packet_q.empty());
971 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: slice end timer fired; creating new slice and processing queue.");
993 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
999 using boost::asio::buffer;
1004 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
1005 rst->m_packed.m_src_port = sock->m_local_port;
1006 rst->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
1007 rst->m_opt_rexmit_on =
false;
1011 const size_t size = rst->serialize_to_raw_data_and_log(&raw_bufs);
1014 const auto& rst_type_id =
typeid(
Rst_packet);
1015 sock->m_snd_stats.low_lvl_packet_xfer_requested(rst_type_id);
1016 sock->m_snd_stats.low_lvl_packet_xfer_called(rst_type_id,
false, size);
1024 "serialized size [" << size <<
"] exceeds limit [" << limit <<
"]; "
1025 "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
1026 "[\n" << rst->m_concise_ostream_manip <<
"].");
1028 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, 0);
1035 sock->remote_endpoint().m_udp_endpoint, 0, sys_err_code);
1039 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id);
1043 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, size_sent);
1050 m_bytes_allowed_this_slice(0),
1051 m_slice_timer(*task_engine)
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
void low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet, size_t bytes_expected_transferred, const Error_code &sys_err_code, size_t bytes_transferred)
Completion handler for async_low_lvl_packet_send_impl(); called when the packet is either successfull...
void sock_pacing_new_packet_ready(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet)
async_sock_low_lvl_packet_send_paced() pacing helper: Handles a DATA or ACK packet that was just pass...
void async_no_sock_low_lvl_packet_send(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
void async_sock_low_lvl_packet_send(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr &&packet, bool delayed_by_pacing)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
void perform_accumulated_on_recv_tasks()
Performs all tasks to be performed at the end of low_lvl_recv_and_handle() or async part of async_wai...
void handle_incoming(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-received, not-yet-deserialized low-level packet.
void sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket, synchronously.
void mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number &seq_num)
Performs important book-keeping based on the event "DATA packet was sent to destination....
void sock_pacing_time_slice_end(Peer_socket::Ptr sock, const Error_code &sys_err_code)
async_sock_low_lvl_packet_send_paced() pacing helper: If sock_pacing_process_q() ran out of the last ...
boost::shared_ptr< Net_env_simulator > m_net_env_sim
The object used to simulate stuff like packet loss and latency via local means directly in the code.
void async_wait_latency_then_handle_incoming(const Fine_duration &latency, util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sets up handle_incoming(packet_data, low_lvl_remote_endpoint) to be called asynchronously after a spe...
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
void sock_pacing_process_q(Peer_socket::Ptr sock, bool executing_after_delay)
async_sock_low_lvl_packet_send_paced() pacing helper: Given that we are currently in the pacing time ...
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
unsigned int handle_incoming_with_simulation(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint, bool is_sim_duplicate_packet=false)
Helper for low_lvl_recv_and_handle() that calls handle_incoming() on the not-yet-deserialized low-lev...
void low_lvl_recv_and_handle(Error_code sys_err_code)
Handles the pre-condition that m_low_lvl_sock has a UDP packet available for reading,...
Udp_socket m_low_lvl_sock
The UDP socket used to receive low-level packets (to assemble into application layer data) and send t...
void async_low_lvl_packet_send_impl(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet, bool delayed_by_pacing, Peer_socket::Ptr sock)
Takes given low-level packet structure, serializes it, and initiates asynchronous send of these data ...
util::Blob m_packet_data
Stores incoming raw packet data; re-used repeatedly for possible performance gains.
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
Node_options m_opts
This Node's global set of options.
void async_low_lvl_recv()
Registers so that during the current or next m_task_engine.run(), the latter will wait for a receivab...
void async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
void sock_pacing_new_time_slice(Peer_socket::Ptr sock, const Fine_time_pt &now)
async_sock_low_lvl_packet_send_paced() pacing helper: Resets the socket's Send_pacing_data structure ...
@ S_CLOSED
Neither future reads nor writes are possible, AND Node has disowned the Peer_socket.
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
An internal net_flow sequence number identifying a piece of data.
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_LOG_DATA(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
boost::asio::io_context Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Internal net_flow struct that encapsulates the Flow-protocol low-level ACK packet.
Internal net_flow struct that encapsulates the Flow-protocol low-level DATA packet.
std::vector< Const_buffer > Const_buffer_sequence
Short-hand for sequence of immutable buffers; i.e., a sequence of 1 or more scattered areas in memory...
size_t m_dyn_low_lvl_max_packet_size
Any incoming low-level (UDP) packet will be truncated to this size.
unsigned int m_dyn_max_packets_per_main_loop_iteration
The UDP net-stack may deliver 2 or more datagrams to the Flow Node at the same time.
Fine_duration m_st_timer_min_period
A time period such that the boost.asio timer implementation for this platform is able to accurately a...
Metadata (and data, if retransmission is on) for a packet that has been sent one (if retransmission i...
std::vector< Sent_when > m_sent_when
Time stamps, order numbers, and other info at the times when the different attempts (including origin...
Internal net_flow struct that encapsulates the Flow-protocol low-level RST packet.
The current outgoing packet pacing state, including queue of low-level packets to be sent,...
util::Timer m_slice_timer
When running, m_packet_q is non-empty, m_bytes_allowed_this_slice < data size of m_packet_q....
size_t m_bytes_allowed_this_slice
This many bytes worth of DATA packets may still be sent, at this time, within the time slice defined ...
Send_pacing_data(util::Task_engine *task_engine)
Initializes data to initial state (no active time slice).
Packet_q m_packet_q
Queue of low-level packets to be sent to the remote endpoint, in order in which they are to be sent,...
Fine_duration m_slice_period
The length of the current pacing time slice period; this depends on congestion window and SRTT on the...
Fine_time_pt m_slice_start
The time point at which the last pacing time slice began; or epoch if no packets sent so far (i....