33 using boost::asio::null_buffers;
49 using boost::asio::buffer;
54 unsigned int handled_packet_count = 0;
57 unsigned int recvd_packet_count = 0;
93 size_t packet_size = 0;
101 if ((!packet_data.zero()) && (packet_data.capacity() < max_packet_size))
103 packet_data.make_zero();
105 packet_data.resize(max_packet_size);
109 packet_size =
m_low_lvl_sock.receive_from(buffer(packet_data.data(), packet_data.size()),
110 low_lvl_remote_endpoint, 0, sys_err_code);
113 assert(packet_size <= packet_data.size());
114 packet_data.resize(packet_size);
117 "[UDP " << low_lvl_remote_endpoint <<
"]:");
120 ++recvd_packet_count;
129 else if (sys_err_code != boost::asio::error::would_block)
150 assert(packet_size == 0);
154 assert(packet_size == 0);
158 while ((packet_size != 0)
159 && ((recvd_packet_count_limit == 0) || (recvd_packet_count < recvd_packet_count_limit)));
167 FLOW_LOG_TRACE(
"Handled a total of [" << handled_packet_count <<
"] incoming packets "
168 "out of [" << recvd_packet_count <<
"] received (limit [" << recvd_packet_count_limit <<
"]) "
169 "in this boost.asio handler.");
189 bool is_sim_duplicate_packet)
192 using boost::chrono::milliseconds;
193 using boost::chrono::round;
197 unsigned int handled = 0;
213 packet_data_copy = *(
static_cast<const Blob*
>(packet_data));
217 if (latency == Fine_duration::zero())
228 FLOW_LOG_TRACE(
"SIMULATION: Delaying reception of packet by simulated latency "
229 "[" << round<milliseconds>(latency) <<
"].");
242 low_lvl_remote_endpoint,
true);
261 using boost::chrono::milliseconds;
262 using boost::chrono::round;
263 using boost::shared_ptr;
272 shared_ptr<Blob> packet_data_moved_ptr(
new Blob(std::move(*packet_data)));
278 [
this, packet_data_moved_ptr, low_lvl_remote_endpoint, latency, started_at]
284 (
"SIMULATOR: Handling low-level packet after "
285 "simulated latency [" << round<milliseconds>(latency) <<
"]; "
286 "actual simulated latency was "
287 "[" << round<milliseconds>(Fine_clock::now() - started_at) <<
"]; "
288 "from [UDP " << low_lvl_remote_endpoint <<
"].");
291 Blob packet_data_moved_again(std::move(*packet_data_moved_ptr));
303 "out of a simulated [1] received in this boost.asio handler.");
308 bool delayed_by_pacing)
325 using boost::asio::buffer;
328 const auto& packet_ref = *packet;
329 const auto& packet_type_id =
typeid(packet_ref);
389 const size_t bytes_to_send = packet->serialize_to_raw_data_and_log(&raw_bufs);
390 assert(bytes_to_send != 0);
393 sock->m_snd_stats.low_lvl_packet_xfer_called(packet_type_id, delayed_by_pacing, bytes_to_send);
396 if (bytes_to_send > limit)
399 FLOW_LOG_WARNING(
"Tried to send low-level packet but before doing so detected "
400 "serialized size [" << bytes_to_send <<
"] exceeds limit [" << limit <<
"]; "
401 "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
402 "[\n" << packet->m_concise_ostream_manip <<
"].");
404 FLOW_LOG_DATA(
"Detailed serialized packet details from preceding warning: "
405 "[\n" << packet->m_verbose_ostream_manip <<
"].");
408 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_to_send, 0);
415 low_lvl_remote_endpoint,
416 [
this, sock, packet, bytes_to_send](
const Error_code& sys_err_code,
size_t n_sent)
424 using boost::chrono::microseconds;
425 using boost::chrono::round;
430 if (sent_pkt_it == sock->m_snd_flying_pkts_by_sent_when.past_oldest())
434 "sequence number [" << seq_num <<
"] but cannot find corresponding Sent_packet. "
435 "Cannot deal with some of the related data structures; still sending. Bug?");
455 const Fine_time_pt& now = sock->m_snd_last_data_sent_when = Fine_clock::now();
456 const size_t cwnd = sock->m_snd_cong_ctl->congestion_window_bytes();
464 last_send_attempt.m_sent_time = now;
465 last_send_attempt.m_sent_cwnd_bytes = cwnd;
466 const microseconds diff = round<microseconds>(now - prev_sent_when);
469 (
"Sending/sent [DATA] packet over [" << sock <<
"] with "
470 "sequence number [" << seq_num <<
"] order_num [" << order_num <<
"]. Send timestamp changed from "
471 "[" << prev_sent_when <<
"] -> [" << now <<
"]; difference [" << diff <<
"].");
476 last_send_attempt.m_sent_time = now;
477 last_send_attempt.m_sent_cwnd_bytes = cwnd;
483 drop_timer->start_contemporaneous_events();
484 drop_timer->on_packet_in_flight(order_num);
485 drop_timer->end_contemporaneous_events();
498 size_t bytes_expected_transferred,
const Error_code& sys_err_code,
499 size_t bytes_transferred)
501 using std::numeric_limits;
505 const auto& packet_ref = *packet;
506 const auto& packet_type_id =
typeid(packet_ref);
517 "[\n" << packet->m_verbose_ostream_manip <<
"].");
522 "[\n" << packet->m_concise_ostream_manip <<
"].");
557 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id);
566 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_expected_transferred, bytes_transferred);
569 if (bytes_transferred != bytes_expected_transferred)
577 FLOW_LOG_WARNING(
"Low-level packet sent, but only [" << bytes_transferred <<
"] of "
578 "[" << bytes_expected_transferred <<
"] bytes "
579 "were sent. Internal error with packet size calculations? More likely, did stack truncate?");
589 using boost::asio::buffer;
590 using boost::shared_ptr;
595 auto rst_base = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
597 rst_base->m_packed.m_src_port = causing_packet->m_packed.m_dst_port;
598 rst_base->m_packed.m_dst_port = causing_packet->m_packed.m_src_port;
599 rst_base->m_opt_rexmit_on =
false;
613 const auto& packet_ref = *packet;
614 const auto& packet_type_id =
typeid(packet_ref);
616 sock->m_snd_stats.low_lvl_packet_xfer_requested(packet_type_id);
619 packet->m_packed.m_src_port = sock->m_local_port;
620 packet->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
621 packet->m_opt_rexmit_on = sock->rexmit_on();
626 if ((!sock->opt(sock->m_opts.m_st_snd_pacing_enabled)) ||
627 (sock->m_snd_smoothed_round_trip_time == Fine_duration::zero()) ||
652 using boost::chrono::duration_cast;
653 using boost::chrono::microseconds;
654 using boost::static_pointer_cast;
655 using boost::dynamic_pointer_cast;
656 using boost::shared_ptr;
660 const auto& packet_ref = *packet;
661 const auto& packet_type_id =
typeid(packet_ref);
666 const bool is_data_packet = packet_type_id ==
typeid(
Data_packet);
669 shared_ptr<const Data_packet> data;
672 init_seq_num = static_pointer_cast<const Data_packet>(packet)->m_seq_num;
676 const auto& acked_packets = static_pointer_cast<const Ack_packet>(packet)->m_rcv_acked_packets;
677 if (!acked_packets.empty())
679 init_seq_num = acked_packets.front()->m_seq_num;
683 const bool q_was_empty = pacing.
m_packet_q.empty();
690 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"] packet of type [" << packet->m_type_ostream_manip <<
"] "
691 "is newly available for sending; pushed onto queue; queue size [" << pacing.
m_packet_q.size() <<
"]; "
692 "initial sequence number [" << init_seq_num <<
"].");
696 const auto data = dynamic_pointer_cast<const Data_packet>(pacing.
m_packet_q.front());
701 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: was already in progress; queued and done.");
717 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: due to packet type, sending immediately since at head of queue; "
718 "queue empty again.");
742 "slice [epoch+" << duration_cast<microseconds>(pacing.
m_slice_start.time_since_epoch()) <<
" "
743 "over " << duration_cast<microseconds>(pacing.
m_slice_period) <<
"] is over.");
767 using boost::chrono::duration_cast;
768 using boost::chrono::microseconds;
769 using boost::chrono::milliseconds;
776 const Fine_duration& srtt = sock->m_snd_smoothed_round_trip_time;
778 assert(srtt != Fine_duration::zero());
788 = srtt * sock->max_block_size() / sock->m_snd_cong_ctl->congestion_window_bytes();
789 if (slice_ideal_period == Fine_duration::zero())
796 if (timer_min_period == Fine_duration::zero())
804 const Fine_duration TIMER_MIN_PERIOD_DEFAULT = milliseconds(15);
805 timer_min_period = TIMER_MIN_PERIOD_DEFAULT;
810 pacing.
m_slice_period = max(slice_ideal_period, timer_min_period);
821 =
static_cast<size_t>(pacing.
m_slice_period * sock->max_block_size() / slice_ideal_period);
829 const size_t QUEUE_SIZE_DRIFT_PREVENTION_PCT = 110;
837 "slice [epoch+" << duration_cast<microseconds>(pacing.
m_slice_start.time_since_epoch()) <<
" "
838 "over " << duration_cast<microseconds>(pacing.
m_slice_period) <<
"]; "
839 "ideal slice period = [SRTT " << duration_cast<microseconds>(srtt) <<
"] / "
840 "([cong_wnd " << sock->m_snd_cong_ctl->congestion_window_bytes() <<
"] / "
841 "[max-block-size " << sock->max_block_size() <<
"]) = "
842 "[" << duration_cast<microseconds>(slice_ideal_period) <<
"]; "
843 "timer_min_period = [" << duration_cast<microseconds>(timer_min_period) <<
"]; "
844 "bytes_allowed = max(ideal, min) / ideal * max-block-size * "
845 "[" << QUEUE_SIZE_DRIFT_PREVENTION_PCT <<
"%] = "
851 using boost::chrono::milliseconds;
852 using boost::chrono::round;
853 using boost::shared_ptr;
854 using boost::weak_ptr;
855 using boost::static_pointer_cast;
856 using boost::dynamic_pointer_cast;
868 shared_ptr<const Data_packet> head_packet;
878 >= (head_packet = static_pointer_cast<const Data_packet>(pacing.
m_packet_q.front()))->m_data.size()))
887 FLOW_LOG_TRACE(
"Will send [" << head_packet->m_data.size() <<
"] bytes of data; budget now "
889 "queue size now [" << (pacing.
m_packet_q.size() - 1) <<
"].");
904 && (!(dynamic_pointer_cast<const Data_packet>(head_packet_base = pacing.
m_packet_q.front()))))
906 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: due to packet type, sending immediately since at head of queue; "
907 "queue size now [" << (pacing.
m_packet_q.size() - 1) <<
"].");
938 "scheduling next processing at end of time slice "
939 "in [" << round<milliseconds>(slice_end - Fine_clock::now()) <<
"].");
951 pacing.
m_slice_timer.async_wait([
this, sock_observer = weak_ptr<Peer_socket>(sock)]
954 auto sock = sock_observer.lock();
983 assert(sys_err_code != boost::asio::error::operation_aborted);
988 assert(!sock->m_snd_pacing_data.m_packet_q.empty());
990 FLOW_LOG_TRACE(
"Pacing: On [" << sock <<
"]: slice end timer fired; creating new slice and processing queue.");
1021 bool defer_delta_check)
1038 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
1047 using boost::asio::buffer;
1052 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(
get_logger());
1053 rst->m_packed.m_src_port = sock->m_local_port;
1054 rst->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
1055 rst->m_opt_rexmit_on =
false;
1059 const size_t size = rst->serialize_to_raw_data_and_log(&raw_bufs);
1062 const auto& rst_type_id =
typeid(
Rst_packet);
1063 sock->m_snd_stats.low_lvl_packet_xfer_requested(rst_type_id);
1064 sock->m_snd_stats.low_lvl_packet_xfer_called(rst_type_id,
false, size);
1072 "serialized size [" << size <<
"] exceeds limit [" << limit <<
"]; "
1073 "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
1074 "[\n" << rst->m_concise_ostream_manip <<
"].");
1076 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, 0);
1083 sock->remote_endpoint().m_udp_endpoint, 0, sys_err_code);
1087 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id);
1091 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, size_sent);
1098 m_bytes_allowed_this_slice(0),
1099 m_slice_timer(*task_engine)
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
void low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet, size_t bytes_expected_transferred, const Error_code &sys_err_code, size_t bytes_transferred)
Completion handler for async_low_lvl_packet_send_impl(); called when the packet is either successfull...
void async_no_sock_low_lvl_packet_send(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
void async_sock_low_lvl_packet_send(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr &&packet, bool delayed_by_pacing)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
void perform_accumulated_on_recv_tasks()
Performs all tasks to be performed at the end of low_lvl_recv_and_handle() or async part of async_wai...
void handle_incoming(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-received, not-yet-deserialized low-level packet.
void sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket, synchronously.
void mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number &seq_num)
Performs important book-keeping based on the event "DATA packet was sent to destination....
bool async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet, Error_code *err_code)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
void sock_pacing_time_slice_end(Peer_socket::Ptr sock, const Error_code &sys_err_code)
async_sock_low_lvl_packet_send_paced() pacing helper: If sock_pacing_process_q() ran out of the last ...
boost::shared_ptr< Net_env_simulator > m_net_env_sim
The object used to simulate stuff like packet loss and latency via local means directly in the code.
void async_wait_latency_then_handle_incoming(const Fine_duration &latency, util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sets up handle_incoming(packet_data, low_lvl_remote_endpoint) to be called asynchronously after a spe...
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
bool sock_pacing_new_packet_ready(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet, Error_code *err_code)
async_sock_low_lvl_packet_send_paced() pacing helper: Handles a DATA or ACK packet that was just pass...
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
unsigned int handle_incoming_with_simulation(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint, bool is_sim_duplicate_packet=false)
Helper for low_lvl_recv_and_handle() that calls handle_incoming() on the not-yet-deserialized low-lev...
void low_lvl_recv_and_handle(Error_code sys_err_code)
Handles the pre-condition that m_low_lvl_sock has a UDP packet available for reading,...
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
Udp_socket m_low_lvl_sock
The UDP socket used to receive low-level packets (to assemble into application layer data) and send t...
void async_low_lvl_packet_send_impl(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet, bool delayed_by_pacing, Peer_socket::Ptr sock)
Takes given low-level packet structure, serializes it, and initiates asynchronous send of these data ...
bool async_sock_low_lvl_packet_send_or_close_immediately(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet, bool defer_delta_check)
Similar to async_sock_low_lvl_packet_send_paced() except it also calls close_connection_immediately(s...
bool sock_pacing_process_q(Peer_socket::Ptr sock, Error_code *err_code, bool executing_after_delay)
async_sock_low_lvl_packet_send_paced() pacing helper: Given that we are currently in the pacing time ...
util::Blob m_packet_data
Stores incoming raw packet data; re-used repeatedly for possible performance gains.
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
Node_options m_opts
This Node's global set of options.
void async_low_lvl_recv()
Registers so that during the current or next m_task_engine.run(), the latter will wait for a receivab...
void close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
A thread W method that handles the transition of the given socket from OPEN (any sub-state) to CLOSED...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
void sock_pacing_new_time_slice(Peer_socket::Ptr sock, const Fine_time_pt &now)
async_sock_low_lvl_packet_send_paced() pacing helper: Resets the socket's Send_pacing_data structure ...
void rst_and_close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
Asynchronously send RST to the other side of the given socket and close_connection_immediately().
@ S_CLOSED
Neither future reads nor writes are possible, AND Node has disowned the Peer_socket.
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
Sequence_number
An internal net_flow sequence number identifying a piece of data.
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
#define FLOW_LOG_DATA(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
@ S_INTERNAL_ERROR_SYSTEM_ERROR_ASIO_TIMER
Internal error: System error: Something went wrong with boost.asio timer subsystem.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Ack_packet
Internal net_flow struct that encapsulates the Flow-protocol low-level ACK packet.
Data_packet
Internal net_flow struct that encapsulates the Flow-protocol low-level DATA packet.
std::vector< Const_buffer > Const_buffer_sequence
Short-hand for sequence of immutable buffers; i.e., a sequence of 1 or more scattered areas in memory...
size_t m_dyn_low_lvl_max_packet_size
Any incoming low-level (UDP) packet will be truncated to this size.
unsigned int m_dyn_max_packets_per_main_loop_iteration
The UDP net-stack may deliver 2 or more datagrams to the Flow Node at the same time.
Fine_duration m_st_timer_min_period
A time period such that the boost.asio timer implementation for this platform is able to accurately a...
Sent_packet
Metadata (and data, if retransmission is on) for a packet that has been sent one (if retransmission i...
std::vector< Sent_when > m_sent_when
Time stamps, order numbers, and other info at the times when the different attempts (including origin...
Rst_packet
Internal net_flow struct that encapsulates the Flow-protocol low-level RST packet.
Send_pacing_data
The current outgoing packet pacing state, including queue of low-level packets to be sent,...
util::Timer m_slice_timer
When running, m_packet_q is non-empty, m_bytes_allowed_this_slice < data size of m_packet_q....
size_t m_bytes_allowed_this_slice
This many bytes worth of DATA packets may still be sent, at this time, within the time slice defined ...
Send_pacing_data(util::Task_engine *task_engine)
Initializes data to initial state (no active time slice).
Packet_q m_packet_q
Queue of low-level packets to be sent to the remote endpoint, in order in which they are to be sent,...
Fine_duration m_slice_period
The length of the current pacing time slice period; this depends on congestion window and SRTT on the...
Fine_time_pt m_slice_start
The time point at which the last pacing time slice began; or epoch if no packets sent so far (i....