26#include <boost/algorithm/string.hpp>
27#include <boost/tuple/tuple.hpp>
42 m_active_connect(false),
43 m_state(
State::S_CLOSED),
46 m_rcv_buf(logger_ptr, 0),
48 m_snd_buf(logger_ptr, max_block_size()),
49 m_serialized_metadata(logger_ptr),
52 m_rcv_syn_rcvd_data_cumulative_size(0),
53 m_rcv_reassembly_q_data_size(0),
54 m_rcv_pending_acks_size_at_recv_handler_start(0),
55 m_snd_pending_rcv_wnd(0),
56 m_rcv_last_sent_rcv_wnd(0),
57 m_rcv_in_rcv_wnd_recovery(false),
58 m_rcv_delayed_ack_timer(*task_engine),
59 m_snd_flying_bytes(0),
60 m_snd_last_order_num(0),
61 m_snd_rexmit_q_size(0),
62 m_snd_remote_rcv_wnd(0),
63 m_snd_smoothed_round_trip_time(0),
64 m_round_trip_time_variance(0),
65 m_snd_drop_timeout(0),
66 m_snd_pacing_data(task_engine),
68 m_init_rexmit_count(0)
71 FLOW_LOG_TRACE(
"Peer_socket [" <<
static_cast<void*
>(
this) <<
"] created.");
109 return sync_send(
nullptr, Fine_duration::max(), err_code);
120 const Function<size_t (
size_t)> empty_snd_buf_feed_func;
121 assert(empty_snd_buf_feed_func.empty());
135 const Ptr sock = shared_from_this();
142 return m_node->
send(sock, snd_buf_feed_func, err_code);
149 using boost::adopt_lock;
154 const Ptr sock = shared_from_this();
180 snd_buf_feed_func_or_empty.empty()
183 {
return m_node->
send(sock, snd_buf_feed_func_or_empty, err_code); }),
185 wait_until, err_code);
190 return sync_receive(
nullptr, Fine_duration::max(), err_code);
201 const Function<size_t ()> empty_rcv_buf_consume_func;
202 assert(empty_rcv_buf_consume_func.empty());
216 const Ptr sock = shared_from_this();
223 return m_node->
receive(sock, rcv_buf_consume_func, err_code);
230 using boost::adopt_lock;
235 const Ptr sock = shared_from_this();
253 rcv_buf_consume_func_or_empty.empty()
256 {
return m_node->
receive(sock, rcv_buf_consume_func_or_empty, err_code); }),
258 wait_until, err_code);
275 const Ptr sock = shared_from_this();
298 const Ptr sock = shared_from_this();
326 const Const_ptr sock = shared_from_this();
349 const unsigned int* inflate_pct_val_ptr)
const
355 const unsigned int inflate_pct = inflate_pct_val_ptr ? (*inflate_pct_val_ptr) : 0;
421 os.os() << bytes <<
'~' << (bytes / block);
422 if ((bytes % block) != 0)
431 boost::shared_ptr<Data_packet> packet,
433 m_size(packet->m_data.size()),
434 m_sent_when({ sent_when }),
436 m_packet(
rexmit_on ? packet : boost::shared_ptr<Data_packet>())
450 m_data = std::move(*src_data);
466 boost::shared_ptr<const Syn_ack_packet> syn_ack)
474 FLOW_LOG_INFO(
"NetFlow worker thread continuing active-connect of [" << sock <<
"]. "
475 "Received [" << syn_ack->m_type_ostream_manip <<
"] with "
476 "ISN [" << syn_ack->m_init_seq_num <<
"]; "
477 "security token [" << syn_ack->m_packed.m_security_token <<
"].");
481 async_low_lvl_syn_ack_ack_send(sock, syn_ack);
488 sock->m_rcv_init_seq_num = syn_ack->m_init_seq_num;
489 sock->m_rcv_next_seq_num = sock->m_rcv_init_seq_num + 1;
502 setup_drop_timer(socket_id, sock);
505 sock->m_snd_remote_rcv_wnd = syn_ack->m_packed.m_rcv_wnd;
517 event_set_all_check_delta(
true);
525 boost::shared_ptr<const Syn_ack_packet> syn_ack)
533 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
535 "received duplicate [" << syn_ack->m_type_ostream_manip <<
"] with "
536 "ISN [" << syn_ack->m_init_seq_num <<
"]; "
537 "security token [" << syn_ack->m_packed.m_security_token <<
"]. "
538 "Could be from packet loss.");
542 async_low_lvl_syn_ack_ack_send(sock, syn_ack);
547 boost::shared_ptr<Data_packet> packet,
548 bool syn_rcvd_qd_packet)
602 const bool rexmit_on = sock->rexmit_on();
605 auto& data = packet->m_data;
606 assert(!data.empty());
608 const size_t data_size = data.size();
616 FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock <<
"]. "
617 "Received [" << packet->m_type_ostream_manip <<
"] with "
618 "sequence number [" << seq_num <<
"]; data size [" << data_size <<
"].");
623 log_rcv_window(sock);
639 const Error_code cat_result = sock_categorize_data_to_established(sock, packet, &dupe, &slide, &slide_size);
649 rst_and_close_connection_immediately(socket_id, sock, cat_result,
true);
682 async_acknowledge_packet(sock, seq_num, packet->m_rexmit_id, data_size);
698 if (!sock_data_to_rcv_buf_unless_overflow(sock, packet))
722 sock_rcv_buf_now_readable(sock, syn_rcvd_qd_packet);
730 async_acknowledge_packet(sock, seq_num, 0, data_size);
739 sock_track_new_data_after_gap_rexmit_off(sock, packet, data_size, &slide, &slide_size);
751 sock_slide_rcv_next_seq_num(sock, slide_size,
false);
766 if (!sock_data_to_rcv_buf_unless_overflow(sock, packet))
783 sock_slide_rcv_next_seq_num(sock, slide_size,
true);
787 sock_rcv_buf_now_readable(sock, syn_rcvd_qd_packet);
789 else if (!sock_data_to_reassembly_q_unless_overflow(sock, packet))
803 async_acknowledge_packet(sock, seq_num, packet->m_rexmit_id, data_size);
807 log_rcv_window(sock);
811 boost::shared_ptr<const Data_packet> packet,
812 bool* dupe,
bool* slide,
size_t* slide_size)
814 assert(dupe && slide && slide_size);
826 const auto& data = packet->m_data;
831 advance_seq_num(&seq_num_end, data.size());
834 bool first_gap_exists;
837 rcv_get_first_gap_info(sock, &first_gap_exists, &seq_num_after_first_gap);
851 "Received [" << packet->m_type_ostream_manip <<
"] with "
852 "sequence number [" << seq_num <<
"]; data size [" << data.size() <<
"]; "
853 "sequence number precedes "
854 "ISN [" << sock->m_rcv_init_seq_num <<
"].");
860 if (seq_num < rcv_next_seq_num)
867 if (seq_num_end > rcv_next_seq_num)
871 "Received [" << packet->m_type_ostream_manip <<
"] with "
872 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
873 "data size [" << data.size() <<
"]; "
874 "straddle first unreceived "
875 "sequence number [" << rcv_next_seq_num <<
"].");
882 FLOW_LOG_TRACE(
"Duplicate packet before first unreceived sequence number [" << rcv_next_seq_num <<
"].");
895 if (seq_num == rcv_next_seq_num)
901 if (first_gap_exists && (seq_num_end > seq_num_after_first_gap))
905 "Received [" << packet->m_type_ostream_manip <<
"] with "
906 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
907 "data size [" << data.size() <<
"]; "
908 "supposed gap-filling data "
909 "straddle the boundary of packet [" << seq_num_after_first_gap <<
", ...).");
915 FLOW_LOG_TRACE(
"Packet filled first [" << data.size() <<
"] unreceived sequence numbers "
916 "starting with [" << rcv_next_seq_num <<
"].");
920 *slide_size = size_t(seq_num_end - seq_num);
921 assert(*slide_size == data.size());
926 assert(seq_num > rcv_next_seq_num);
968 if (next_packet == rcv_packets_with_gaps.end())
976 if (first_gap_exists)
980 get_seq_num_range(last_packet, 0, &seq_num_last_end);
982 if (seq_num_last_end > seq_num)
988 "Received [" << packet->m_type_ostream_manip <<
"] with "
989 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
990 "data size [" << data.size() <<
"]; "
991 "supposed middle gap-filling packet data "
992 "straddle the boundary of last packet [..., " << seq_num_last_end <<
").");
1000 FLOW_LOG_TRACE(
"New packet is newest packet after unreceived gap; "
1001 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1002 "first unreceived packet [" << rcv_next_seq_num <<
"].");
1008 FLOW_LOG_TRACE(
"New packet forms gap; sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1009 "first unreceived packet [" << rcv_next_seq_num <<
"].");
1019 get_seq_num_range(next_packet, &seq_num_next_start, &seq_num_next_end);
1021 if (seq_num_next_start == seq_num)
1026 if (seq_num_next_end != seq_num_end)
1032 "Received [" << packet->m_type_ostream_manip <<
"] with "
1033 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1034 "data size [" << data.size() <<
"]; "
1035 "do not match supposed "
1036 "duplicate packet [" << seq_num <<
", " << seq_num_next_end <<
").");
1047 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
").");
1053 assert(seq_num_next_start > seq_num);
1065 if (seq_num_end > seq_num_next_start)
1071 "Received [" << packet->m_type_ostream_manip <<
"] with "
1072 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1073 "data size [" << data.size() <<
"]; "
1074 "supposed middle gap-filling packet data "
1075 "straddle the left boundary of packet "
1076 "[" << seq_num_next_start <<
", " << seq_num_next_end <<
").");
1082 if (next_packet == rcv_packets_with_gaps.begin())
1084 FLOW_LOG_TRACE(
"New packet partially fills first gap without sliding window; "
1085 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1086 "first unreceived packet [" << rcv_next_seq_num <<
"].");
1092 get_seq_num_range(prev_packet, &seq_num_prev_start, &seq_num_prev_end);
1094 if (seq_num_prev_end > seq_num)
1100 "Received [" << packet->m_type_ostream_manip <<
"] with "
1101 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1102 "data size [" << data.size() <<
"]; "
1103 "supposed middle gap-filling packet data "
1104 "straddle the right boundary of packet "
1105 "[" << seq_num_prev_start <<
", " << seq_num_prev_end <<
").");
1112 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1113 "first unreceived packet [" << rcv_next_seq_num <<
"].");
1119 boost::shared_ptr<Data_packet> packet)
1128 Blob& data = packet->m_data;
1130 const size_t data_size = data.size();
1146 if ((sock->m_rcv_buf.data_size() + data_size)
1147 > sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size,
1148 &sock->m_opts.m_st_rcv_buf_max_size_slack_percent))
1156 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
1157 "Received [" << packet->m_type_ostream_manip <<
"] with "
1158 "sequence numbers [" << packet->m_seq_num <<
", " << (packet->m_seq_num + data_size) <<
"); "
1159 "data size [" << data_size <<
"]; "
1160 "dropping because Receive buffer full.");
1173 const size_t written =
1175 sock->m_rcv_buf.feed_buf_move(&data, std::numeric_limits<size_t>::max());
1177 assert(written == data_size);
1179 buf_size = sock->m_rcv_buf.data_size();
1190 receive_wnd_recovery_data_received(sock);
1218 if ((!syn_rcvd_qd_packet) &&
1222 event_set_all_check_delta(
true);
1230 boost::shared_ptr<const Data_packet> packet,
1232 bool* slide,
size_t* slide_size)
1234 using std::make_pair;
1250 const size_t max_packets_after_unrecvd_packet = sock_max_packets_after_unrecvd_packet(sock);
1257 const auto insert_result =
1259 rcv_packets_with_gaps.insert
1263 assert(!sock->rexmit_on());
1264 assert(insert_result.second);
1267 bool first_gap_exists;
1271 rcv_get_first_gap_info(sock, &first_gap_exists, &seq_num_after_first_gap);
1272 assert(first_gap_exists);
1281 if (rcv_packets_with_gaps.size() == max_packets_after_unrecvd_packet + 1)
1285 *slide_size = size_t(seq_num_after_first_gap - sock->m_rcv_next_seq_num);
1291 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
1292 "Received [" << packet->m_type_ostream_manip <<
"] with "
1293 "sequence numbers [" << packet->m_seq_num <<
", " << (packet->m_seq_num + data_size) <<
"); "
1294 "exceeded max gapped packet list size [" << max_packets_after_unrecvd_packet <<
"]; "
1295 "assuming Dropped; "
1296 "will fake receiving all [" << slide_size <<
"] sequence numbers in the first unreceived gap.");
1301 assert(rcv_packets_with_gaps.size() <= max_packets_after_unrecvd_packet);
1306 boost::shared_ptr<Data_packet> packet)
1308 using std::make_pair;
1317 auto& data = packet->m_data;
1319 const size_t data_size = data.size();
1324 size_t max_packets_after_unrecvd_packet = sock_max_packets_after_unrecvd_packet(sock);
1372 size_t max_packets_in_reassembly_q
1373 = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size,
1374 &sock->m_opts.m_st_rcv_buf_max_size_slack_percent);
1376 size_t rcv_buf_size;
1379 rcv_buf_size = sock->m_rcv_buf.data_size();
1384 max_packets_in_reassembly_q /= sock->max_block_size();
1388 max_packets_in_reassembly_q += rcv_packets_with_gaps.size();
1391 if (max_packets_in_reassembly_q < max_packets_after_unrecvd_packet)
1393 max_packets_after_unrecvd_packet = max_packets_in_reassembly_q;
1398 FLOW_LOG_TRACE(
"Unexpected Receive buffer limits: safety net [" << max_packets_after_unrecvd_packet <<
"] <= "
1399 "real limit [" << max_packets_in_reassembly_q <<
"], but the opposite is typical. "
1400 "See details just below.");
1403 if (rcv_packets_with_gaps.size() + 1 > max_packets_after_unrecvd_packet)
1417 "Received [" << packet->m_type_ostream_manip <<
"] with "
1418 "sequence numbers [" << packet->m_seq_num <<
", " << (packet->m_seq_num + data_size) <<
"); "
1419 "exceeded max gapped packet list size [" << max_packets_after_unrecvd_packet <<
"]; "
1420 "dropping packet.");
1425 FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock <<
"]. "
1426 "Enqueueing [" << packet->m_type_ostream_manip <<
"] payload onto reassembly queue with "
1427 "sequence numbers [" << packet->m_seq_num <<
", " << (packet->m_seq_num + data_size) <<
") "
1428 "of size [" << data_size <<
"]; "
1429 "successfully fit into max gapped packet list size [" << max_packets_after_unrecvd_packet <<
"]; "
1430 "could have fit [" << (max_packets_after_unrecvd_packet - rcv_packets_with_gaps.size()) <<
"] more.");
1434 const auto insert_result =
1436 rcv_packets_with_gaps.insert
1439 sock->m_rcv_reassembly_q_data_size += data_size;
1440 assert(insert_result.second);
1451 receive_wnd_recovery_data_received(sock);
1469 rcv_next_seq_num += slide_size;
1472 "[" << (rcv_next_seq_num - slide_size) <<
"] to "
1473 "[" << rcv_next_seq_num <<
"].");
1485 size_t total_written = 0;
1488 for (end_contig_it = start_contig_it;
1492 (end_contig_it != rcv_packets_with_gaps.end()) && (end_contig_it->first == rcv_next_seq_num);
1497 if (reassembly_in_progress)
1517 written = sock->m_rcv_buf.feed_buf_move(&rcvd_packet.
m_data, std::numeric_limits<size_t>::max());
1519 buf_size = sock->m_rcv_buf.data_size();
1521 total_written += written;
1527 assert(written != 0);
1530 advance_seq_num(&rcv_next_seq_num, rcvd_packet.
m_size);
1533 "[" << rcv_next_seq_num <<
"]; packet subsumed by this move.");
1537 rcv_packets_with_gaps.erase(start_contig_it, end_contig_it);
1538 sock->m_rcv_reassembly_q_data_size -= total_written;
1548 return uint64_t(sock->opt(sock->m_opts.m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent)) *
1549 uint64_t(sock->opt(sock->m_opts.m_st_rcv_buf_max_size)) /
1550 uint64_t(sock->max_block_size()) /
1558 *first_gap_exists = !sock->m_rcv_packets_with_gaps.empty();
1560 if (*first_gap_exists)
1562 *seq_num_after_first_gap = sock->m_rcv_packets_with_gaps.begin()->first;
1575 sock->m_rcv_stats.total_to_send_ack_packet(data_size);
1577 const size_t acks_pending_before_this = sock->m_rcv_pending_acks.size();
1579 static_assert(std::is_aggregate_v<Peer_socket::Individual_ack>,
1580 "We want it to be direct-initializable.");
1581 static_assert((!std::is_copy_constructible_v<Peer_socket::Individual_ack>)
1582 && (!std::is_copy_assignable_v<Peer_socket::Individual_ack>),
1583 "We want it to be noncopyable but rather passed-around via its ::Ptr.");
1589 sock->m_rcv_pending_acks.push_back
1618 if (m_socks_with_accumulated_pending_acks.insert(sock).second)
1624 sock->m_rcv_pending_acks_size_at_recv_handler_start = acks_pending_before_this;
1631 using boost::chrono::milliseconds;
1632 using boost::chrono::microseconds;
1633 using boost::chrono::duration_cast;
1634 using boost::chrono::round;
1642 vector<Peer_socket::Individual_ack::Ptr>& pending_acks = sock->m_rcv_pending_acks;
1647 FLOW_LOG_TRACE(
"Was about to perform accumulated acknowledgment tasks on [" << sock <<
"] but skipping because "
1648 "state is now [" << sock->m_int_state <<
"].");
1653 assert(!pending_acks.empty());
1698 const Fine_duration delayed_ack_timer_period = sock->opt(sock->m_opts.m_st_delayed_ack_timer_period);
1700 bool force_ack = delayed_ack_timer_period == Fine_duration::zero();
1705 (
"Delayed [ACK] feature disabled on [" << sock <<
"]; forcing immediate [ACK]. "
1706 "Receive window state: [" << sock->m_rcv_init_seq_num <<
", " << sock->m_rcv_next_seq_num <<
") "
1707 "| " << sock->m_rcv_packets_with_gaps.size() <<
":{...}.");
1709 else if (!sock->m_rcv_packets_with_gaps.empty())
1721 for (
size_t ack_idx = sock->m_rcv_pending_acks_size_at_recv_handler_start;
1722 ack_idx != pending_acks.size(); ++ack_idx)
1724 ack = pending_acks[ack_idx];
1725 if (ack->m_seq_num > sock->m_rcv_next_seq_num)
1735 (
"On [" << sock <<
"] "
1736 "received out-of-order packet [" << ack->m_seq_num <<
", size " << ack->m_data_size <<
", "
1737 "rexmit " << ack->m_rexmit_id <<
"]; "
1738 "forcing immediate [ACK]. "
1739 "Receive window state: [" << sock->m_rcv_init_seq_num <<
", " << sock->m_rcv_next_seq_num <<
") "
1740 "| " << sock->m_rcv_packets_with_gaps.size() <<
":{...}.");
1748 = sock->opt(sock->m_opts.m_st_max_full_blocks_before_ack_send) * sock->max_block_size();
1752 bytes += ack->m_data_size;
1763 "accumulated at least [" << limit <<
"] bytes to acknowledge; "
1764 "forcing immediate [ACK].");
1790 if (sock->m_rcv_pending_acks_size_at_recv_handler_start != 0)
1793 "canceling delayed [ACK] timer due to forcing "
1794 "immediate [ACK]; would have fired "
1795 "in [" << round<milliseconds>(sock->m_rcv_delayed_ack_timer.expiry() - Fine_clock::now()) <<
"] "
1798 if (sock->m_rcv_delayed_ack_timer.cancel() == 0)
1804 "tried to cancel delayed [ACK] timer while "
1805 "forcing [ACK], but it was already just about to fire.");
1813 async_low_lvl_ack_send(sock);
1814 assert(pending_acks.empty());
1836 if (sock->m_rcv_pending_acks_size_at_recv_handler_start == 0)
1840 sock->m_rcv_delayed_ack_timer.expires_after(delayed_ack_timer_period);
1843 "scheduled delayed [ACK] timer to fire "
1844 "in [" << round<milliseconds>(delayed_ack_timer_period) <<
"].");
1847 sock->m_rcv_delayed_ack_timer.async_wait([
this, socket_id, sock](
const Error_code& sys_err_code)
1849 async_low_lvl_ack_send(sock, sys_err_code);
1857 sock->m_rcv_stats.current_pending_to_ack_packets(pending_acks.size());
1864 using boost::algorithm::join;
1879 (
"Receive window state for [" << sock <<
"]: "
1880 "[" << sock->m_rcv_init_seq_num <<
", " << sock->m_rcv_next_seq_num <<
") "
1881 "| " << rcv_packets_with_gaps.size() <<
":{...}.");
1892 vector<string> pkt_strs;
1893 pkt_strs.reserve(rcv_packets_with_gaps.size());
1895 const size_t MAX_TO_SHOW = 100;
1896 bool skipped_some =
false;
1900 pkt_it != rcv_packets_with_gaps.end();
1903 const bool last_iteration = (count == rcv_packets_with_gaps.size() - 1);
1905 if ((!skipped_some) && (count > MAX_TO_SHOW) && (!last_iteration))
1908 skipped_some =
true;
1919 if (!last_iteration)
1926 pkt_str =
"[...skipped...] ";
1931 get_seq_num_range(pkt_it, &start, &end);
1934 pkt_strs.push_back(pkt_str);
1941 "Receive window state for [" << sock <<
"]: "
1942 "[" << sock->m_rcv_init_seq_num <<
", " << sock->m_rcv_next_seq_num <<
") "
1943 "| " << rcv_packets_with_gaps.size() <<
":{" << join(pkt_strs,
" ") <<
"}.");
1947 boost::shared_ptr<const Ack_packet> ack)
2001 sock->m_snd_pending_rcv_wnd = ack->m_rcv_wnd;
2004 sock->m_rcv_acked_packets.insert(sock->m_rcv_acked_packets.end(),
2005 ack->m_rcv_acked_packets.begin(), ack->m_rcv_acked_packets.end());
2006 m_socks_with_accumulated_acks.insert(sock);
2008 FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock <<
"]. "
2009 "Received and accumulated [" << ack->m_type_ostream_manip <<
"] with "
2010 "[" << ack->m_rcv_acked_packets.size() <<
"] individual acknowledgments "
2011 "and rcv_wnd = [" << ack->m_rcv_wnd <<
"]; total for this socket in this "
2012 "receive handler is [" << sock->m_rcv_acked_packets.size() <<
"] individual acknowledgments.");
2014 sock->m_snd_stats.received_low_lvl_ack_packet(ack->m_rcv_acked_packets.empty());
2022 using boost::unordered_set;
2023 using boost::chrono::round;
2024 using boost::chrono::milliseconds;
2025 using boost::chrono::seconds;
2035 log_accumulated_acks(sock);
2039 using Acks = vector<Ack_packet::Individual_ack::Ptr>;
2040 Acks& acked_packets = sock->m_rcv_acked_packets;
2053 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
2054 "Accumulated [ACK] packets with [" << acked_packets.size() <<
"] "
2055 "individual acknowledgments, but state is now [" << sock->m_int_state <<
"]; ignoring ACKs forever.");
2199 const bool rexmit_on = sock->rexmit_on();
2200 auto& snd_stats = sock->m_snd_stats;
2201 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
2204 auto& pkts_marked_to_drop = sock->m_snd_temp_pkts_marked_to_drop;
2205 pkts_marked_to_drop.clear();
2208 const bool could_send_before_acks = can_send(sock);
2210 const bool had_rexmit_data_before_acks = !sock->m_snd_rexmit_q.empty();
2215 unordered_set<Peer_socket::order_num_t> flying_now_acked_pkts;
2218 size_t clean_acked_bytes = 0;
2219 size_t clean_acked_packets = 0;
2223 using Clean_acked_packet = tuple<Fine_duration, size_t, size_t>;
2224 vector<Clean_acked_packet> clean_acked_packet_events;
2225 clean_acked_packet_events.reserve(min(acked_packets.size(), snd_flying_pkts_by_when.size()));
2243 const bool error_ack = !categorize_individual_ack(socket_id, sock, ack, &dupe_or_late, &flying_pkt_it);
2256 if (flying_pkt_it != snd_flying_pkts_by_when.past_oldest())
2259 flying_pkt = flying_pkt_it->second;
2260 round_trip_time = compute_rtt_on_ack(flying_pkt, time_now, ack, &sent_when);
2269 assert(!dupe_or_late);
2271 assert(flying_pkt_it != snd_flying_pkts_by_when.past_oldest());
2275 new_round_trip_time_sample(sock, round_trip_time);
2285 const size_t bytes_acked = flying_pkt->m_size;
2287 clean_acked_packet_events.emplace_back(round_trip_time, bytes_acked, cwnd_bytes);
2290 snd_flying_pkts_erase_one(sock, flying_pkt_it);
2293 clean_acked_bytes += bytes_acked;
2294 ++clean_acked_packets;
2319 flying_now_acked_pkts.insert(sent_when->
m_order_num);
2328 = categorize_pkts_as_dropped_on_acks(sock, flying_now_acked_pkts);
2334 size_t dropped_pkts;
2335 size_t dropped_bytes;
2336 size_t cong_ctl_dropped_bytes;
2337 size_t cong_ctl_dropped_pkts;
2338 if (!drop_pkts_on_acks(sock, last_dropped_pkt_it,
2339 &cong_ctl_dropped_pkts, &cong_ctl_dropped_bytes,
2340 &dropped_pkts, &dropped_bytes, &pkts_marked_to_drop))
2361 if (dropped_pkts != 0)
2364 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
2365 "Considering Dropped: [" << dropped_bytes <<
"] bytes = [" << dropped_pkts <<
"] packets.");
2367 if (cong_ctl_dropped_pkts != 0)
2370 assert(cong_ctl_dropped_bytes != 0);
2372 FLOW_LOG_INFO(
"cong_ctl [" << sock <<
"] update: loss event: "
2373 "Dropped [" << cong_ctl_dropped_bytes <<
"] bytes "
2374 "= [" << cong_ctl_dropped_pkts <<
"] packets.");
2376 sock->m_snd_cong_ctl->on_loss_event(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
2377 sock->m_snd_last_loss_event_when = Fine_clock::now();
2384 assert(dropped_pkts == 0);
2385 assert(cong_ctl_dropped_pkts == 0);
2388 if (clean_acked_packets != 0)
2390 assert(clean_acked_bytes != 0);
2391 assert(!clean_acked_packet_events.empty());
2394 for (
const auto& [rtt, bytes, cwnd_bytes] : clean_acked_packet_events)
2396 FLOW_LOG_TRACE(
"cong_ctl [" << sock <<
"] update: clean individual acknowledgment: "
2397 "[" << sock->bytes_blocks_str(bytes) <<
"] with RTT [" << round<milliseconds>(rtt) <<
2398 "] and sent_cwnd_bytes [" << cwnd_bytes <<
"].");
2400 sock->m_snd_cong_ctl->on_individual_ack(rtt, bytes, cwnd_bytes);
2403 FLOW_LOG_TRACE(
"cong_ctl/bw_est [" << sock <<
"] update: clean acknowledgments: "
2404 "[" << sock->bytes_blocks_str(clean_acked_bytes) <<
"] = "
2405 "[" << clean_acked_packets <<
"] packets.");
2408 sock->m_snd_bandwidth_estimator->on_acks(clean_acked_bytes);
2409 sock->m_snd_cong_ctl->on_acks(clean_acked_bytes, clean_acked_packets);
2417 if (dropped_pkts != 0)
2420 snd_stats.dropped_data(dropped_bytes, dropped_pkts);
2422 const seconds MIN_TIME_BETWEEN_LOGS(1);
2423 const Fine_duration since_last_loss_sock_log = Fine_clock::now() - m_last_loss_sock_log_when;
2425 if (since_last_loss_sock_log > MIN_TIME_BETWEEN_LOGS)
2427 FLOW_LOG_INFO(
"Will log socket state on loss, because last such loss-driven logging was "
2428 "[" << round<milliseconds>(since_last_loss_sock_log) <<
" >"
2429 " " << MIN_TIME_BETWEEN_LOGS <<
"] ago.");
2430 sock_log_detail(sock);
2431 m_last_loss_sock_log_when = Fine_clock::now();
2435 FLOW_LOG_INFO(
"Will NOT log socket state on loss, because last such loss-driven logging was "
2436 "[" << round<milliseconds>(since_last_loss_sock_log) <<
" <="
2437 " " << MIN_TIME_BETWEEN_LOGS <<
"] ago.");
2442 log_snd_window(sock);
2452 drop_timer->start_contemporaneous_events();
2454 for (
const auto pkt_order_num : flying_now_acked_pkts)
2456 drop_timer->on_ack(pkt_order_num);
2457 drop_timer->on_packet_no_longer_in_flight(pkt_order_num);
2459 for (
const auto pkt_order_num : pkts_marked_to_drop)
2461 drop_timer->on_packet_no_longer_in_flight(pkt_order_num);
2464 drop_timer->end_contemporaneous_events();
2468 if (sock->m_snd_pending_rcv_wnd != sock->m_snd_remote_rcv_wnd)
2471 "rcv_wnd change [" << sock->m_snd_remote_rcv_wnd <<
"] => [" << sock->m_snd_pending_rcv_wnd <<
"].");
2472 sock->m_snd_remote_rcv_wnd = sock->m_snd_pending_rcv_wnd;
2483 sock->m_snd_stats.updated_rcv_wnd(sock->m_snd_remote_rcv_wnd < sock->max_block_size());
2492 if ((!could_send_before_acks) || (
rexmit_on && (!had_rexmit_data_before_acks)))
2494 send_worker(sock,
true);
2505 assert(dupe_or_late);
2506 assert(acked_pkt_it);
2550 const bool rexmit_on = sock->rexmit_on();
2551 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
2552 auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
2553 auto& snd_stats = sock->m_snd_stats;
2558 const unsigned int rexmit_id = ack->m_rexmit_id;
2562 snd_stats.received_ack();
2578 "acknowledgment [" << seq_num <<
", ...) is outside (ISN, snd_next) "
2579 "range (" << sock->m_snd_init_seq_num <<
", " << sock->m_snd_next_seq_num <<
").");
2582 snd_stats.error_ack();
2588 rst_and_close_connection_immediately(socket_id, sock,
2598 *acked_pkt_it = snd_flying_pkts_by_when.find(seq_num);
2599 if (*acked_pkt_it == snd_flying_pkts_by_when.past_oldest())
2666 if (pkt_it != snd_flying_pkts_by_seq.begin())
2672 get_seq_num_range(pkt_it->second, &l1, &l2);
2674 assert(l1 < seq_num);
2680 snd_stats.error_ack();
2684 "acknowledgment [" << seq_num <<
", ...) is at least partially inside "
2685 "packet [" << l1 <<
", " << l2 <<
").");
2721 snd_stats.late_or_dupe_ack();
2723 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
2724 "Acknowledged packet [" << seq_num <<
", ...) is duplicate or late (or invalid). "
2725 "RTT unknown. Ignoring.");
2728 *dupe_or_late =
true;
2729 assert(*acked_pkt_it == snd_flying_pkts_by_when.past_oldest());
2733 assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest());
2738 const unsigned int acked_rexmit_id =
rexmit_on ? acked_pkt.
m_packet->m_rexmit_id : 0;
2740 get_seq_num_range(*acked_pkt_it, 0, &seq_num_end);
2744 if (rexmit_id > acked_rexmit_id)
2748 "Acknowledged packet [" << seq_num <<
", " << seq_num_end <<
") "
2749 "rexmit_id [" <<
int(rexmit_id) <<
"] "
2750 "exceeds highest sent rexmit_id [" <<
int(acked_rexmit_id) <<
"].");
2759 if (rexmit_id != acked_rexmit_id)
2761 assert(rexmit_id < acked_rexmit_id);
2799 snd_stats.late_or_dupe_ack();
2801 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
2802 "Acknowledged packet [" << seq_num <<
", " << seq_num_end <<
") "
2803 "order_num [" << acked_pkt.
m_sent_when[rexmit_id].m_order_num <<
"] "
2804 "rexmit_id [" <<
int(rexmit_id) <<
"] "
2805 "is less than highest sent [" <<
int(acked_rexmit_id) <<
"]. Ignoring.");
2808 *dupe_or_late =
true;
2809 assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest());
2813 assert(rexmit_id == acked_rexmit_id);
2818 snd_stats.good_ack(acked_pkt.
m_size);
2821 *dupe_or_late =
false;
2822 assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest());
2831 using boost::chrono::milliseconds;
2832 using boost::chrono::round;
2852 const unsigned int rexmit_id = ack->m_rexmit_id;
2854 *sent_when = &(flying_pkt->m_sent_when[rexmit_id]);
2863 const auto& ack_delay = ack->m_delay;
2864 round_trip_time = time_now - (*sent_when)->m_sent_time - ack_delay;
2866 if (round_trip_time.count() < 0)
2875 FLOW_LOG_TRACE(
"Acknowledged packet [" << ack->m_seq_num <<
", ...) "
2876 "order_num [" << order_num <<
"] has negative "
2877 "RTT [" << round_trip_time <<
"]; assuming zero. "
2878 "Sent at [" << (*sent_when)->m_sent_time <<
"]; "
2879 "received at [" << time_now <<
"]; "
2880 "receiver-reported ACK delay [" << ack_delay <<
"].");
2881 round_trip_time = Fine_duration::zero();
2883 FLOW_LOG_TRACE(
"Acknowledged packet [" << ack->m_seq_num <<
", ...) "
2884 "order_num [" << order_num <<
"] "
2885 "has RTT [" << round<milliseconds>(round_trip_time) <<
"] "
2886 "(ACK delay [" << round<milliseconds>(ack_delay) <<
"]).");
2888 return round_trip_time;
2893 const boost::unordered_set<Peer_socket::order_num_t>& flying_now_acked_pkts)
2895 using std::priority_queue;
2911 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
2982 priority_queue<Peer_socket::order_num_t>
2983 high_ack_count_q(flying_now_acked_pkts.begin(), flying_now_acked_pkts.end());
2987 ack_count_t ack_increment_after_me = 0;
2991 for (last_dropped_pkt_it = snd_flying_pkts_by_when.newest();
2992 last_dropped_pkt_it != snd_flying_pkts_by_when.past_oldest();
2993 ++last_dropped_pkt_it)
3009 while ((!high_ack_count_q.empty()) &&
3011 (high_ack_count_q.top() > cur_pkt_sent_when.
m_order_num))
3014 ++ack_increment_after_me;
3017 high_ack_count_q.pop();
3024 if (cur_sent_pkt.
m_acks_after_me > S_MAX_LATER_ACKS_BEFORE_CONSIDERING_DROPPED)
3038 get_seq_num_range(last_dropped_pkt_it, &cur_pkt_seq_num, &cur_pkt_seq_num_end);
3041 (
"Unacknowledged packet [" << cur_pkt_seq_num <<
", " << cur_pkt_seq_num_end <<
") "
3042 "order_num [" << cur_pkt_sent_when.
m_order_num <<
"] has "
3044 "for later packets; considering it and "
3045 "all unacknowledged packets sent earlier as Dropped.");
3055 return last_dropped_pkt_it;
3060 size_t* cong_ctl_dropped_pkts,
size_t* cong_ctl_dropped_bytes,
3061 size_t* dropped_pkts,
size_t* dropped_bytes,
3062 std::vector<Peer_socket::order_num_t>* pkts_marked_to_drop)
3082 const bool rexmit_on = sock->rexmit_on();
3083 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
3084 auto& snd_stats = sock->m_snd_stats;
3115 *dropped_pkts = snd_flying_pkts_by_when.size();
3116 *dropped_bytes = sock->m_snd_flying_bytes;
3118 *cong_ctl_dropped_bytes = 0;
3119 *cong_ctl_dropped_pkts = 0;
3120 bool loss_event_finished =
false;
3135 auto& snd_rexmit_q = sock->m_snd_rexmit_q;
3136 decltype(sock->m_snd_rexmit_q)::iterator snd_rexmit_q_fulcrum_it = snd_rexmit_q.end();
3139 assert(pkts_marked_to_drop->empty());
3141 auto pkt_it = last_dropped_pkt_it;
3142 while (pkt_it != snd_flying_pkts_by_when.past_oldest())
3145 auto next_pkt_it = boost::next(pkt_it);
3153 if (!loss_event_finished)
3158 && (sent_when.
m_sent_time < sock->m_snd_last_loss_event_when))
3162 loss_event_finished =
true;
3167 *cong_ctl_dropped_bytes += sent_pkt->m_size;
3168 ++(*cong_ctl_dropped_pkts);
3177 if (!ok_to_rexmit_or_close(sock, pkt_it,
true))
3191 snd_rexmit_q_fulcrum_it = snd_rexmit_q.insert(snd_rexmit_q_fulcrum_it, sent_pkt);
3192 ++sock->m_snd_rexmit_q_size;
3202 "Scoreboard must not get otherwise changed when a packet is erased.");
3203 pkts_marked_to_drop->push_back(sent_when.
m_order_num);
3204 snd_flying_pkts_erase_one(sock, pkt_it);
3206 pkt_it = next_pkt_it;
3210 *dropped_pkts -= snd_flying_pkts_by_when.size();
3211 *dropped_bytes -= sock->m_snd_flying_bytes;
3213 if (*cong_ctl_dropped_pkts != 0)
3216 snd_stats.loss_event();
3224 using boost::algorithm::join;
3225 using boost::chrono::symbol_format;
3228 using std::transform;
3237 using Acks = vector<Ack::Ptr>;
3238 const Acks& acked_packets = sock->m_rcv_acked_packets;
3244 vector<string> ack_strs(acked_packets.size());
3245 transform(acked_packets.begin(), acked_packets.end(), ack_strs.begin(),
3246 [](Ack::Const_ptr ack) ->
string
3248 return util::ostream_op_string(
'[', ack->m_seq_num,
", ", int(ack->m_rexmit_id),
", ",
3252 const string ack_str = join(ack_strs,
" ");
3255 "Accumulated [ACK] packets with "
3256 "acknowledgments [seq_num, rexmit_id, delay]: "
3257 "[" << ack_str <<
"].");
3261 FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock <<
"]. "
3262 "Accumulated [ACK] packets with "
3263 "[" << acked_packets.size() <<
"] individual acknowledgments.");
3268 log_snd_window(sock);
3284 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
3285 auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
3288 assert(!snd_flying_pkts_by_when.empty());
3295 log_snd_window(sock);
3297 const bool rexmit_on = sock->rexmit_on();
3299 const bool could_send_before_drops = can_send(sock);
3301 const bool had_rexmit_data_before_drops = !sock->m_snd_rexmit_q.empty();
3306 size_t cong_ctl_dropped_bytes = 0;
3307 size_t cong_ctl_dropped_pkts = 0;
3309 if (drop_all_packets)
3311 cong_ctl_dropped_bytes = sock->m_snd_flying_bytes;
3312 cong_ctl_dropped_pkts = snd_flying_pkts_by_when.size();
3319 pkt_it != snd_flying_pkts_by_when.past_newest();
3323 if (!ok_to_rexmit_or_close(sock, prior(pkt_it.base()),
false))
3334 sock->m_snd_rexmit_q.push_back(pkt_it->second);
3336 sock->m_snd_rexmit_q_size += cong_ctl_dropped_pkts;
3343 snd_flying_pkts_updated(sock, snd_flying_pkts_by_when.newest(), snd_flying_pkts_by_when.past_oldest(),
false);
3344 snd_flying_pkts_by_when.clear();
3345 snd_flying_pkts_by_seq.clear();
3347 packet_marked_to_drop_or_drop_all = 0;
3355 cong_ctl_dropped_bytes = oldest_pkt->m_size;
3356 cong_ctl_dropped_pkts = 1;
3361 if (!ok_to_rexmit_or_close(sock, oldest_pkt_it,
false))
3368 sock->m_snd_rexmit_q.push_back(oldest_pkt);
3369 ++sock->m_snd_rexmit_q_size;
3374 packet_marked_to_drop_or_drop_all = oldest_pkt->m_sent_when.back().m_order_num;
3377 snd_flying_pkts_erase_one(sock, oldest_pkt_it);
3402 FLOW_LOG_INFO(
"cong_ctl [" << sock <<
"] update: Drop Timeout event: "
3403 "Dropped [" << cong_ctl_dropped_bytes <<
"] bytes = [" << cong_ctl_dropped_pkts <<
"] packets.");
3406 sock->m_snd_cong_ctl->on_drop_timeout(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
3407 sock->m_snd_last_loss_event_when = Fine_clock::now();
3410 sock->m_snd_stats.drop_timeout();
3411 sock->m_snd_stats.dropped_data(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
3414 log_snd_window(sock);
3419 drop_timer->start_contemporaneous_events();
3427 if (packet_marked_to_drop_or_drop_all == 0)
3430 drop_timer->on_no_packets_in_flight_any_longer();
3434 drop_timer->on_packet_no_longer_in_flight(packet_marked_to_drop_or_drop_all);
3439 drop_timer->end_contemporaneous_events();
3446 if ((!could_send_before_drops) || (
rexmit_on && (!had_rexmit_data_before_drops)))
3448 send_worker(sock,
false);
3458 using boost::ratio_subtract;
3459 using boost::ratio_string;
3460 using boost::chrono::round;
3461 using boost::chrono::milliseconds;
3462 using boost::chrono::microseconds;
3463 using boost::chrono::seconds;
3492 if (srtt == Fine_duration::zero())
3500 "srtt = [" << round<milliseconds>(srtt) <<
" = " << srtt <<
"]; "
3501 "rtt_var = [" << round<milliseconds>(rtt_var) <<
" = " << rtt_var <<
"]; "
3502 "rtt = [" << rtt <<
"].");
3520 using Alpha = ratio<1, 8>;
3521 using One_minus_alpha = ratio_subtract<ratio<1>, Alpha>;
3522 using Beta = ratio<1, 4>;
3523 using One_minus_beta = ratio_subtract<ratio<1>, Beta>;
3528 if (abs_srtt_minus_rtt.count() < 0)
3530 abs_srtt_minus_rtt = -abs_srtt_minus_rtt;
3535 = rtt_var * One_minus_beta::num / One_minus_beta::den
3536 + abs_srtt_minus_rtt * Beta::num / Beta::den;
3538 = srtt * One_minus_alpha::num / One_minus_alpha::den
3539 + rtt * Alpha::num / Alpha::den;
3543 "srtt = [" << round<milliseconds>(srtt) <<
" = " << srtt <<
"]; "
3544 "rtt_var = [" << round<milliseconds>(rtt_var) <<
" = " << rtt_var <<
"]; "
3545 "rtt = [" << rtt <<
"]; "
3546 "prev_srtt = [" << prev_srtt <<
"]; "
3547 "prev_rtt_var = [" << prev_rtt_var <<
"]; "
3548 "alpha = " << (ratio_string<Alpha, char>::prefix()) <<
"; "
3549 "(1 - alpha) = " << (ratio_string<One_minus_alpha, char>::prefix()) <<
"; "
3550 "beta = " << (ratio_string<Beta, char>::prefix()) <<
"; "
3551 "(1 - beta) = " << (ratio_string<One_minus_beta, char>::prefix()) <<
"; "
3552 "|srtt - rtt| = [" << abs_srtt_minus_rtt <<
"].");
3569 const Fine_duration clock_resolution_at_least = microseconds(500);
3571 const Fine_duration ceiling = sock->opt(sock->m_opts.m_dyn_drop_timeout_ceiling);
3572 const unsigned int k = 4;
3576 const Fine_duration srtt_plus_var_term = srtt + max(clock_resolution_at_least, rtt_var_k);
3577 dto = max(srtt_plus_var_term, floor);
3578 dto = min(dto, ceiling);
3582 "dto = [" << round<milliseconds>(dto) <<
" = " << dto <<
"]; "
3583 "rtt_var * k = [" << rtt_var_k <<
"]; "
3584 "srtt + max(G, rtt_var * k) = [" << srtt_plus_var_term <<
"]; "
3585 "k = [" << k <<
"]; "
3586 "floor = [" << floor <<
"]; ceiling = [" << ceiling <<
"]; "
3587 "clock_resolution = [" << clock_resolution_at_least <<
"]; "
3588 "prev_dto = [" << prev_dto <<
"].");
3596 using boost::algorithm::join;
3604 const auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
3605 const auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
3606 const size_t num_flying_pkts = snd_flying_pkts_by_seq.size();
3610 if (snd_flying_pkts_by_seq.empty())
3614 "Send window state for [" << sock <<
"]: cong_wnd "
3615 "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) <<
"]; "
3616 "sent+acked/dropped "
3617 "[" << sock->m_snd_init_seq_num <<
", " << sock->m_snd_next_seq_num <<
") "
3618 "unsent [" << sock->m_snd_next_seq_num <<
", ...).");
3629 (
"Send window state for [" << sock <<
"]: cong_wnd "
3630 "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) <<
"]; "
3631 "sent+acked/dropped [" << sock->m_snd_init_seq_num <<
", " << snd_flying_pkts_by_seq.begin()->first <<
") "
3632 "in-flight [" << sock->m_snd_flying_bytes <<
"] bytes: " << num_flying_pkts <<
":{...} "
3633 "unsent [" << sock->m_snd_next_seq_num <<
", ...).");
3640 const bool rexmit_on = sock->rexmit_on();
3642 vector<string> pkt_strs;
3643 pkt_strs.reserve(num_flying_pkts);
3645 pkt_it_it != snd_flying_pkts_by_seq.end();
3649 get_seq_num_range(pkt_it_it->second, &start, &end);
3653 String_ostream pkt_str_os;
3654 pkt_str_os.os() <<
'[' << start;
3657 pkt_str_os.os() <<
'[' << int(sent_pkt->m_packet->m_rexmit_id) <<
'/' << sent_pkt->m_sent_when.back().m_order_num
3662 pkt_str_os.os() <<
", ";
3664 pkt_str_os.os() << end <<
")<" << sent_pkt->m_acks_after_me <<
"acks" << flush;
3666 pkt_strs.push_back(pkt_str_os.str());
3671 "Send window state for [" << sock <<
"]: cong_wnd "
3672 "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) <<
"]; "
3673 "sent+acked/dropped [" << sock->m_snd_init_seq_num <<
", " << snd_flying_pkts_by_seq.begin()->first <<
") "
3675 "[" << sock->m_snd_flying_bytes <<
"] bytes: " << num_flying_pkts <<
":{" << join(pkt_strs,
" ") <<
3676 "} unsent [" << sock->m_snd_next_seq_num <<
", ...).");
3686 vector<string> pkt_strs_time;
3687 pkt_strs_time.reserve(num_flying_pkts);
3690 pkt_it != snd_flying_pkts_by_when.const_past_newest();
3695 get_seq_num_range(prior(pkt_it.base()), &start, &end);
3701 start,
'[',
int(sent_pkt->m_packet->m_rexmit_id),
'/',
3702 sent_pkt->m_sent_when.back().m_order_num,
"], ", end,
")<",
3703 sent_pkt->m_acks_after_me,
"acks");
3704 pkt_strs_time.push_back(pkt_str);
3708 if (pkt_strs_time != pkt_strs)
3712 "Sorted by time sent: {" << join(pkt_strs_time,
" ") <<
"}.");
3721 if (flying_packets.empty())
3728 const Peer_socket::Sent_pkt_by_seq_num_map::value_type& highest_val = *(prior(flying_packets.end()));
3732 advance_seq_num(&seq_num, highest_val.second->second->m_size);
3747 get_seq_num_range(pkt_it, &seq_num, &seq_num_end);
3749 if (sock->rexmit_on())
3752 (
"On [" << sock <<
"] erasing packet [" << seq_num <<
", " << seq_num_end <<
") "
3753 "order_num [" << order_num <<
"] rexmit_id [" <<
int(sent_pkt.
m_packet->m_rexmit_id) <<
"] from "
3754 "snd_flying_pkts* and friends.");
3759 (
"On [" << sock <<
"] erasing packet [" << seq_num <<
", " << seq_num_end <<
") "
3760 "order_num [" << order_num <<
"] from snd_flying_pkts* and friends.");
3765 snd_flying_pkts_updated(sock, pkt_it, boost::next(pkt_it),
false);
3768 sock->m_snd_flying_pkts_by_seq_num.erase(pkt_it->first);
3769 sock->m_snd_flying_pkts_by_sent_when.erase(pkt_it);
3779 using std::make_pair;
3783 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
3786 const auto insert_result =
3788 snd_flying_pkts_by_when.insert(make_pair(seq_num, sent_pkt));
3792 assert(insert_result.second);
3793 assert(insert_result.first == pkt_it);
3795 snd_flying_pkts_updated(sock, pkt_it, boost::next(pkt_it),
true);
3799 const auto insert_result_by_seq =
3801 sock->m_snd_flying_pkts_by_seq_num.insert(make_pair(seq_num, pkt_it));
3804 assert(insert_result_by_seq.second);
3825 get_seq_num_range(pkt_it, 0, &seq_num_end);
3826 if (sock->rexmit_on())
3829 (
"On [" << sock <<
"] pushing packet [" << seq_num <<
", " << seq_num_end <<
") "
3830 "rexmit_id [" <<
int(sent_pkt->m_packet->m_rexmit_id) <<
"] onto snd_flying_pkts and friends.");
3835 (
"On [" << sock <<
"] pushing packet [" << seq_num <<
", " << seq_num_end <<
") "
3836 "onto snd_flying_pkts and friends.");
3847 if (pkt_begin == pkt_end)
3853 const auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
3854 size_t& snd_flying_bytes = sock->m_snd_flying_bytes;
3858 && (pkt_begin == snd_flying_pkts_by_when.const_newest())
3859 && (pkt_end == snd_flying_pkts_by_when.const_past_oldest()))
3861 snd_flying_bytes = 0;
3865 size_t delta_bytes = 0;
3866 for ( ; pkt_begin != pkt_end; ++pkt_begin)
3868 delta_bytes += pkt_begin->second->m_size;
3870 added ? (snd_flying_bytes += delta_bytes) : (snd_flying_bytes -= delta_bytes);
3874 "In-flight [" << sock->bytes_blocks_str(snd_flying_bytes) <<
"].");
3879 bool defer_delta_check)
3884 get_seq_num_range(pkt_it, &seq_num, &seq_num_end);
3886 const unsigned int rexmit_id = pkt.
m_packet->m_rexmit_id;
3887 FLOW_LOG_TRACE(
"On [" << sock <<
"] attempting to queue for retransmission "
3888 "[" << seq_num <<
", " << seq_num_end <<
"] which has been "
3889 "retransmitted [" << rexmit_id <<
"] times so far.");
3890 if (rexmit_id == sock->opt(sock->m_opts.m_st_max_rexmissions_per_packet))
3892 rst_and_close_connection_immediately(socket_id(sock), sock,
3903 return connect_with_metadata(to, boost::asio::buffer(&S_DEFAULT_CONN_METADATA,
sizeof(S_DEFAULT_CONN_METADATA)),
3908 const boost::asio::const_buffer& serialized_metadata,
3943 [&]() { connect_worker(to, serialized_metadata, sock_opts, &sock); });
3947 if (sock->m_disconnect_cause)
3949 *err_code = sock->m_disconnect_cause;
3961 using boost::asio::buffer;
3962 using boost::asio::ip::address;
3970 auto& sock = *sock_ptr;
3977 const bool opts_ok = sock_validate_options(*sock_opts, 0, &err_code);
3980 sock.reset(sock_create(*sock_opts));
3985 sock->m_disconnect_cause = err_code;
4002 sock_non_ptr = sock_create(
m_opts.m_dyn_sock_opts);
4004 sock.reset(sock_non_ptr);
4009 sock->m_active_connect =
true;
4010 sock->m_node =
this;
4012 sock->m_remote_endpoint = to;
4014 sock->m_serialized_metadata.assign_copy(serialized_metadata);
4023 sock->m_snd_cong_ctl.reset
4032 bool ip_addr_any_error =
false;
4036 if (addr.to_v4() == util::Ip_address_v4::any())
4038 ip_addr_any_error =
true;
4041 else if (addr.is_v6())
4043 if (addr.to_v6() == util::Ip_address_v6::any())
4045 ip_addr_any_error =
true;
4049 if (ip_addr_any_error)
4052 Error_code* err_code = &sock->m_disconnect_cause;
4060 sock->m_local_port = m_ports.reserve_ephemeral_port(&sock->m_disconnect_cause);
4069 FLOW_LOG_INFO(
"NetFlow worker thread starting active-connect of [" << sock <<
"].");
4078 FLOW_LOG_WARNING(
"Cannot add [" << sock <<
"], because such a connection already exists. "
4079 "This is an ephemeral port collision and "
4080 "constitutes either a bug or an extremely unlikely condition.");
4083 Error_code* err_code = &sock->m_disconnect_cause;
4088 m_ports.return_port(sock->m_local_port, &return_err_code);
4089 assert(!return_err_code);
4100 setup_connection_timers(socket_id, sock,
true);
4105 init_seq_num = m_seq_num_generator.generate_init_seq_num();
4109 init_seq_num.
set_metadata(
'L', init_seq_num + 1, sock->max_block_size());
4111 sock->m_snd_next_seq_num = init_seq_num + 1;
4114 auto syn = create_syn(sock);
4123 m_socks[socket_id] = sock;
4132 return sync_connect_with_metadata(to, Fine_duration::max(),
4133 boost::asio::buffer(&S_DEFAULT_CONN_METADATA,
sizeof(S_DEFAULT_CONN_METADATA)),
4134 err_code, sock_opts);
4138 const boost::asio::const_buffer& serialized_metadata,
4141 return sync_connect_with_metadata(to, Fine_duration::max(), serialized_metadata, err_code, opts);
4145 const boost::asio::const_buffer& serialized_metadata,
4149 to, max_wait, serialized_metadata, _1, sock_opts);
4177 event_set->close(&dummy_prevents_throw);
4180 const auto sock = connect_with_metadata(to, serialized_metadata, err_code, sock_opts);
4194 &dummy_prevents_throw);
4198 result = event_set->sync_wait(max_wait, err_code);
4213 sock->close_abruptly(&dummy_prevents_throw);
4219 const bool ready = event_set->events_detected(err_code);
4242 *err_code = sock->m_disconnect_cause;
4252 sock->close_abruptly(&dummy_prevents_throw);
4261 using boost::chrono::microseconds;
4262 using boost::chrono::duration_cast;
4263 using boost::weak_ptr;
4267 Fine_duration rexmit_from_now = sock->opt(sock->m_opts.m_st_connect_retransmit_period);
4274 ++sock->m_init_rexmit_count;
4284 sock->m_init_rexmit_scheduled_task
4287 sock_observer = weak_ptr<Peer_socket>(sock)]
4290 auto sock = sock_observer.lock();
4293 handle_connection_rexmit_timer_event(socket_id, sock);
4301 sock->m_connection_timeout_scheduled_task
4303 sock->opt(sock->m_opts.m_st_connect_retransmit_timeout),
4304 true, &m_task_engine,
4306 sock_observer = weak_ptr<Peer_socket>(sock)]
4311 auto sock = sock_observer.lock();
4318 FLOW_LOG_INFO(
"Connection handshake timeout timer [" << sock <<
"] has been triggered; was on "
4319 "attempt [" << (sock->m_init_rexmit_count + 1) <<
"].");
4346 FLOW_LOG_INFO(
"Connection handshake retransmit timer [" << sock <<
"] triggered; was on "
4347 "attempt [" << (sock->m_init_rexmit_count + 1) <<
"].");
4358 if (sock->m_active_connect)
4397 sock->m_rcv_delayed_ack_timer.cancel();
4398 sock->m_snd_pacing_data.m_slice_timer.cancel();
4400 if (sock->m_init_rexmit_scheduled_task)
4405 if (sock->m_connection_timeout_scheduled_task)
4410 if (sock->m_rcv_in_rcv_wnd_recovery)
4413 sock->m_rcv_in_rcv_wnd_recovery =
false;
4416 if (sock->m_snd_drop_timer)
4419 sock->m_snd_drop_timer->done();
4423 sock->m_snd_drop_timer.reset();
4429 sock->m_snd_drop_timeout = sock->opt(sock->m_opts.m_st_init_drop_timeout);
4436 const auto on_timer = [
this,
socket_id, sock](
bool drop_all_packets)
4450 const Function<
size_t (
size_t max_data_size)>& snd_buf_feed_func,
4453 using boost::asio::post;
4473 if (sock->m_disconnect_cause)
4507 const size_t sent = snd_buf_feed_func(sock->max_block_size_multiple(sock->m_opts.m_st_snd_buf_max_size));
4510 sock->m_snd_stats.buffer_fed(sock->m_snd_buf.data_size());
4631 if ((!was_deqable) && (sent != 0))
4644 using boost::any_cast;
4692 switch (sock->m_int_state)
4707 "in state [" << sock->m_int_state <<
"] "
4708 "closed before asynchronous send_worker() could proceed.");
4714 "in state [" << sock->m_int_state <<
"] "
4715 "somehow had send() called on it.");
4723 using boost::asio::buffer;
4726 using boost::ratio_string;
4727 using boost::chrono::milliseconds;
4728 using boost::chrono::round;
4729 using boost::shared_ptr;
4771 using Idle_timeout_dto_factor = ratio<110, 100>;
4773 = sock->m_snd_drop_timeout * Idle_timeout_dto_factor::num / Idle_timeout_dto_factor::den;
4774 const Fine_duration since_last_send = Fine_clock::now() - sock->m_snd_last_data_sent_when;
4776 if ((sock->m_snd_last_data_sent_when !=
Fine_time_pt()) && (since_last_send > idle_timeout))
4779 FLOW_LOG_INFO(
"Idle timeout triggered for [" << sock <<
"]; "
4780 "last activity [" << round<milliseconds>(since_last_send) <<
"] ago "
4781 "exceeds idle timeout [" << round<milliseconds>(idle_timeout) <<
"] "
4782 "= " << (ratio_string<Idle_timeout_dto_factor, char>::prefix()) <<
" x "
4783 "[" << round<milliseconds>(sock->m_snd_drop_timeout) <<
"].");
4784 sock->m_snd_cong_ctl->on_idle_timeout();
4785 sock->m_snd_stats.idle_timeout();
4794 "Initial check: can_send() is false.");
4805 const bool rexmit_on = sock->rexmit_on();
4814 "Initial check: can_send() is true, but no data to send.");
4821 list<Peer_socket::Sent_packet::Ptr>& rexmit_q = sock->m_snd_rexmit_q;
4822 size_t& rexmit_q_size = sock->m_snd_rexmit_q_size;
4829 "Initial check: Will send from rexmit queue of size [" << rexmit_q_size <<
"] and/or "
4830 "Send buffer with total size [" << snd_buf.
data_size() <<
"].");
4837 shared_ptr<Data_packet> data;
4839 bool rexmit =
false;
4847 if (rexmit_q.empty())
4852 data = Low_lvl_packet::create_uninit_packet<Data_packet>(
get_logger());
4853 data->m_rexmit_id = 0;
4864 assert(!data->m_data.empty());
4867 data->m_seq_num = snd_next_seq_num;
4891 sent_pkt = rexmit_q.front();
4894 rexmit_q.pop_front();
4897 data = sent_pkt->m_packet;
4900 ++data->m_rexmit_id;
4903 sent_pkt->m_sent_when.push_back(sent_when);
4906 sent_pkt->m_acks_after_me = 0;
4930 sock->m_snd_stats.data_sent(data->m_data.size(), rexmit);
4936 "can_send() == [" <<
can_send(sock) <<
"]; "
4937 "snd_deqable() == [" <<
snd_deqable(sock) <<
"].");
4990 const size_t pipe_taken = sock->m_snd_flying_bytes;
4991 const size_t cong_wnd = sock->m_snd_cong_ctl->congestion_window_bytes();
4992 const size_t& rcv_wnd = sock->m_snd_remote_rcv_wnd;
4994 const size_t pipe_total = min(cong_wnd, rcv_wnd);
4997 = (pipe_taken < pipe_total) && ((pipe_total - pipe_taken) >= sock->max_block_size());
4999 FLOW_LOG_TRACE(
"cong_ctl [" << sock <<
"] info: can_send = [" << can <<
"]; "
5000 "pipe_taken = [" << sock->bytes_blocks_str(pipe_taken) <<
"]; "
5001 "cong_wnd = [" << sock->bytes_blocks_str(cong_wnd) <<
"]; "
5002 "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) <<
"].");
5008 const Function<
size_t ()>& rcv_buf_consume_func,
5011 using boost::asio::post;
5039 const bool no_bytes_available = sock->m_rcv_buf.empty();
5040 const size_t bytes_consumed = rcv_buf_consume_func();
5042 if (bytes_consumed != 0)
5050 "has successfully returned [" << bytes_consumed <<
"] bytes.");
5059 if (sock->m_rcv_buf.empty()
5075 return bytes_consumed;
5084 "has successfully returned no bytes because still not fully connected.");
5106 if (!no_bytes_available)
5110 "has data to return, but the provided buffer size is too small.");
5118 "returning no data because Receive buffer empty.");
5159 using boost::any_cast;
5249 FLOW_LOG_INFO(
'[' << sock <<
"] Receive buffer space freed, "
5250 "but state is now [" << sock->m_int_state <<
"]; ignoring.");
5255 if (sock->m_rcv_in_rcv_wnd_recovery)
5261 FLOW_LOG_TRACE(
'[' << sock <<
"] Receive buffer space freed, but "
5262 "we are already in rcv_wnd recovery mode. Nothing to do.");
5271 const size_t& last_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd;
5273 if (rcv_wnd <= last_rcv_wnd)
5277 FLOW_LOG_TRACE(
'[' << sock <<
"] Receive buffer space freed, but "
5278 "free space [" << sock->bytes_blocks_str(rcv_wnd) <<
"] <= prev "
5279 "free space [" << sock->bytes_blocks_str(last_rcv_wnd) <<
"]. Nothing to do.");
5284 const size_t diff = rcv_wnd - last_rcv_wnd;
5285 const unsigned int pct = sock->opt(sock->m_opts.m_st_rcv_buf_max_size_to_advertise_percent);
5286 const size_t max_rcv_buf_size = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size);
5287 const size_t min_inc = max_rcv_buf_size * pct / 100;
5293 "freed is [" << sock->bytes_blocks_str(diff) <<
"] since last advertisement; "
5294 "< threshold [" << pct <<
"%] x "
5295 "[" << sock->bytes_blocks_str(max_rcv_buf_size) <<
"] = "
5296 "[" << sock->bytes_blocks_str(min_inc) <<
"]. Not advertising rcv_wnd yet.");
5303 "freed is [" << sock->bytes_blocks_str(diff) <<
"] since last advertisement; "
5304 "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) <<
"]; "
5305 ">= threshold [" << pct <<
"%] x "
5306 "[" << sock->bytes_blocks_str(max_rcv_buf_size) <<
"] = "
5307 "[" << sock->bytes_blocks_str(min_inc) <<
"]. Sending unsolicited rcv_wnd-advertising ACK "
5308 "and entering rcv_wnd recovery.");
5311 sock->m_rcv_in_rcv_wnd_recovery =
true;
5313 sock->m_rcv_wnd_recovery_start_time = Fine_clock::now();
5316 sock->m_rcv_stats.rcv_wnd_recovery_event_start();
5323 using boost::chrono::milliseconds;
5324 using boost::chrono::round;
5325 using boost::weak_ptr;
5332 auto ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
5333 ack->m_rcv_wnd = rcv_wnd;
5335 sock->m_rcv_last_sent_rcv_wnd = rcv_wnd;
5340 sock->m_rcv_stats.sent_low_lvl_ack_packet(
true);
5344 const Fine_duration fire_when_from_now = sock->opt(sock->m_opts.m_dyn_rcv_wnd_recovery_timer_period);
5347 "[" << round<milliseconds>(fire_when_from_now) <<
"] from now.");
5356 sock->m_rcv_wnd_recovery_scheduled_task
5358 [
this, sock_observer = weak_ptr<Peer_socket>(sock)](
bool)
5362 auto sock = sock_observer.lock();
5369 const Fine_duration since_recovery_started = Fine_clock::now() - sock->m_rcv_wnd_recovery_start_time;
5370 if (since_recovery_started > sock->opt(sock->m_opts.m_dyn_rcv_wnd_recovery_max_period))
5375 FLOW_LOG_INFO(
'[' << sock <<
"]: still no new DATA arrived since last rcv_wnd advertisement; "
5376 "Time since entering recovery [" << round<milliseconds>(since_recovery_started) <<
"] expired. "
5377 "Ending rcv_wnd recovery.");
5378 sock->m_rcv_in_rcv_wnd_recovery =
false;
5381 sock->m_rcv_stats.rcv_wnd_recovery_event_finish(
false);
5393 FLOW_LOG_INFO(
'[' << sock <<
"]: still no new DATA arrived since last rcv_wnd advertisement; "
5394 "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) <<
"]; "
5395 "time since entering recovery [" << round<milliseconds>(since_recovery_started) <<
"]. "
5396 "Sending unsolicited rcv_wnd-advertising ACK and continuing rcv_wnd recovery.");
5404 using boost::chrono::milliseconds;
5405 using boost::chrono::round;
5412 if (!sock->m_rcv_in_rcv_wnd_recovery)
5420 FLOW_LOG_INFO(
'[' << sock <<
"]: Canceling rcv_wnd recovery; "
5421 "Time since entering recovery "
5422 "[" << round<milliseconds>(Fine_clock::now() - sock->m_rcv_wnd_recovery_start_time) <<
"].");
5424 sock->m_rcv_in_rcv_wnd_recovery =
false;
5426 const bool canceled =
5432 sock->m_rcv_stats.rcv_wnd_recovery_event_finish(
true);
5437 using std::numeric_limits;
5441 if (!sock->opt(sock->m_opts.m_st_rcv_flow_control_on))
5446 return numeric_limits<size_t>::max();
5451 size_t rcv_buf_size;
5454 rcv_buf_size = sock->m_rcv_buf.data_size();
5458 if (sock->rexmit_on())
5460 rcv_buf_size += sock->m_rcv_reassembly_q_data_size;
5463 const size_t max_rcv_buf_size = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size);
5465 return (max_rcv_buf_size > rcv_buf_size) ? (max_rcv_buf_size - rcv_buf_size) : 0;
5487 "was completely closed before asynchronous "
5488 "receive_emptied_rcv_buf_while_disconnecting() could proceed.");
5506 "is gracefully closing, and Receive buffer is empty, but graceful close itself not yet finished.");
5512 if (!sock->m_rcv_buf.empty())
5517 "is gracefully closing, but Receive buffer has data again.");
5524 "is gracefully closing, and Receive buffer is now empty. Ready to permanently close.");
5534 using boost::adopt_lock;
5578 *err_code = sock->m_disconnect_cause;
5601 const Error_code& err_code,
bool defer_delta_check)
5603 using boost::lexical_cast;
5614 FLOW_LOG_INFO(
"Closing and destroying [" << sock <<
"] abruptly.");
5619 FLOW_LOG_INFO(
"Closing and destroying [" << sock <<
"] after graceful close.");
5639 sock->m_info_on_close.m_disconnect_cause = err_code;
5664 const auto erased = 1 ==
5670 if (!sock->m_active_connect)
5678 Port_to_server_map::const_iterator port_to_server_it =
m_servs.find(sock->m_local_port);
5679 if (port_to_server_it !=
m_servs.end())
5694 if (sock->m_active_connect)
5698 assert(!return_err_code);
5713 if (inserted_rd || inserted_wr)
5721 const Error_code& err_code,
bool defer_delta_check)
5732 auto syn = Low_lvl_packet::create_uninit_packet<Syn_packet>(
get_logger());
5734 syn->m_init_seq_num = sock->m_snd_init_seq_num;
5738 syn->m_serialized_metadata =
static_cast<const Blob&
>(sock->m_serialized_metadata);
5745 auto syn_ack = Low_lvl_packet::create_uninit_packet<Syn_ack_packet>(
get_logger());
5747 syn_ack->m_init_seq_num = sock->m_snd_init_seq_num;
5749 syn_ack->m_packed.m_security_token = sock->m_security_token;
5751 syn_ack->m_packed.m_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd;
5757 boost::shared_ptr<const Syn_ack_packet>& syn_ack)
5760 auto syn_ack_ack = Low_lvl_packet::create_uninit_packet<Syn_ack_ack_packet>(
get_logger());
5763 syn_ack_ack->m_packed.m_security_token = syn_ack->m_packed.m_security_token;
5765 syn_ack_ack->m_packed.m_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd =
sock_rcv_wnd(sock);
5773 using boost::chrono::milliseconds;
5774 using boost::chrono::duration_cast;
5775 using std::make_pair;
5777 using std::numeric_limits;
5784 vector<Peer_socket::Individual_ack::Ptr>& pending_acks = sock->m_rcv_pending_acks;
5786 if (sys_err_code == boost::asio::error::operation_aborted)
5789 "pending acknowledgment count [" << pending_acks.size() <<
"].");
5794 FLOW_LOG_TRACE(
"Delayed [ACK] timer [" << sock <<
"] triggered, or ACK forced; "
5795 "pending acknowledgment count [" << pending_acks.size() <<
"].");
5808 FLOW_LOG_TRACE(
"Delayed [ACK] timer [" << sock <<
"] triggered, "
5809 "but socket already in inapplicable state [" << sock->m_int_state <<
"]. Ignoring.");
5814 if (pending_acks.empty())
5819 "but socket has no pending acknowledgments. This is likely an internal bug. Ignoring.");
5846 const size_t& rcv_wnd = sock->m_rcv_last_sent_rcv_wnd =
sock_rcv_wnd(sock);
5848 auto ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
5849 ack->m_rcv_wnd = rcv_wnd;
5854 if (sock->rexmit_on())
5870 sock->m_rcv_stats.sent_low_lvl_ack_packet(
false);
5873 ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
5874 ack->m_rcv_wnd = rcv_wnd;
5902 if (delay.count() < 0)
5907 "delay for packet [" << seq_num <<
", ...) is "
5908 "negative: [" << delay <<
"]; using zero.");
5909 delay = Fine_duration::zero();
5924 if (uint64_t(pkt_delay.count()) > uint64_t(MAX_DELAY_VALUE))
5929 "delay for packet [" << seq_num <<
", ...) is [" << pkt_delay <<
"]; overflow; "
5930 "using max value [" << MAX_DELAY_VALUE <<
"] units.");
5936 if (sock->rexmit_on())
5938 ack->m_rcv_acked_packets_rexmit_on_out.push_back
5940 ind_ack->m_rexmit_id,
5945 ack->m_rcv_acked_packets_rexmit_off_out.push_back
5949 size_est_so_far += size_est_inc;
5952 sock->m_rcv_stats.sent_individual_ack();
5956 if (size_est_so_far != 0)
5962 sock->m_rcv_stats.sent_low_lvl_ack_packet(
false);
5965 pending_acks.clear();
5968 sock->m_rcv_stats.current_pending_to_ack_packets(0);
5976 return Socket_id{ sock->remote_endpoint(), sock->local_port() };
5982 return !(sock->m_snd_rexmit_q.empty() && sock->m_snd_buf.empty());
5998 return sock->m_snd_buf.data_size() + sock->max_block_size()
5999 <= sock->opt(sock->m_opts.m_st_snd_buf_max_size);
6005 return !sock->m_rcv_buf.empty();
6013 sock->m_int_state <<
"] to [" << new_state <<
"].");
6014 sock->m_int_state = new_state;
6023 sock->m_state = state;
6026 sock->m_open_sub_state = open_sub_state;
6043 sock->m_disconnect_cause = disconnect_cause;
6065 assert(sock->m_disconnect_cause);
6075 sock->m_rcv_buf.clear();
6076 sock->m_snd_buf.clear();
6077 sock->m_rcv_packets_with_gaps.clear();
6078 sock->m_rcv_reassembly_q_data_size = 0;
6079 sock->m_snd_flying_pkts_by_sent_when.clear();
6080 sock->m_snd_flying_pkts_by_seq_num.clear();
6081 sock->m_snd_rexmit_q.clear();
6082 sock->m_serialized_metadata.make_zero();
6083 sock->m_rcv_syn_rcvd_data_q.clear();
6084 sock->m_rcv_pending_acks.clear();
6085 sock->m_rcv_acked_packets.clear();
6086 sock->m_snd_pacing_data.m_packet_q.clear();
6091 sock->m_snd_cong_ctl.reset();
6093 sock->m_snd_bandwidth_estimator.reset();
6126 sock->m_opts = opts;
6136#define VALIDATE_STATIC_OPTION(ARG_opt) \
6137 validate_static_option(opts.ARG_opt, prev_opts->ARG_opt, #ARG_opt, err_code)
6138#define VALIDATE_CHECK(ARG_check) \
6139 validate_option_check(ARG_check, #ARG_check, err_code)
6161 using boost::chrono::seconds;
6162 using std::numeric_limits;
6173 const bool static_ok
6174 = VALIDATE_STATIC_OPTION(m_st_max_block_size) &&
6175 VALIDATE_STATIC_OPTION(m_st_connect_retransmit_period) &&
6176 VALIDATE_STATIC_OPTION(m_st_connect_retransmit_timeout) &&
6177 VALIDATE_STATIC_OPTION(m_st_snd_buf_max_size) &&
6178 VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size) &&
6179 VALIDATE_STATIC_OPTION(m_st_rcv_flow_control_on) &&
6180 VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size_slack_percent) &&
6181 VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size_to_advertise_percent) &&
6182 VALIDATE_STATIC_OPTION(m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent) &&
6183 VALIDATE_STATIC_OPTION(m_st_delayed_ack_timer_period) &&
6184 VALIDATE_STATIC_OPTION(m_st_max_full_blocks_before_ack_send) &&
6185 VALIDATE_STATIC_OPTION(m_st_rexmit_on) &&
6186 VALIDATE_STATIC_OPTION(m_st_max_rexmissions_per_packet) &&
6187 VALIDATE_STATIC_OPTION(m_st_init_drop_timeout) &&
6188 VALIDATE_STATIC_OPTION(m_st_snd_pacing_enabled) &&
6189 VALIDATE_STATIC_OPTION(m_st_snd_bandwidth_est_sample_period_floor) &&
6190 VALIDATE_STATIC_OPTION(m_st_cong_ctl_strategy) &&
6191 VALIDATE_STATIC_OPTION(m_st_cong_ctl_init_cong_wnd_blocks) &&
6192 VALIDATE_STATIC_OPTION(m_st_cong_ctl_max_cong_wnd_blocks) &&
6193 VALIDATE_STATIC_OPTION(m_st_cong_ctl_cong_wnd_on_drop_timeout_blocks) &&
6194 VALIDATE_STATIC_OPTION(m_st_cong_ctl_classic_wnd_decay_percent) &&
6195 VALIDATE_STATIC_OPTION(m_st_drop_packet_exactly_after_drop_timeout) &&
6196 VALIDATE_STATIC_OPTION(m_st_drop_all_on_drop_timeout) &&
6197 VALIDATE_STATIC_OPTION(m_st_out_of_order_ack_restarts_drop_timer);
6208 const bool checks_ok
6239#undef VALIDATE_CHECK
6240#undef VALIDATE_STATIC_OPTION
6247 using boost::adopt_lock;
6289 using boost::lexical_cast;
6294 stats->
m_rcv = sock->m_rcv_stats.stats();
6295 stats->
m_snd = sock->m_snd_stats.stats();
6317 = sock->m_rcv_syn_rcvd_data_q.empty() ? 0 : sock->m_rcv_syn_rcvd_data_cumulative_size;
6333 = util::to_mbit_per_sec<Send_bandwidth_estimator::Time_unit>
6334 (sock->m_snd_bandwidth_estimator->bandwidth_bytes_per_time());
6351 FLOW_LOG_INFO(
"[=== Socket state for [" << sock <<
"]. ===\n" << stats);
6358 FLOW_LOG_INFO(
"=== Socket state for [" << sock <<
"]. ===]");
6380 *seq_num += data_size;
6383template<
typename Packet_map_iter>
6390 *seq_num_start = seq_num_start_cref;
6394 *seq_num_end = seq_num_start_cref;
6402 return ++sock->m_snd_last_order_num;
6408 return sock_create_forward_plus_ctor_args<Peer_socket>(opts);
6418 <<
"NetFlow_socket "
6420 "@" <<
static_cast<const void*
>(sock))
6421 : (os <<
"NetFlow_socket@null");
6429#define STATE_TO_CASE_STATEMENT(ARG_state) \
6430 case Peer_socket::Int_state::S_##ARG_state: \
6431 return os << #ARG_state
6440 STATE_TO_CASE_STATEMENT(CLOSED);
6441 STATE_TO_CASE_STATEMENT(SYN_SENT);
6442 STATE_TO_CASE_STATEMENT(SYN_RCVD);
6443 STATE_TO_CASE_STATEMENT(ESTABLISHED);
6446#undef STATE_TO_CASE_STATEMENT
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
static Congestion_control_strategy * create_strategy(Strategy_choice strategy_choice, log::Logger *logger_ptr, Peer_socket::Const_ptr sock)
Factory method that, given an enum identifying the desired strategy, allocates the appropriate Conges...
static Ptr create_drop_timer(log::Logger *logger_ptr, util::Task_engine *node_task_engine, Fine_duration *sock_drop_timeout, Peer_socket::Const_ptr &&sock, const Function< void(const Error_code &err_code)> &timer_failure, const Function< void(bool drop_all_packets)> &timer_fired)
Constructs Drop_timer and returns a ref-counted pointer wrapping it.
@ S_PEER_SOCKET_WRITABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
@ S_PEER_SOCKET_READABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
void snd_flying_pkts_updated(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_const_iter pkt_begin, const Peer_socket::Sent_pkt_ordered_by_when_const_iter &pkt_end, bool added)
Updates Peer_socket::m_snd_flying_bytes according to an operation (add packets, remove packets) calle...
bool categorize_individual_ack(const Socket_id &socket_id, Peer_socket::Ptr sock, Ack_packet::Individual_ack::Const_ptr ack, bool *dupe_or_late, Peer_socket::Sent_pkt_ordered_by_when_iter *acked_pkt_it)
Helper of perform_accumulated_on_recv_tasks() that categorizes the given accumulated individual ackno...
void handle_data_to_established(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet, bool syn_rcvd_qd_packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
bool sock_is_writable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->send() with at least some arguments would return either non...
Peer_socket_info sock_info(Peer_socket::Const_ptr sock)
Implementation of sock->info() for socket sock in all cases except when sock->state() == Peer_socket:...
void receive_wnd_updated(Peer_socket::Ptr sock)
Placed by receive() onto W if it has dequeued data from Receive buffer and given it to the user,...
void sock_track_new_data_after_gap_rexmit_off(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, size_t data_size, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
bool sock_data_to_reassembly_q_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
static bool ensure_sock_open(Socket_ptr sock, Error_code *err_code)
Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so,...
void send_worker(Peer_socket::Ptr sock, bool defer_delta_check)
Thread W implemention of send(): synchronously or asynchronously send the contents of sock->m_snd_buf...
void handle_accumulated_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any incoming acknowledgments and rcv_wnd u...
void async_rcv_wnd_recovery(Peer_socket::Ptr sock, size_t rcv_wnd)
receive_wnd_updated() helper that continues rcv_wnd recovery: that is, sends unsolicited ACK with a r...
void log_accumulated_acks(Peer_socket::Const_ptr sock) const
Helper of handle_accumulated_acks() that logs the about-to-be-handled accumulated individual acknowle...
void sock_free_memory(Peer_socket::Ptr sock)
Helper that clears all non-O(1)-space data structures stored inside sock.
void sock_load_info_struct(Peer_socket::Const_ptr sock, Peer_socket_info *stats) const
Given a Peer_socket, copies all stats info (as available via Peer_socket::info()) from various struct...
void log_snd_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages thats show the detailed state of the sending sequence number space.
void send_worker_check_state(Peer_socket::Ptr sock)
Helper placed by send() onto W to invoke send_worker() but ensures that the socket has not entered so...
size_t m_low_lvl_max_buf_size
OS-reported m_low_lvl_sock UDP receive buffer maximum size, obtained right after we OS-set that setti...
Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock, const Function< Non_blocking_func_ret_type()> &non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Error_code *err_code)
Implementation of core blocking transfer methods, namely Peer_socket::sync_send(),...
size_t sock_max_packets_after_unrecvd_packet(Peer_socket::Const_ptr sock) const
Computes and returns the max size for Peer_socket::m_rcv_packets_with_gaps for sock.
Peer_socket::Sent_pkt_ordered_by_when_iter categorize_pkts_as_dropped_on_acks(Peer_socket::Ptr sock, const boost::unordered_set< Peer_socket::order_num_t > &flying_now_acked_pkts)
Helper of perform_accumulated_on_recv_tasks() that determines the range of In-flight packets that sho...
void rcv_get_first_gap_info(Peer_socket::Const_ptr sock, bool *first_gap_exists, Sequence_number *seq_num_after_first_gap)
Helper for handle_data_to_established() that gets simple info about Peer_socket::m_rcv_packets_with_g...
bool snd_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data either in Peer_socket::m_snd_rexmit_q of sock (if re...
void cancel_timers(Peer_socket::Ptr sock)
Cancel any timers and scheduled tasks active in the given socket.
void sock_rcv_buf_now_readable(Peer_socket::Ptr sock, bool syn_rcvd_qd_packet)
Helper for handle_data_to_established() that assumes the given's socket Receive buffer is currently r...
void snd_flying_pkts_erase_one(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_iter pkt_it)
Erases (for example if considered Acknowledged or Dropped) a packet struct from the "scoreboard" (Pee...
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
bool sock_validate_options(const Peer_socket_options &opts, const Peer_socket_options *prev_opts, Error_code *err_code) const
Analogous to validate_options() but checks per-socket options instead of per-Node options.
void handle_accumulated_pending_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any additional individual outgoing acknowl...
void receive_wnd_recovery_data_received(Peer_socket::Ptr sock)
Pertaining to the async_rcv_wnd_recovery() mechanism, this handles the event that we have received an...
static Peer_socket::order_num_t sock_get_new_snd_order_num(Peer_socket::Ptr sock)
Returns the "order number" to use for Peer_socket::Sent_packet::Sent_when structure corresponding to ...
Peer_socket::Ptr sync_connect_impl(const Remote_endpoint &to, const Fine_duration &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code, const Peer_socket_options *opts)
Implementation core of sync_connect*() that gets rid of templated or missing arguments thereof.
size_t max_block_size() const
The maximum number of bytes of user data per received or sent block on connections generated from thi...
void snd_flying_pkts_push_one(Peer_socket::Ptr sock, const Sequence_number &seq_num, Peer_socket::Sent_packet::Ptr sent_pkt)
Adds a new packet struct (presumably representing packet to be sent shortly) to the "scoreboard" (Pee...
Syn_packet::Ptr create_syn(Peer_socket::Const_ptr sock)
Helper that creates a new SYN packet object to the extent that is suitable for immediately passing to...
void close_abruptly(Peer_socket::Ptr sock, Error_code *err_code)
Implementation of non-blocking sock->close_abruptly() for socket sock in all cases except when sock->...
void async_low_lvl_ack_send(Peer_socket::Ptr sock, const Error_code &sys_err_code=Error_code())
Sends a low-level ACK packet, with all accumulated in Peer_socket::m_rcv_pending_acks of sock individ...
static void get_seq_num_range(const Packet_map_iter &packet_it, Sequence_number *seq_num_start, Sequence_number *seq_num_end)
Given an iterator into a Peer_socket::Sent_pkt_by_sent_when_map or Peer_socket::Recv_pkt_map,...
Peer_socket::Ptr sync_connect_with_metadata(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
A combination of sync_connect() and connect_with_metadata() (blocking connect, with supplied metadata...
Syn_ack_packet::Ptr create_syn_ack(Peer_socket::Const_ptr sock)
Like create_syn() but for SYN_ACK.
virtual Peer_socket * sock_create(const Peer_socket_options &opts)
Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
bool snd_buf_enqable(Peer_socket::Const_ptr sock) const
Return true if and only if there is enough free space in Peer_socket::m_snd_buf of sock to enqueue an...
bool can_send(Peer_socket::Const_ptr sock) const
Answers the perennial question of congestion and flow control: assuming there is a DATA packet to sen...
void sock_slide_rcv_next_seq_num(Peer_socket::Ptr sock, size_t slide_size, bool reassembly_in_progress)
Helper for handle_data_to_established() that aims to register a set of received DATA packet data as i...
void sock_log_detail(Peer_socket::Const_ptr sock) const
Logs a verbose state report for the given socket.
static void advance_seq_num(Sequence_number *seq_num, boost::shared_ptr< const Data_packet > data)
Assuming *seq_num points to the start of data.m_data, increments *seq_num to point to the datum just ...
static Sequence_number snd_past_last_flying_datum_seq_num(Peer_socket::Const_ptr sock)
Obtain the sequence number for the datum just past the last (latest) In-flight (i....
Peer_socket::Ptr connect(const Remote_endpoint &to, Error_code *err_code=0, const Peer_socket_options *opts=0)
Initiates an active connect to the specified remote Flow server.
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
void serv_peer_socket_closed(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that a Server_socket-contained (i.e., currently un-established, or established but not yet ac...
bool rcv_buf_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data in Peer_socket::m_rcv_buf of sock to give the user s...
void async_acknowledge_packet(Peer_socket::Ptr sock, const Sequence_number &seq_num, unsigned int rexmit_id, size_t data_size)
Causes an acknowledgment of the given received packet to be included in a future Ack_packet sent to t...
Socket_id_to_socket_map m_socks
The peer-to-peer connections this Node is currently tracking.
Peer_socket::Options_lock Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
void handle_syn_ack_to_syn_sent(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK packet delivered to the given peer ...
size_t send(Peer_socket::Ptr sock, const Function< size_t(size_t max_data_size)> &snd_buf_feed_func, Error_code *err_code)
Implementation of non-blocking sock->send() for socket sock in all cases except when sock->state() ==...
void sock_set_int_state(Peer_socket::Ptr sock, Peer_socket::Int_state new_state)
Sets internal state of given socket to the given state and logs a TRACE message about it.
bool sock_is_readable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->receive() with at least some arguments would return either ...
bool sock_data_to_rcv_buf_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to pass the payload of the given DATA packet to the...
bool sock_set_options(Peer_socket::Ptr sock, const Peer_socket_options &opts, Error_code *err_code)
Thread W implementation of sock->set_options().
bool running() const
Returns true if and only if the Node is operating.
Port_to_server_map m_servs
The server sockets this Node is currently tracking.
Event_set::Ev_type_to_socks_map m_sock_events
All sockets that have been detected to be "ready" (by the Event_set doc header definition) at any poi...
static const uint8_t S_DEFAULT_CONN_METADATA
Type and value to supply as user-supplied metadata in SYN, if user chooses to use [[a]sync_]connect()...
void setup_drop_timer(const Socket_id &socket_id, Peer_socket::Ptr sock)
Creates a new Drop Timer and saves it to sock->m_snd_drop_timer.
void handle_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Ack_packet > ack)
Handles a just-deserialized, just-demultiplexed, low-level ACK packet delivered to the given peer soc...
Peer_socket::Ptr sync_connect(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0, const Peer_socket_options *opts=0)
The blocking (synchronous) version of connect().
void handle_syn_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed, duplicate (equal to already-received SYN_ACK) low-le...
void setup_connection_timers(const Socket_id &socket_id, Peer_socket::Ptr sock, bool initial)
Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some a...
void log_rcv_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages that show the detailed state of the receiving sequence number space.
size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const
Computes and returns the currently correct rcv_wnd value; that is the amount of space free in Receive...
void connect_worker(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts, Peer_socket::Ptr *sock)
Thread W implementation of connect().
bool drop_pkts_on_acks(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &last_dropped_pkt_it, size_t *cong_ctl_dropped_pkts, size_t *cong_ctl_dropped_bytes, size_t *dropped_pkts, size_t *dropped_bytes, std::vector< Peer_socket::order_num_t > *pkts_marked_to_drop)
Helper of perform_accumulated_on_recv_tasks() that acts on the determination made by categorize_pkts_...
static const Peer_socket::Sent_packet::ack_count_t S_MAX_LATER_ACKS_BEFORE_CONSIDERING_DROPPED
For a given unacknowledged sent packet P, the maximum number of times any individual packet with high...
Error_code sock_categorize_data_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, bool *dupe, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that categorizes the DATA packet received as either illegal; ...
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
void sock_set_state(Peer_socket::Ptr sock, Peer_socket::State state, Peer_socket::Open_sub_state open_sub_state=Peer_socket::Open_sub_state::S_CONNECTED)
Sets Peer_socket::m_state and Peer_socket::m_open_sub_state.
void receive_emptied_rcv_buf_while_disconnecting(Peer_socket::Ptr sock)
Placed by receive() onto W during a graceful close, after the Receive buffer had been emptied by the ...
void sock_disconnect_detected(Peer_socket::Ptr sock, const Error_code &disconnect_cause, bool close)
Records that thread W shows underlying connection is broken (graceful termination,...
size_t receive(Peer_socket::Ptr sock, const Function< size_t()> &rcv_buf_consume_func, Error_code *err_code)
Implementation of non-blocking sock->receive() for socket sock in all cases except when sock->state()...
void handle_connection_rexmit_timer_event(const Socket_id &socket_id, Peer_socket::Ptr sock)
Handles the triggering of the retransmit timer wait set up by setup_connection_timers(); it will re-s...
Node_options m_opts
This Node's global set of options.
void close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
A thread W method that handles the transition of the given socket from OPEN (any sub-state) to CLOSED...
void sock_disconnect_completed(Peer_socket::Ptr sock)
While in S_OPEN+S_DISCONNECTING state (i.e., after beginning a graceful close with sock_disconnect_de...
Fine_duration compute_rtt_on_ack(Peer_socket::Sent_packet::Const_ptr flying_pkt, const Fine_time_pt &time_now, Ack_packet::Individual_ack::Const_ptr ack, const Peer_socket::Sent_packet::Sent_when **sent_when) const
Helper of perform_accumulated_on_recv_tasks() that computes the RTT implied by a given individual ack...
void async_low_lvl_syn_ack_ack_send(const Peer_socket::Ptr &sock, boost::shared_ptr< const Syn_ack_packet > &syn_ack)
Helper to create, fully fill out, and asynchronously send via async_sock_low_lvl_packet_send_paced() ...
Peer_socket::Ptr connect_with_metadata(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
Same as connect() but sends, as part of the connection handshake, the user-supplied metadata,...
void new_round_trip_time_sample(Peer_socket::Ptr sock, Fine_duration round_trip_time)
Handles a just-computed new RTT (round trip time) measurement for an individual packet earlier sent: ...
void async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
bool ok_to_rexmit_or_close(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &pkt_it, bool defer_delta_check)
Checks whether the given sent packet has been retransmitted the maximum number of allowed times; if s...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
Port_space m_ports
Flow port space for both client and server sockets. All threads may access this.
void rst_and_close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
Asynchronously send RST to the other side of the given socket and close_connection_immediately().
void drop_timer_action(Peer_socket::Ptr sock, bool drop_all_packets)
Handles a Drop_timer (Peer_socket::m_snd_drop_timer) event in ESTABLISHED state by dropping the speci...
A class that keeps a Peer_socket_receive_stats data store, includes methods to conveniently accumulat...
void good_data_accepted_packet(size_t data)
Indicates good_data_packet(), and these data are not dropped (so either delivered into Receive buffer...
void good_data_dropped_reassembly_q_overflow_packet(size_t data)
Indicates good_data_packet(), but these data are dropped due to insufficient Receive reassembly queue...
void presumed_dropped_data(size_t data)
Indicates that one or more unreceived data packets have been considered Dropped due to the number of ...
void good_data_delivered_packet(size_t data)
Indicates good_data_accepted_packet(), and these data are delivered into Receive buffer (either immed...
void late_or_dupe_to_send_ack_packet(size_t data)
Indicates that late_or_dupe_data_packet() and therefore an individual acknowledgment for this packet ...
void total_data_packet(size_t data)
Indicates one DATA packet has been received on socket.
void good_to_send_ack_packet(size_t data)
Indicates that good_data_delivered_packet() and therefore an individual acknowledgment for this packe...
void good_data_packet(size_t data)
Indicates total_data_packet(), and these data are new and acceptable into Receive buffer assuming the...
void error_data_packet(size_t data)
Indicates total_data_packet(), but there is some error about the sequence numbers so that they are no...
void buffer_fed(size_t size)
Indicates the Receive buffer was enqueued with data from network (so its data_size() increased).
void good_data_first_qd_packet(size_t data)
Indicates good_data_accepted_packet(), and these data are, upon receipt, queued for reassembly (not i...
void good_data_dropped_buf_overflow_packet(size_t data)
Indicates good_data_packet(), but these data are dropped due to insufficient Receive buffer space.
void late_or_dupe_data_packet(size_t data)
Indicates total_data_packet(), but the arrived data have either already been received before or (more...
A peer (non-server) socket operating over the Flow network protocol, with optional stream-of-bytes an...
State m_state
See state().
size_t get_connect_metadata(const boost::asio::mutable_buffer &buffer, Error_code *err_code=0) const
Obtains the serialized connect metadata, as supplied by the user during the connection handshake.
size_t max_block_size_multiple(const size_t &opt_val_ref, const unsigned int *inflate_pct_val_ptr=0) const
Returns the smallest multiple of max_block_size() that is >= the given option value,...
bool sync_send_reactor_pattern_impl(const Fine_time_pt &wait_until, Error_code *err_code)
Helper similar to sync_send_impl() but for the nullptr_t versions of sync_send().
std::map< Sequence_number, Sent_pkt_ordered_by_when_iter > Sent_pkt_by_seq_num_map
Short-hand for m_snd_flying_pkts_by_seq_num type; see that data member.
bool sync_receive_reactor_pattern_impl(const Fine_time_pt &wait_until, Error_code *err_code)
Helper similar to sync_receive_impl() but for the nullptr_t versions of sync_receive().
Remote_endpoint m_remote_endpoint
See remote_endpoint(). Should be set before user gets access to *this and not changed afterwards.
size_t sync_receive(const Mutable_buffer_sequence &target, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0)
Blocking (synchronous) version of receive().
util::Blob m_serialized_metadata
If !m_active_connect, this contains the serialized metadata that the user supplied on the other side ...
size_t node_sync_send(const Function< size_t(size_t max_data_size)> &snd_buf_feed_func_or_empty, const Fine_time_pt &wait_until, Error_code *err_code)
This is to sync_send() as node_send() is to send().
Error_code m_disconnect_cause
The Error_code causing disconnection (if one has occurred or is occurring) on this socket; otherwise ...
Peer_socket(log::Logger *logger_ptr, util::Task_engine *task_engine, const Peer_socket_options &opts)
Constructs object; initializes most values to well-defined (0, empty, etc.) but not necessarily meani...
Sequence_number m_rcv_init_seq_num
The Initial Sequence Number (ISN) contained in the original Syn_packet or Syn_ack_packet we received.
const Remote_endpoint & remote_endpoint() const
Intended other side of the connection (regardless of success, failure, or current State).
State
State of a Peer_socket.
@ S_OPEN
Future reads or writes may be possible. A socket in this state may be Writable or Readable.
@ S_CLOSED
Neither future reads nor writes are possible, AND Node has disowned the Peer_socket.
Open_sub_state
The sub-state of a Peer_socket when state is State::S_OPEN.
@ S_CONNECTED
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
@ S_CONNECTING
This Peer_socket was created through an active connect (Node::connect() and the like),...
@ S_DISCONNECTING
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
size_t sync_send(const Const_buffer_sequence &data, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0)
Blocking (synchronous) version of send().
~Peer_socket() override
Boring virtual destructor. Note that deletion is to be handled exclusively via shared_ptr,...
Error_code disconnect_cause() const
The error code that perviously caused state() to become State::S_CLOSED, or success code if state is ...
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
flow_port_t local_port() const
The local Flow-protocol port chosen by the Node (if active or passive open) or user (if passive open)...
flow_port_t m_local_port
See local_port(). Should be set before user gets access to *this and not changed afterwards.
friend class Send_bandwidth_estimator
Stats modules have const access to all socket internals.
bool set_options(const Peer_socket_options &opts, Error_code *err_code=0)
Dynamically replaces the current options set (options()) with the given options set.
size_t node_send(const Function< size_t(size_t max_data_size)> &snd_buf_feed_func, Error_code *err_code)
Non-template helper for template send() that forwards the send() logic to Node::send().
bool rexmit_on() const
Whether retransmission is enabled on this connection.
size_t node_sync_receive(const Function< size_t()> &rcv_buf_consume_func_or_empty, const Fine_time_pt &wait_until, Error_code *err_code)
This is to sync_receive() as node_receive() is to receive().
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex.
Int_state
The state of the socket (and the connection from this end's point of view) for the internal state mac...
@ S_ESTABLISHED
Public state is OPEN+CONNECTED; in our opinion the connection is established.
@ S_SYN_SENT
Public state is OPEN+CONNECTING; user requested active connect; we sent SYN and are awaiting response...
@ S_CLOSED
Closed (dead or new) socket.
@ S_SYN_RCVD
Public state is OPEN+CONNECTING; other side requested passive connect via SYN; we sent SYN_ACK and ar...
util::Lock_guard< Options_mutex > Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
void close_abruptly(Error_code *err_code=0)
Acts as if fatal error error::Code::S_USER_CLOSED_ABRUPTLY has been discovered on the connection.
size_t max_block_size() const
The maximum number of bytes of user data per received or sent packet on this connection.
Node * node() const
Node that produced this Peer_socket.
Peer_socket_info info() const
Returns a structure containing the most up-to-date stats about this connection.
Recvd_pkt_map::iterator Recvd_pkt_iter
Short-hand for m_rcv_packets_with_gaps iterator type.
Mutex m_mutex
This object's mutex.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
Sent_pkt_by_seq_num_map::const_iterator Sent_pkt_ordered_by_seq_const_iter
Short-hand for m_snd_flying_pkts_by_seq_num const iterator type.
Peer_socket_info m_info_on_close
This is the final set of stats collected at the time the socket was moved to S_CLOSED m_state.
bool ensure_open(Error_code *err_code) const
Helper that is equivalent to Node::ensure_sock_open(this, err_code).
Sent_pkt_by_sent_when_map::const_iterator Sent_pkt_ordered_by_when_const_iter
Short-hand for m_snd_flying_pkts_by_sent_when const iterator type.
Opt_type opt(const Opt_type &opt_val_ref) const
Analogous to Node::opt() but for per-socket options.
std::string bytes_blocks_str(size_t bytes) const
Helper that, given a byte count, returns a string with that byte count and the number of max_block_si...
Peer_socket_options m_opts
This socket's per-socket set of options.
Peer_socket_options options() const
Copies this socket's option set and returns that copy.
Options_mutex m_opts_mutex
The mutex protecting m_opts.
std::map< Sequence_number, boost::shared_ptr< Received_packet > > Recvd_pkt_map
Short-hand for m_rcv_packets_with_gaps type; see that data member.
Recvd_pkt_map::const_iterator Recvd_pkt_const_iter
Short-hand for m_rcv_packets_with_gaps const iterator type.
Open_sub_state m_open_sub_state
See state().
size_t node_receive(const Function< size_t()> &rcv_buf_consume_func, Error_code *err_code)
Non-template helper for template receive() that forwards the receive() logic to Node::receive().
State state(Open_sub_state *open_sub_state=0) const
Current State of the socket.
void return_port(flow_port_t port, Error_code *err_code)
Return a previously reserved port (of any type).
An internal net_flow sequence number identifying a piece of data.
void set_metadata(char num_line_id=0, const Sequence_number &zero_point=Sequence_number(), seq_num_delta_t multiple_size=0)
Updates the full set of metadata (used at least for convenient convention-based logging but not actua...
uint64_t seq_num_t
Raw sequence number type.
Internal net_flow class that implements a socket buffer, as used by Peer_socket for Send and Receive ...
void consume_buf_move(util::Blob *target_buf, size_t max_data_size)
Consumes (removes from the front of the internal byte buffer and returns them to the caller) a byte s...
size_t data_size() const
The total number of bytes of application-layer data stored in this object.
Properties of various container types.
typename Value_list::const_reverse_iterator Const_reverse_iterator
Type for reverse iterator pointing into an immutable structure of this type.
typename Value_list::reverse_iterator Reverse_iterator
Type for reverse iterator pointing into a mutable structure of this type.
std::pair< Iterator, bool > insert(Value const &key_and_mapped)
Attempts to insert the given key/mapped-value pair into the map.
static Ptr ptr_cast(const From_ptr &ptr_to_cast)
Provides syntactic-sugary way to perform a static_pointer_cast<> from a compatible smart pointer type...
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
Similar to ostringstream but allows fast read-only access directly into the std::string being written...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_ERROR_LOG_ERROR(ARG_val)
Logs a warning about the given error code using FLOW_LOG_WARNING().
#define FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(ARG_ret_type, ARG_function_name,...)
Narrow-use macro that implements the error code/exception semantics expected of most public-facing Fl...
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
#define FLOW_ERROR_EMIT_ERROR_LOG_INFO(ARG_val)
Identical to FLOW_ERROR_EMIT_ERROR(), but the message logged has flow::log::Sev::S_INFO severity inst...
#define FLOW_LOG_DATA(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_WITHOUT_CHECKING(ARG_sev, ARG_stream_fragment)
Identical to FLOW_LOG_WITH_CHECKING() but foregoes the filter (Logger::should_log()) check.
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_WITH_CHECKING(ARG_sev, ARG_stream_fragment)
Logs a message of the specified severity into flow::log::Logger *get_logger() with flow::log::Compone...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
void asio_exec_ctx_post(log::Logger *logger_ptr, Execution_context *exec_ctx, Synchronicity synchronicity, Task &&task)
An extension of boost.asio's post() and dispatch() free function templates, this free function templa...
bool exec_void_and_throw_on_error(const Func &func, Error_code *err_code, util::String_view context)
Equivalent of exec_and_throw_on_error() for operations with void return type.
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
@ S_INFO
Message indicates a not-"bad" condition that is not frequent enough to be of severity Sev::S_TRACE.
@ S_CONN_TIMEOUT
Other side did not complete connection handshake within the allowed time; perhaps no one is listening...
@ S_USER_CLOSED_ABRUPTLY
User code on this side abruptly closed connection; other side may be informed of this.
@ S_CONN_RESET_TOO_MANY_REXMITS
Connection reset because a packet has been retransmitted too many times.
@ S_SEQ_NUM_IMPLIES_CONNECTION_COLLISION
Other side has sent packet with sequence number that implies a port collision between two connections...
@ S_SEQ_NUM_ARITHMETIC_FAILURE
Other side has sent packets with inconsistent sequence numbers.
@ S_CONN_METADATA_TOO_LARGE
During connection user supplied metadata that is too large.
@ S_CANNOT_CONNECT_TO_IP_ANY
Cannot ask to connect to "any" IP address. Use specific IP address.
@ S_WAIT_USER_TIMEOUT
A blocking (sync_) or background-blocking (async_) operation timed out versus user-supplied time limi...
@ S_WAIT_INTERRUPTED
A blocking (sync_) or background-blocking (async_) operation was interrupted, such as by a signal.
@ S_EVENT_SET_CLOSED
Attempted operation on an event set, when that event set was closed.
@ S_INTERNAL_ERROR_PORT_COLLISION
Internal error: Ephemeral port double reservation allowed.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
uint16_t flow_port_t
Logical Flow port type (analogous to a UDP/TCP port in spirit but in no way relevant to UDP/TCP).
const flow_port_t S_PORT_ANY
Special Flow port value used to indicate "invalid port" or "please pick a random available ephemeral ...
std::ostream & operator<<(std::ostream &os, const Congestion_control_selector::Strategy_choice &strategy_choice)
Serializes a Peer_socket_options::Congestion_control_strategy_choice enum to a standard ostream – the...
bool key_exists(const Container &container, const typename Container::key_type &key)
Returns true if and only if the given key is present at least once in the given associative container...
Auto_cleanup setup_auto_cleanup(const Cleanup_func &func)
Provides a way to execute arbitrary (cleanup) code at the exit of the current block.
std::string buffers_dump_string(const Const_buffer_sequence &data, const std::string &indentation, size_t bytes_per_line)
Identical to buffers_to_ostream() but returns an std::string instead of writing to a given ostream.
bool subtract_with_floor(Minuend *minuend, const Subtrahend &subtrahend, const Minuend &floor)
Performs *minuend -= subtrahend, subject to a floor of floor.
Integer ceil_div(Integer dividend, Integer divisor)
Returns the result of the given non-negative integer divided by a positive integer,...
bool in_open_open_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, given as a (low,...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
bool scheduled_task_fired(log::Logger *logger_ptr, Scheduled_task_const_handle task)
Returns whether a previously scheduled (by schedule_task_from_now() or similar) task has already fire...
bool in_open_closed_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, given as a (low,...
void ostream_op_to_string(std::string *target_str, T const &... ostream_args)
Writes to the specified string, as if the given arguments were each passed, via << in sequence,...
Fine_duration scheduled_task_fires_from_now_or_canceled(log::Logger *logger_ptr, Scheduled_task_const_handle task)
Returns how long remains until a previously scheduled (by schedule_task_from_now() or similar) task f...
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
bool in_closed_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, inclusive.
boost::shared_ptr< void > Auto_cleanup
Helper type for setup_auto_cleanup().
boost::asio::io_context Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
bool scheduled_task_cancel(log::Logger *logger_ptr, Scheduled_task_handle task)
Attempts to prevent the execution of a previously scheduled (by schedule_task_from_now() or similar) ...
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
unsigned char uint8_t
Byte. Best way to represent a byte of binary data. This is 8 bits on all modern systems.
Specifies the outgoing (pre-serialization) acknowledgment of a single received Data_packet,...
Equivalent of Individual_ack_rexmit_off but for sockets with retransmission enabled.
Specifies the incoming (post-deserialization) acknowledgment of a single received Data_packet.
boost::shared_ptr< const Individual_ack > Const_ptr
Short-hand for ref-counted pointer to immutable objects of this class.
uint64_t ack_delay_t
Type used to store the ACK delay for a given individual acknowledged packet.
Fine_duration Ack_delay_time_unit
Ack_delay_time_unit(1) is the duration corresponding to the ack_delay_t value 1; and proportionally f...
uint32_t rcv_wnd_t
Type used to store the size of m_rcv_wnd member in a couple of different packet types.
uint8_t rexmit_id_t
Type used to store the retransmission count in DATA and ACK packets.
The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to a port in th...
Metadata describing the data sent in the acknowledgment of an individual received packet.
boost::shared_ptr< const Individual_ack > Const_ptr
Short-hand for ref-counted pointer to immutable objects of this class.
boost::shared_ptr< Individual_ack > Ptr
Short-hand for ref-counted pointer to mutable objects of this class.
Metadata (and data, if retransmission is on) for a packet that has been received (and,...
const size_t m_size
Number of bytes in the Data_packet::m_data field of that packet.
Received_packet(log::Logger *logger_ptr, size_t size, util::Blob *src_data)
Constructs object by storing size of data and, if so instructed, the data themselves.
util::Blob m_data
Byte sequence equal to that of Data_packet::m_data of the packet.
Data store to keep timing related info when a packet is sent out.
const order_num_t m_order_num
Order number of the packet.
size_t m_sent_cwnd_bytes
The congestion window size (in bytes) that is used when the packet is sent out.
Fine_time_pt m_sent_time
The timestamp when the packet is sent out.
Metadata (and data, if retransmission is on) for a packet that has been sent one (if retransmission i...
Sent_packet(bool rexmit_on, boost::shared_ptr< Data_packet > packet, const Sent_when &sent_when)
Constructs object with the given values and m_acks_after_me at zero.
std::vector< Sent_when > m_sent_when
Time stamps, order numbers, and other info at the times when the different attempts (including origin...
const size_t m_size
Number of bytes in the Data_packet::m_data field of the sent packet.
const boost::shared_ptr< Data_packet > m_packet
If retransmission is on, this is the DATA packet itself that was sent; otherwise null.
uint16_t ack_count_t
Type used for m_acks_after_me.
ack_count_t m_acks_after_me
The number of times any packet with m_sent_when.back().m_order_num > this->m_sent_when....
A data store that keeps stats about the a Peer_socket connection.
Peer_socket_send_stats m_snd
Stats for outgoing direction of traffic. As opposed to the other m_snd_* members, this typically accu...
Node_options m_node_opts
Per-node options currently set on the socket's Node.
size_t m_low_lvl_max_buf_size
The UDP receive buffer maximum size, as reported by an appropriate call to the appropriate getsockopt...
size_t m_rcv_buf_size
The number of bytes in the internal Receive buffer.
size_t m_rcv_wnd_last_advertised
The last rcv_wnd (receive window) size sent to sender (not necessarily received; packets can be lost)...
Fine_duration m_snd_pacing_slice_period
In pacing, the duration of the current pacing time slice.
size_t m_rcv_reassembly_q_data_size
If rexmit_on is false then 0; otherwise the total DATA payload in the reassembly queue of the socket.
size_t m_snd_pacing_bytes_allowed_this_slice
This many bytes worth of DATA packets may still be sent, at this time, within the time slice defined ...
Peer_socket_options m_sock_opts
Per-socket options currently set on the socket.
size_t m_snd_buf_size
The number of bytes in the internal Send buffer.
size_t m_rcv_syn_rcvd_data_cumulative_size
Total size of DATA payload queued while waiting for SYN_ACK_ACK in SYN_RCVD state.
size_t m_rcv_syn_rcvd_data_q_size
Number of DATA packets queued while waiting for SYN_ACK_ACK in SYN_RCVD state.
std::string m_int_state_str
The internal state of the socket, rendered into string (e.g., "SYN_RECEIVED" or "ESTABLISHED").
Fine_time_pt m_snd_pacing_slice_start
In pacing, the time point marking the beginning of the current pacing time slice.
size_t m_snd_cong_ctl_in_flight_count
In congestion control, the current sent data packets that have been neither acknowledged nor consider...
size_t m_snd_cong_ctl_in_flight_bytes
In congestion control, the current sent data bytes that have been neither acknowledged nor considered...
double m_snd_est_bandwidth_mbit_per_sec
Estimate of the currently available (to this connection) outgoing bandwidth, in megabits per second.
size_t m_rcv_wnd
Receive window size = max Receive buffer space minus space taken. Infinity if flow control disabled.
size_t m_rcv_packets_with_gaps
Number of DATA packets tracked in structure tracking all valid received packets such at least one pac...
size_t m_snd_cong_ctl_wnd_bytes
In congestion control, the current congestion window (number of outgoing data bytes allowed In-flight...
Fine_duration m_snd_smoothed_round_trip_time
Estimated current round trip time of packets, computed as a smooth value over the past individual RTT...
Error_code m_disconnect_cause
If the socket is closing or closed, this is the reason for the closure; otherwise the default-constru...
size_t m_snd_cong_ctl_wnd_count_approx
In congestion control, the approximate equivalent of m_snd_cong_ctl_in_flight_bytes as a full packet ...
size_t m_snd_rcv_wnd
The receive window (rcv_wnd a/k/a free Receive buffer space) value of the peer socket on the other si...
bool m_is_active_connect
true if this is the "client" socket (connect()ed); false otherwise (accept()ed).
size_t m_snd_pacing_packet_q_size
In pacing, number of packets currently queued to be sent out by the pacing module.
Fine_duration m_snd_round_trip_time_variance
RTTVAR used for m_snd_smoothed_round_trip_time calculation; it is the current RTT variance.
Peer_socket_receive_stats m_rcv
Stats for incoming direction of traffic. As opposed to the other m_rcv_* members, this typically accu...
Fine_duration m_snd_drop_timeout
Drop Timeout: how long a given packet must remain unacknowledged to be considered dropped due to Drop...
A set of low-level options affecting a single Peer_socket.
Fine_duration m_st_init_drop_timeout
Once socket enters ESTABLISHED state, this is the value for Peer_socket::m_snd_drop_timeout until the...
unsigned int m_st_max_rexmissions_per_packet
If retransmission is enabled and a given packet is retransmitted this many times and has to be retran...
size_t m_st_rcv_buf_max_size
Maximum number of bytes that the Receive buffer can hold.
size_t m_st_cong_ctl_max_cong_wnd_blocks
The constant that determines the CWND limit in Congestion_control_classic_data::congestion_window_at_...
Fine_duration m_st_snd_bandwidth_est_sample_period_floor
When estimating the available send bandwidth, each sample must be compiled over at least this long of...
unsigned int m_st_cong_ctl_cong_avoidance_increment_blocks
The multiple of max-block-size by which to increment CWND in congestion avoidance mode after receivin...
size_t m_st_cong_ctl_cong_wnd_on_drop_timeout_blocks
On Drop Timeout, set congestion window to this value times max-block-size.
size_t m_st_cong_ctl_init_cong_wnd_blocks
The initial size of the congestion window, given in units of max-block-size-sized blocks.
bool m_st_rexmit_on
Whether to enable reliability via retransmission.
size_t m_st_snd_buf_max_size
Maximum number of bytes that the Send buffer can hold.
Fine_duration m_st_connect_retransmit_period
How often to resend SYN or SYN_ACK while SYN_ACK or SYN_ACK_ACK, respectively, has not been received.
Fine_duration m_dyn_rcv_wnd_recovery_timer_period
When the mode triggered by rcv-buf-max-size-to-advertise-percent being exceeded is in effect,...
Fine_duration m_st_connect_retransmit_timeout
How long from the first SYN or SYN_ACK to allow for connection handshake before aborting connection.
size_t m_st_max_full_blocks_before_ack_send
If there are at least this many TIMES max-block-size bytes' worth of individual acknowledgments to be...
Fine_duration m_st_delayed_ack_timer_period
The maximum amount of time to delay sending ACK with individual packet's acknowledgment since receivi...
unsigned int m_dyn_drop_timeout_backoff_factor
Whenever the Drop Timer fires, upon the requisite Dropping of packet(s), the DTO (Drop Timeout) is se...
size_t m_st_max_block_size
The size of block that we will strive to (and will, assuming at least that many bytes are available i...
unsigned int m_st_cong_ctl_classic_wnd_decay_percent
In classic congestion control, RFC 5681 specifies the window should be halved on loss; this option al...
unsigned int m_st_rcv_buf_max_size_to_advertise_percent
% of rcv-buf-max-size that has to be freed, since the last receive window advertisement,...
unsigned int m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent
The limit on the size of Peer_socket::m_rcv_packets_with_gaps, expressed as what percentage the maxim...
Fine_duration m_dyn_drop_timeout_ceiling
Ceiling to impose on the Drop Timeout.
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...
util::Udp_endpoint m_udp_endpoint
UDP address (IP address/UDP port) where the Node identified by this endpoint bound its low-level UDP ...
#define FLOW_UTIL_WHERE_AM_I_STR()
Same as FLOW_UTIL_WHERE_AM_I() but evaluates to an std::string.