26#include <boost/algorithm/string.hpp>
27#include <boost/tuple/tuple.hpp>
42 m_active_connect(false),
43 m_state(
State::S_CLOSED),
46 m_rcv_buf(logger_ptr, 0),
48 m_snd_buf(logger_ptr, max_block_size()),
49 m_serialized_metadata(logger_ptr),
52 m_rcv_syn_rcvd_data_cumulative_size(0),
53 m_rcv_reassembly_q_data_size(0),
54 m_rcv_pending_acks_size_at_recv_handler_start(0),
55 m_snd_pending_rcv_wnd(0),
56 m_rcv_last_sent_rcv_wnd(0),
57 m_rcv_in_rcv_wnd_recovery(false),
58 m_rcv_delayed_ack_timer(*task_engine),
59 m_snd_flying_bytes(0),
60 m_snd_last_order_num(0),
61 m_snd_rexmit_q_size(0),
62 m_snd_remote_rcv_wnd(0),
63 m_snd_smoothed_round_trip_time(0),
64 m_round_trip_time_variance(0),
65 m_snd_drop_timeout(0),
66 m_snd_pacing_data(task_engine),
68 m_init_rexmit_count(0)
71 FLOW_LOG_TRACE(
"Peer_socket [" <<
static_cast<void*
>(
this) <<
"] created.");
109 return sync_send(tag, Fine_duration::max(), err_code);
116 namespace bind_ns = util::bind_ns;
120 bind_ns::cref(wait_until), _1);
124 const Function<size_t (
size_t)> empty_snd_buf_feed_func;
125 assert(empty_snd_buf_feed_func.empty());
139 const Ptr sock = shared_from_this();
146 return m_node->
send(sock, snd_buf_feed_func, err_code);
153 using boost::adopt_lock;
158 const Ptr sock = shared_from_this();
184 snd_buf_feed_func_or_empty.empty()
187 {
return m_node->
send(sock, snd_buf_feed_func_or_empty, err_code); }),
189 wait_until, err_code);
194 return sync_receive(tag, Fine_duration::max(), err_code);
201 namespace bind_ns = util::bind_ns;
205 bind_ns::cref(wait_until), _1);
209 const Function<size_t ()> empty_rcv_buf_consume_func;
210 assert(empty_rcv_buf_consume_func.empty());
224 const Ptr sock = shared_from_this();
231 return m_node->
receive(sock, rcv_buf_consume_func, err_code);
238 using boost::adopt_lock;
243 const Ptr sock = shared_from_this();
261 rcv_buf_consume_func_or_empty.empty()
264 {
return m_node->
receive(sock, rcv_buf_consume_func_or_empty, err_code); }),
266 wait_until, err_code);
283 const Ptr sock = shared_from_this();
299 namespace bind_ns = util::bind_ns;
307 const Ptr sock = shared_from_this();
335 const Const_ptr sock = shared_from_this();
358 const unsigned int* inflate_pct_val_ptr)
const
364 const unsigned int inflate_pct = inflate_pct_val_ptr ? (*inflate_pct_val_ptr) : 0;
393 namespace bind_ns = util::bind_ns;
431 os.os() << bytes <<
'~' << (bytes / block);
432 if ((bytes % block) != 0)
441 boost::shared_ptr<Data_packet> packet,
443 m_size(packet->m_data.size()),
444 m_sent_when({ sent_when }),
446 m_packet(
rexmit_on ? packet : boost::shared_ptr<Data_packet>())
460 m_data = std::move(*src_data);
476 boost::shared_ptr<const Syn_ack_packet> syn_ack)
484 FLOW_LOG_INFO(
"NetFlow worker thread continuing active-connect of [" << sock <<
"]. "
485 "Received [" << syn_ack->m_type_ostream_manip <<
"] with "
486 "ISN [" << syn_ack->m_init_seq_num <<
"]; "
487 "security token [" << syn_ack->m_packed.m_security_token <<
"].");
491 if (!async_low_lvl_syn_ack_ack_send_or_close_immediately(sock, syn_ack))
503 sock->m_rcv_init_seq_num = syn_ack->m_init_seq_num;
504 sock->m_rcv_next_seq_num = sock->m_rcv_init_seq_num + 1;
517 setup_drop_timer(socket_id, sock);
520 sock->m_snd_remote_rcv_wnd = syn_ack->m_packed.m_rcv_wnd;
532 event_set_all_check_delta(
true);
540 boost::shared_ptr<const Syn_ack_packet> syn_ack)
548 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
550 "received duplicate [" << syn_ack->m_type_ostream_manip <<
"] with "
551 "ISN [" << syn_ack->m_init_seq_num <<
"]; "
552 "security token [" << syn_ack->m_packed.m_security_token <<
"]. "
553 "Could be from packet loss.");
557 async_low_lvl_syn_ack_ack_send_or_close_immediately(sock, syn_ack);
562 boost::shared_ptr<Data_packet> packet,
563 bool syn_rcvd_qd_packet)
617 const bool rexmit_on = sock->rexmit_on();
620 auto& data = packet->m_data;
621 assert(!data.empty());
623 const size_t data_size = data.size();
631 FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock <<
"]. "
632 "Received [" << packet->m_type_ostream_manip <<
"] with "
633 "sequence number [" << seq_num <<
"]; data size [" << data_size <<
"].");
638 log_rcv_window(sock);
654 const Error_code cat_result = sock_categorize_data_to_established(sock, packet, &dupe, &slide, &slide_size);
664 rst_and_close_connection_immediately(socket_id, sock, cat_result,
true);
697 async_acknowledge_packet(sock, seq_num, packet->m_rexmit_id, data_size);
713 if (!sock_data_to_rcv_buf_unless_overflow(sock, packet))
737 sock_rcv_buf_now_readable(sock, syn_rcvd_qd_packet);
745 async_acknowledge_packet(sock, seq_num, 0, data_size);
754 sock_track_new_data_after_gap_rexmit_off(sock, packet, data_size, &slide, &slide_size);
766 sock_slide_rcv_next_seq_num(sock, slide_size,
false);
781 if (!sock_data_to_rcv_buf_unless_overflow(sock, packet))
798 sock_slide_rcv_next_seq_num(sock, slide_size,
true);
802 sock_rcv_buf_now_readable(sock, syn_rcvd_qd_packet);
804 else if (!sock_data_to_reassembly_q_unless_overflow(sock, packet))
818 async_acknowledge_packet(sock, seq_num, packet->m_rexmit_id, data_size);
822 log_rcv_window(sock);
826 boost::shared_ptr<const Data_packet> packet,
827 bool* dupe,
bool* slide,
size_t* slide_size)
829 assert(dupe && slide && slide_size);
841 const auto& data = packet->m_data;
846 advance_seq_num(&seq_num_end, data.size());
849 bool first_gap_exists;
852 rcv_get_first_gap_info(sock, &first_gap_exists, &seq_num_after_first_gap);
866 "Received [" << packet->m_type_ostream_manip <<
"] with "
867 "sequence number [" << seq_num <<
"]; data size [" << data.size() <<
"]; "
868 "sequence number precedes "
869 "ISN [" << sock->m_rcv_init_seq_num <<
"].");
875 if (seq_num < rcv_next_seq_num)
882 if (seq_num_end > rcv_next_seq_num)
886 "Received [" << packet->m_type_ostream_manip <<
"] with "
887 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
888 "data size [" << data.size() <<
"]; "
889 "straddle first unreceived "
890 "sequence number [" << rcv_next_seq_num <<
"].");
897 FLOW_LOG_TRACE(
"Duplicate packet before first unreceived sequence number [" << rcv_next_seq_num <<
"].");
910 if (seq_num == rcv_next_seq_num)
916 if (first_gap_exists && (seq_num_end > seq_num_after_first_gap))
920 "Received [" << packet->m_type_ostream_manip <<
"] with "
921 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
922 "data size [" << data.size() <<
"]; "
923 "supposed gap-filling data "
924 "straddle the boundary of packet [" << seq_num_after_first_gap <<
", ...).");
930 FLOW_LOG_TRACE(
"Packet filled first [" << data.size() <<
"] unreceived sequence numbers "
931 "starting with [" << rcv_next_seq_num <<
"].");
935 *slide_size = size_t(seq_num_end - seq_num);
936 assert(*slide_size == data.size());
941 assert(seq_num > rcv_next_seq_num);
983 if (next_packet == rcv_packets_with_gaps.end())
991 if (first_gap_exists)
995 get_seq_num_range(last_packet, 0, &seq_num_last_end);
997 if (seq_num_last_end > seq_num)
1003 "Received [" << packet->m_type_ostream_manip <<
"] with "
1004 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1005 "data size [" << data.size() <<
"]; "
1006 "supposed middle gap-filling packet data "
1007 "straddle the boundary of last packet [..., " << seq_num_last_end <<
").");
1015 FLOW_LOG_TRACE(
"New packet is newest packet after unreceived gap; "
1016 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1017 "first unreceived packet [" << rcv_next_seq_num <<
"].");
1023 FLOW_LOG_TRACE(
"New packet forms gap; sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1024 "first unreceived packet [" << rcv_next_seq_num <<
"].");
1034 get_seq_num_range(next_packet, &seq_num_next_start, &seq_num_next_end);
1036 if (seq_num_next_start == seq_num)
1041 if (seq_num_next_end != seq_num_end)
1047 "Received [" << packet->m_type_ostream_manip <<
"] with "
1048 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1049 "data size [" << data.size() <<
"]; "
1050 "do not match supposed "
1051 "duplicate packet [" << seq_num <<
", " << seq_num_next_end <<
").");
1062 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
").");
1068 assert(seq_num_next_start > seq_num);
1080 if (seq_num_end > seq_num_next_start)
1086 "Received [" << packet->m_type_ostream_manip <<
"] with "
1087 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1088 "data size [" << data.size() <<
"]; "
1089 "supposed middle gap-filling packet data "
1090 "straddle the left boundary of packet "
1091 "[" << seq_num_next_start <<
", " << seq_num_next_end <<
").");
1097 if (next_packet == rcv_packets_with_gaps.begin())
1099 FLOW_LOG_TRACE(
"New packet partially fills first gap without sliding window; "
1100 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1101 "first unreceived packet [" << rcv_next_seq_num <<
"].");
1107 get_seq_num_range(prev_packet, &seq_num_prev_start, &seq_num_prev_end);
1109 if (seq_num_prev_end > seq_num)
1115 "Received [" << packet->m_type_ostream_manip <<
"] with "
1116 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1117 "data size [" << data.size() <<
"]; "
1118 "supposed middle gap-filling packet data "
1119 "straddle the right boundary of packet "
1120 "[" << seq_num_prev_start <<
", " << seq_num_prev_end <<
").");
1127 "sequence numbers [" << seq_num <<
", " << seq_num_end <<
"); "
1128 "first unreceived packet [" << rcv_next_seq_num <<
"].");
1134 boost::shared_ptr<Data_packet> packet)
1143 Blob& data = packet->m_data;
1145 const size_t data_size = data.size();
1161 if ((sock->m_rcv_buf.data_size() + data_size)
1162 > sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size,
1163 &sock->m_opts.m_st_rcv_buf_max_size_slack_percent))
1171 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
1172 "Received [" << packet->m_type_ostream_manip <<
"] with "
1173 "sequence numbers [" << packet->m_seq_num <<
", " << (packet->m_seq_num + data_size) <<
"); "
1174 "data size [" << data_size <<
"]; "
1175 "dropping because Receive buffer full.");
1188 const size_t written =
1190 sock->m_rcv_buf.feed_buf_move(&data, std::numeric_limits<size_t>::max());
1192 assert(written == data_size);
1194 buf_size = sock->m_rcv_buf.data_size();
1205 receive_wnd_recovery_data_received(sock);
1233 if ((!syn_rcvd_qd_packet) &&
1237 event_set_all_check_delta(
true);
1245 boost::shared_ptr<const Data_packet> packet,
1247 bool* slide,
size_t* slide_size)
1249 using std::make_pair;
1265 const size_t max_packets_after_unrecvd_packet = sock_max_packets_after_unrecvd_packet(sock);
1272 const auto insert_result =
1274 rcv_packets_with_gaps.insert
1278 assert(!sock->rexmit_on());
1279 assert(insert_result.second);
1282 bool first_gap_exists;
1286 rcv_get_first_gap_info(sock, &first_gap_exists, &seq_num_after_first_gap);
1287 assert(first_gap_exists);
1296 if (rcv_packets_with_gaps.size() == max_packets_after_unrecvd_packet + 1)
1300 *slide_size = size_t(seq_num_after_first_gap - sock->m_rcv_next_seq_num);
1306 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
1307 "Received [" << packet->m_type_ostream_manip <<
"] with "
1308 "sequence numbers [" << packet->m_seq_num <<
", " << (packet->m_seq_num + data_size) <<
"); "
1309 "exceeded max gapped packet list size [" << max_packets_after_unrecvd_packet <<
"]; "
1310 "assuming Dropped; "
1311 "will fake receiving all [" << slide_size <<
"] sequence numbers in the first unreceived gap.");
1316 assert(rcv_packets_with_gaps.size() <= max_packets_after_unrecvd_packet);
1321 boost::shared_ptr<Data_packet> packet)
1323 using std::make_pair;
1332 auto& data = packet->m_data;
1334 const size_t data_size = data.size();
1339 size_t max_packets_after_unrecvd_packet = sock_max_packets_after_unrecvd_packet(sock);
1387 size_t max_packets_in_reassembly_q
1388 = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size,
1389 &sock->m_opts.m_st_rcv_buf_max_size_slack_percent);
1391 size_t rcv_buf_size;
1394 rcv_buf_size = sock->m_rcv_buf.data_size();
1399 max_packets_in_reassembly_q /= sock->max_block_size();
1403 max_packets_in_reassembly_q += rcv_packets_with_gaps.size();
1406 if (max_packets_in_reassembly_q < max_packets_after_unrecvd_packet)
1408 max_packets_after_unrecvd_packet = max_packets_in_reassembly_q;
1413 FLOW_LOG_TRACE(
"Unexpected Receive buffer limits: safety net [" << max_packets_after_unrecvd_packet <<
"] <= "
1414 "real limit [" << max_packets_in_reassembly_q <<
"], but the opposite is typical. "
1415 "See details just below.");
1418 if (rcv_packets_with_gaps.size() + 1 > max_packets_after_unrecvd_packet)
1432 "Received [" << packet->m_type_ostream_manip <<
"] with "
1433 "sequence numbers [" << packet->m_seq_num <<
", " << (packet->m_seq_num + data_size) <<
"); "
1434 "exceeded max gapped packet list size [" << max_packets_after_unrecvd_packet <<
"]; "
1435 "dropping packet.");
1440 FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock <<
"]. "
1441 "Enqueueing [" << packet->m_type_ostream_manip <<
"] payload onto reassembly queue with "
1442 "sequence numbers [" << packet->m_seq_num <<
", " << (packet->m_seq_num + data_size) <<
") "
1443 "of size [" << data_size <<
"]; "
1444 "successfully fit into max gapped packet list size [" << max_packets_after_unrecvd_packet <<
"]; "
1445 "could have fit [" << (max_packets_after_unrecvd_packet - rcv_packets_with_gaps.size()) <<
"] more.");
1449 const auto insert_result =
1451 rcv_packets_with_gaps.insert
1454 sock->m_rcv_reassembly_q_data_size += data_size;
1455 assert(insert_result.second);
1466 receive_wnd_recovery_data_received(sock);
1484 rcv_next_seq_num += slide_size;
1487 "[" << (rcv_next_seq_num - slide_size) <<
"] to "
1488 "[" << rcv_next_seq_num <<
"].");
1500 size_t total_written = 0;
1503 for (end_contig_it = start_contig_it;
1507 (end_contig_it != rcv_packets_with_gaps.end()) && (end_contig_it->first == rcv_next_seq_num);
1512 if (reassembly_in_progress)
1532 written = sock->m_rcv_buf.feed_buf_move(&rcvd_packet.
m_data, std::numeric_limits<size_t>::max());
1534 buf_size = sock->m_rcv_buf.data_size();
1536 total_written += written;
1542 assert(written != 0);
1545 advance_seq_num(&rcv_next_seq_num, rcvd_packet.
m_size);
1548 "[" << rcv_next_seq_num <<
"]; packet subsumed by this move.");
1552 rcv_packets_with_gaps.erase(start_contig_it, end_contig_it);
1553 sock->m_rcv_reassembly_q_data_size -= total_written;
1563 return uint64_t(sock->opt(sock->m_opts.m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent)) *
1564 uint64_t(sock->opt(sock->m_opts.m_st_rcv_buf_max_size)) /
1565 uint64_t(sock->max_block_size()) /
1573 *first_gap_exists = !sock->m_rcv_packets_with_gaps.empty();
1575 if (*first_gap_exists)
1577 *seq_num_after_first_gap = sock->m_rcv_packets_with_gaps.begin()->first;
1590 sock->m_rcv_stats.total_to_send_ack_packet(data_size);
1592 const size_t acks_pending_before_this = sock->m_rcv_pending_acks.size();
1598 sock->m_rcv_pending_acks.push_back
1627 if (m_socks_with_accumulated_pending_acks.insert(sock).second)
1633 sock->m_rcv_pending_acks_size_at_recv_handler_start = acks_pending_before_this;
1640 using boost::chrono::milliseconds;
1641 using boost::chrono::microseconds;
1642 using boost::chrono::duration_cast;
1643 using boost::chrono::round;
1651 vector<Peer_socket::Individual_ack::Ptr>& pending_acks = sock->m_rcv_pending_acks;
1656 FLOW_LOG_TRACE(
"Was about to perform accumulated acknowledgment tasks on [" << sock <<
"] but skipping because "
1657 "state is now [" << sock->m_int_state <<
"].");
1662 assert(!pending_acks.empty());
1707 const Fine_duration delayed_ack_timer_period = sock->opt(sock->m_opts.m_st_delayed_ack_timer_period);
1709 bool force_ack = delayed_ack_timer_period == Fine_duration::zero();
1714 (
"Delayed [ACK] feature disabled on [" << sock <<
"]; forcing immediate [ACK]. "
1715 "Receive window state: [" << sock->m_rcv_init_seq_num <<
", " << sock->m_rcv_next_seq_num <<
") "
1716 "| " << sock->m_rcv_packets_with_gaps.size() <<
":{...}.");
1718 else if (!sock->m_rcv_packets_with_gaps.empty())
1730 for (
size_t ack_idx = sock->m_rcv_pending_acks_size_at_recv_handler_start;
1731 ack_idx != pending_acks.size(); ++ack_idx)
1733 ack = pending_acks[ack_idx];
1734 if (ack->m_seq_num > sock->m_rcv_next_seq_num)
1744 (
"On [" << sock <<
"] "
1745 "received out-of-order packet [" << ack->m_seq_num <<
", size " << ack->m_data_size <<
", "
1746 "rexmit " << ack->m_rexmit_id <<
"]; "
1747 "forcing immediate [ACK]. "
1748 "Receive window state: [" << sock->m_rcv_init_seq_num <<
", " << sock->m_rcv_next_seq_num <<
") "
1749 "| " << sock->m_rcv_packets_with_gaps.size() <<
":{...}.");
1757 = sock->opt(sock->m_opts.m_st_max_full_blocks_before_ack_send) * sock->max_block_size();
1761 bytes += ack->m_data_size;
1772 "accumulated at least [" << limit <<
"] bytes to acknowledge; "
1773 "forcing immediate [ACK].");
1799 if (sock->m_rcv_pending_acks_size_at_recv_handler_start != 0)
1802 "canceling delayed [ACK] timer due to forcing "
1803 "immediate [ACK]; would have fired "
1804 "in [" << round<milliseconds>(sock->m_rcv_delayed_ack_timer.expires_from_now()) <<
"] "
1808 const size_t num_canceled = sock->m_rcv_delayed_ack_timer.cancel(sys_err_code);
1816 rst_and_close_connection_immediately(socket_id, sock,
1823 if (num_canceled == 0)
1829 "tried to cancel delayed [ACK] timer while "
1830 "forcing [ACK], but it was already just about to fire.");
1838 async_low_lvl_ack_send(sock,
true);
1841 assert(pending_acks.empty());
1863 if (sock->m_rcv_pending_acks_size_at_recv_handler_start == 0)
1868 sock->m_rcv_delayed_ack_timer.expires_from_now(delayed_ack_timer_period, sys_err_code);
1879 rst_and_close_connection_immediately(socket_id, sock,
1887 "scheduled delayed [ACK] timer to fire "
1888 "in [" << round<milliseconds>(delayed_ack_timer_period) <<
"].");
1891 sock->m_rcv_delayed_ack_timer.async_wait([
this, socket_id, sock](
const Error_code& sys_err_code)
1893 async_low_lvl_ack_send(sock,
false, sys_err_code);
1901 sock->m_rcv_stats.current_pending_to_ack_packets(pending_acks.size());
1908 using boost::algorithm::join;
1923 (
"Receive window state for [" << sock <<
"]: "
1924 "[" << sock->m_rcv_init_seq_num <<
", " << sock->m_rcv_next_seq_num <<
") "
1925 "| " << rcv_packets_with_gaps.size() <<
":{...}.");
1936 vector<string> pkt_strs;
1937 pkt_strs.reserve(rcv_packets_with_gaps.size());
1939 const size_t MAX_TO_SHOW = 100;
1940 bool skipped_some =
false;
1944 pkt_it != rcv_packets_with_gaps.end();
1947 const bool last_iteration = (count == rcv_packets_with_gaps.size() - 1);
1949 if ((!skipped_some) && (count > MAX_TO_SHOW) && (!last_iteration))
1952 skipped_some =
true;
1963 if (!last_iteration)
1970 pkt_str =
"[...skipped...] ";
1975 get_seq_num_range(pkt_it, &start, &end);
1978 pkt_strs.push_back(pkt_str);
1985 "Receive window state for [" << sock <<
"]: "
1986 "[" << sock->m_rcv_init_seq_num <<
", " << sock->m_rcv_next_seq_num <<
") "
1987 "| " << rcv_packets_with_gaps.size() <<
":{" << join(pkt_strs,
" ") <<
"}.");
1991 boost::shared_ptr<const Ack_packet> ack)
2045 sock->m_snd_pending_rcv_wnd = ack->m_rcv_wnd;
2048 sock->m_rcv_acked_packets.insert(sock->m_rcv_acked_packets.end(),
2049 ack->m_rcv_acked_packets.begin(), ack->m_rcv_acked_packets.end());
2050 m_socks_with_accumulated_acks.insert(sock);
2052 FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock <<
"]. "
2053 "Received and accumulated [" << ack->m_type_ostream_manip <<
"] with "
2054 "[" << ack->m_rcv_acked_packets.size() <<
"] individual acknowledgments "
2055 "and rcv_wnd = [" << ack->m_rcv_wnd <<
"]; total for this socket in this "
2056 "receive handler is [" << sock->m_rcv_acked_packets.size() <<
"] individual acknowledgments.");
2058 sock->m_snd_stats.received_low_lvl_ack_packet(ack->m_rcv_acked_packets.empty());
2066 using boost::unordered_set;
2067 using boost::chrono::round;
2068 using boost::chrono::milliseconds;
2069 using boost::chrono::seconds;
2079 log_accumulated_acks(sock);
2083 using Acks = vector<Ack_packet::Individual_ack::Ptr>;
2084 Acks& acked_packets = sock->m_rcv_acked_packets;
2097 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
2098 "Accumulated [ACK] packets with [" << acked_packets.size() <<
"] "
2099 "individual acknowledgments, but state is now [" << sock->m_int_state <<
"]; ignoring ACKs forever.");
2243 const bool rexmit_on = sock->rexmit_on();
2244 auto& snd_stats = sock->m_snd_stats;
2245 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
2248 auto& pkts_marked_to_drop = sock->m_snd_temp_pkts_marked_to_drop;
2249 pkts_marked_to_drop.clear();
2252 const bool could_send_before_acks = can_send(sock);
2254 const bool had_rexmit_data_before_acks = !sock->m_snd_rexmit_q.empty();
2259 unordered_set<Peer_socket::order_num_t> flying_now_acked_pkts;
2262 size_t clean_acked_bytes = 0;
2263 size_t clean_acked_packets = 0;
2267 using Clean_acked_packet = tuple<Fine_duration, size_t, size_t>;
2268 vector<Clean_acked_packet> clean_acked_packet_events;
2269 clean_acked_packet_events.reserve(min(acked_packets.size(), snd_flying_pkts_by_when.size()));
2287 const bool error_ack = !categorize_individual_ack(socket_id, sock, ack, &dupe_or_late, &flying_pkt_it);
2300 if (flying_pkt_it != snd_flying_pkts_by_when.past_oldest())
2303 flying_pkt = flying_pkt_it->second;
2304 round_trip_time = compute_rtt_on_ack(flying_pkt, time_now, ack, &sent_when);
2313 assert(!dupe_or_late);
2315 assert(flying_pkt_it != snd_flying_pkts_by_when.past_oldest());
2319 new_round_trip_time_sample(sock, round_trip_time);
2329 const size_t bytes_acked = flying_pkt->m_size;
2331 clean_acked_packet_events.emplace_back(round_trip_time, bytes_acked, cwnd_bytes);
2334 snd_flying_pkts_erase_one(sock, flying_pkt_it);
2337 clean_acked_bytes += bytes_acked;
2338 ++clean_acked_packets;
2363 flying_now_acked_pkts.insert(sent_when->
m_order_num);
2372 = categorize_pkts_as_dropped_on_acks(sock, flying_now_acked_pkts);
2378 size_t dropped_pkts;
2379 size_t dropped_bytes;
2380 size_t cong_ctl_dropped_bytes;
2381 size_t cong_ctl_dropped_pkts;
2382 if (!drop_pkts_on_acks(sock, last_dropped_pkt_it,
2383 &cong_ctl_dropped_pkts, &cong_ctl_dropped_bytes,
2384 &dropped_pkts, &dropped_bytes, &pkts_marked_to_drop))
2405 if (dropped_pkts != 0)
2408 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
2409 "Considering Dropped: [" << dropped_bytes <<
"] bytes = [" << dropped_pkts <<
"] packets.");
2411 if (cong_ctl_dropped_pkts != 0)
2414 assert(cong_ctl_dropped_bytes != 0);
2416 FLOW_LOG_INFO(
"cong_ctl [" << sock <<
"] update: loss event: "
2417 "Dropped [" << cong_ctl_dropped_bytes <<
"] bytes "
2418 "= [" << cong_ctl_dropped_pkts <<
"] packets.");
2420 sock->m_snd_cong_ctl->on_loss_event(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
2421 sock->m_snd_last_loss_event_when = Fine_clock::now();
2428 assert(dropped_pkts == 0);
2429 assert(cong_ctl_dropped_pkts == 0);
2432 if (clean_acked_packets != 0)
2434 assert(clean_acked_bytes != 0);
2435 assert(!clean_acked_packet_events.empty());
2438 for (
const auto& [rtt, bytes, cwnd_bytes] : clean_acked_packet_events)
2440 FLOW_LOG_TRACE(
"cong_ctl [" << sock <<
"] update: clean individual acknowledgment: "
2441 "[" << sock->bytes_blocks_str(bytes) <<
"] with RTT [" << round<milliseconds>(rtt) <<
2442 "] and sent_cwnd_bytes [" << cwnd_bytes <<
"].");
2444 sock->m_snd_cong_ctl->on_individual_ack(rtt, bytes, cwnd_bytes);
2447 FLOW_LOG_TRACE(
"cong_ctl/bw_est [" << sock <<
"] update: clean acknowledgments: "
2448 "[" << sock->bytes_blocks_str(clean_acked_bytes) <<
"] = "
2449 "[" << clean_acked_packets <<
"] packets.");
2452 sock->m_snd_bandwidth_estimator->on_acks(clean_acked_bytes);
2453 sock->m_snd_cong_ctl->on_acks(clean_acked_bytes, clean_acked_packets);
2461 if (dropped_pkts != 0)
2464 snd_stats.dropped_data(dropped_bytes, dropped_pkts);
2466 const seconds MIN_TIME_BETWEEN_LOGS(1);
2467 const Fine_duration since_last_loss_sock_log = Fine_clock::now() - m_last_loss_sock_log_when;
2469 if (since_last_loss_sock_log > MIN_TIME_BETWEEN_LOGS)
2471 FLOW_LOG_INFO(
"Will log socket state on loss, because last such loss-driven logging was "
2472 "[" << round<milliseconds>(since_last_loss_sock_log) <<
" >"
2473 " " << MIN_TIME_BETWEEN_LOGS <<
"] ago.");
2474 sock_log_detail(sock);
2475 m_last_loss_sock_log_when = Fine_clock::now();
2479 FLOW_LOG_INFO(
"Will NOT log socket state on loss, because last such loss-driven logging was "
2480 "[" << round<milliseconds>(since_last_loss_sock_log) <<
" <="
2481 " " << MIN_TIME_BETWEEN_LOGS <<
"] ago.");
2486 log_snd_window(sock);
2496 drop_timer->start_contemporaneous_events();
2498 for (
const auto pkt_order_num : flying_now_acked_pkts)
2500 drop_timer->on_ack(pkt_order_num);
2501 drop_timer->on_packet_no_longer_in_flight(pkt_order_num);
2503 for (
const auto pkt_order_num : pkts_marked_to_drop)
2505 drop_timer->on_packet_no_longer_in_flight(pkt_order_num);
2508 drop_timer->end_contemporaneous_events();
2512 if (sock->m_snd_pending_rcv_wnd != sock->m_snd_remote_rcv_wnd)
2515 "rcv_wnd change [" << sock->m_snd_remote_rcv_wnd <<
"] => [" << sock->m_snd_pending_rcv_wnd <<
"].");
2516 sock->m_snd_remote_rcv_wnd = sock->m_snd_pending_rcv_wnd;
2527 sock->m_snd_stats.updated_rcv_wnd(sock->m_snd_remote_rcv_wnd < sock->max_block_size());
2536 if ((!could_send_before_acks) || (
rexmit_on && (!had_rexmit_data_before_acks)))
2538 send_worker(sock,
true);
2549 assert(dupe_or_late);
2550 assert(acked_pkt_it);
2594 const bool rexmit_on = sock->rexmit_on();
2595 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
2596 auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
2597 auto& snd_stats = sock->m_snd_stats;
2602 const unsigned int rexmit_id = ack->m_rexmit_id;
2606 snd_stats.received_ack();
2622 "acknowledgment [" << seq_num <<
", ...) is outside (ISN, snd_next) "
2623 "range (" << sock->m_snd_init_seq_num <<
", " << sock->m_snd_next_seq_num <<
").");
2626 snd_stats.error_ack();
2632 rst_and_close_connection_immediately(socket_id, sock,
2642 *acked_pkt_it = snd_flying_pkts_by_when.find(seq_num);
2643 if (*acked_pkt_it == snd_flying_pkts_by_when.past_oldest())
2710 if (pkt_it != snd_flying_pkts_by_seq.begin())
2716 get_seq_num_range(pkt_it->second, &l1, &l2);
2718 assert(l1 < seq_num);
2724 snd_stats.error_ack();
2728 "acknowledgment [" << seq_num <<
", ...) is at least partially inside "
2729 "packet [" << l1 <<
", " << l2 <<
").");
2765 snd_stats.late_or_dupe_ack();
2767 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
2768 "Acknowledged packet [" << seq_num <<
", ...) is duplicate or late (or invalid). "
2769 "RTT unknown. Ignoring.");
2772 *dupe_or_late =
true;
2773 assert(*acked_pkt_it == snd_flying_pkts_by_when.past_oldest());
2777 assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest());
2782 const unsigned int acked_rexmit_id =
rexmit_on ? acked_pkt.
m_packet->m_rexmit_id : 0;
2784 get_seq_num_range(*acked_pkt_it, 0, &seq_num_end);
2788 if (rexmit_id > acked_rexmit_id)
2792 "Acknowledged packet [" << seq_num <<
", " << seq_num_end <<
") "
2793 "rexmit_id [" <<
int(rexmit_id) <<
"] "
2794 "exceeds highest sent rexmit_id [" <<
int(acked_rexmit_id) <<
"].");
2803 if (rexmit_id != acked_rexmit_id)
2805 assert(rexmit_id < acked_rexmit_id);
2843 snd_stats.late_or_dupe_ack();
2845 FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock <<
"]. "
2846 "Acknowledged packet [" << seq_num <<
", " << seq_num_end <<
") "
2847 "order_num [" << acked_pkt.
m_sent_when[rexmit_id].m_order_num <<
"] "
2848 "rexmit_id [" <<
int(rexmit_id) <<
"] "
2849 "is less than highest sent [" <<
int(acked_rexmit_id) <<
"]. Ignoring.");
2852 *dupe_or_late =
true;
2853 assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest());
2857 assert(rexmit_id == acked_rexmit_id);
2862 snd_stats.good_ack(acked_pkt.
m_size);
2865 *dupe_or_late =
false;
2866 assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest());
2875 using boost::chrono::milliseconds;
2876 using boost::chrono::round;
2896 const unsigned int rexmit_id = ack->m_rexmit_id;
2898 *sent_when = &(flying_pkt->m_sent_when[rexmit_id]);
2907 const auto& ack_delay = ack->m_delay;
2908 round_trip_time = time_now - (*sent_when)->m_sent_time - ack_delay;
2910 if (round_trip_time.count() < 0)
2919 FLOW_LOG_TRACE(
"Acknowledged packet [" << ack->m_seq_num <<
", ...) "
2920 "order_num [" << order_num <<
"] has negative "
2921 "RTT [" << round_trip_time <<
"]; assuming zero. "
2922 "Sent at [" << (*sent_when)->m_sent_time <<
"]; "
2923 "received at [" << time_now <<
"]; "
2924 "receiver-reported ACK delay [" << ack_delay <<
"].");
2925 round_trip_time = Fine_duration::zero();
2927 FLOW_LOG_TRACE(
"Acknowledged packet [" << ack->m_seq_num <<
", ...) "
2928 "order_num [" << order_num <<
"] "
2929 "has RTT [" << round<milliseconds>(round_trip_time) <<
"] "
2930 "(ACK delay [" << round<milliseconds>(ack_delay) <<
"]).");
2932 return round_trip_time;
2937 const boost::unordered_set<Peer_socket::order_num_t>& flying_now_acked_pkts)
2939 using std::priority_queue;
2955 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
3026 priority_queue<Peer_socket::order_num_t>
3027 high_ack_count_q(flying_now_acked_pkts.begin(), flying_now_acked_pkts.end());
3031 ack_count_t ack_increment_after_me = 0;
3035 for (last_dropped_pkt_it = snd_flying_pkts_by_when.newest();
3036 last_dropped_pkt_it != snd_flying_pkts_by_when.past_oldest();
3037 ++last_dropped_pkt_it)
3053 while ((!high_ack_count_q.empty()) &&
3055 (high_ack_count_q.top() > cur_pkt_sent_when.
m_order_num))
3058 ++ack_increment_after_me;
3061 high_ack_count_q.pop();
3068 if (cur_sent_pkt.
m_acks_after_me > S_MAX_LATER_ACKS_BEFORE_CONSIDERING_DROPPED)
3082 get_seq_num_range(last_dropped_pkt_it, &cur_pkt_seq_num, &cur_pkt_seq_num_end);
3085 (
"Unacknowledged packet [" << cur_pkt_seq_num <<
", " << cur_pkt_seq_num_end <<
") "
3086 "order_num [" << cur_pkt_sent_when.
m_order_num <<
"] has "
3088 "for later packets; considering it and "
3089 "all unacknowledged packets sent earlier as Dropped.");
3099 return last_dropped_pkt_it;
3104 size_t* cong_ctl_dropped_pkts,
size_t* cong_ctl_dropped_bytes,
3105 size_t* dropped_pkts,
size_t* dropped_bytes,
3106 std::vector<Peer_socket::order_num_t>* pkts_marked_to_drop)
3126 const bool rexmit_on = sock->rexmit_on();
3127 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
3128 auto& snd_stats = sock->m_snd_stats;
3159 *dropped_pkts = snd_flying_pkts_by_when.size();
3160 *dropped_bytes = sock->m_snd_flying_bytes;
3162 *cong_ctl_dropped_bytes = 0;
3163 *cong_ctl_dropped_pkts = 0;
3164 bool loss_event_finished =
false;
3179 auto& snd_rexmit_q = sock->m_snd_rexmit_q;
3180 decltype(sock->m_snd_rexmit_q)::iterator snd_rexmit_q_fulcrum_it = snd_rexmit_q.end();
3183 assert(pkts_marked_to_drop->empty());
3185 auto pkt_it = last_dropped_pkt_it;
3186 while (pkt_it != snd_flying_pkts_by_when.past_oldest())
3189 auto next_pkt_it = boost::next(pkt_it);
3197 if (!loss_event_finished)
3202 && (sent_when.
m_sent_time < sock->m_snd_last_loss_event_when))
3206 loss_event_finished =
true;
3211 *cong_ctl_dropped_bytes += sent_pkt->m_size;
3212 ++(*cong_ctl_dropped_pkts);
3221 if (!ok_to_rexmit_or_close(sock, pkt_it,
true))
3235 snd_rexmit_q_fulcrum_it = snd_rexmit_q.insert(snd_rexmit_q_fulcrum_it, sent_pkt);
3236 ++sock->m_snd_rexmit_q_size;
3246 "Scoreboard must not get otherwise changed when a packet is erased.");
3247 pkts_marked_to_drop->push_back(sent_when.
m_order_num);
3248 snd_flying_pkts_erase_one(sock, pkt_it);
3250 pkt_it = next_pkt_it;
3254 *dropped_pkts -= snd_flying_pkts_by_when.size();
3255 *dropped_bytes -= sock->m_snd_flying_bytes;
3257 if (*cong_ctl_dropped_pkts != 0)
3260 snd_stats.loss_event();
3268 using boost::algorithm::join;
3269 using boost::chrono::symbol_format;
3272 using std::transform;
3281 using Acks = vector<Ack::Ptr>;
3282 const Acks& acked_packets = sock->m_rcv_acked_packets;
3288 vector<string> ack_strs(acked_packets.size());
3289 transform(acked_packets.begin(), acked_packets.end(), ack_strs.begin(),
3290 [](Ack::Const_ptr ack) ->
string
3292 return util::ostream_op_string(
'[', ack->m_seq_num,
", ", int(ack->m_rexmit_id),
", ",
3296 const string ack_str = join(ack_strs,
" ");
3299 "Accumulated [ACK] packets with "
3300 "acknowledgments [seq_num, rexmit_id, delay]: "
3301 "[" << ack_str <<
"].");
3305 FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock <<
"]. "
3306 "Accumulated [ACK] packets with "
3307 "[" << acked_packets.size() <<
"] individual acknowledgments.");
3312 log_snd_window(sock);
3328 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
3329 auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
3332 assert(!snd_flying_pkts_by_when.empty());
3339 log_snd_window(sock);
3341 const bool rexmit_on = sock->rexmit_on();
3343 const bool could_send_before_drops = can_send(sock);
3345 const bool had_rexmit_data_before_drops = !sock->m_snd_rexmit_q.empty();
3350 size_t cong_ctl_dropped_bytes = 0;
3351 size_t cong_ctl_dropped_pkts = 0;
3353 if (drop_all_packets)
3355 cong_ctl_dropped_bytes = sock->m_snd_flying_bytes;
3356 cong_ctl_dropped_pkts = snd_flying_pkts_by_when.size();
3363 pkt_it != snd_flying_pkts_by_when.past_newest();
3367 if (!ok_to_rexmit_or_close(sock, prior(pkt_it.base()),
false))
3378 sock->m_snd_rexmit_q.push_back(pkt_it->second);
3380 sock->m_snd_rexmit_q_size += cong_ctl_dropped_pkts;
3387 snd_flying_pkts_updated(sock, snd_flying_pkts_by_when.newest(), snd_flying_pkts_by_when.past_oldest(),
false);
3388 snd_flying_pkts_by_when.clear();
3389 snd_flying_pkts_by_seq.clear();
3391 packet_marked_to_drop_or_drop_all = 0;
3399 cong_ctl_dropped_bytes = oldest_pkt->m_size;
3400 cong_ctl_dropped_pkts = 1;
3405 if (!ok_to_rexmit_or_close(sock, oldest_pkt_it,
false))
3412 sock->m_snd_rexmit_q.push_back(oldest_pkt);
3413 ++sock->m_snd_rexmit_q_size;
3418 packet_marked_to_drop_or_drop_all = oldest_pkt->m_sent_when.back().m_order_num;
3421 snd_flying_pkts_erase_one(sock, oldest_pkt_it);
3446 FLOW_LOG_INFO(
"cong_ctl [" << sock <<
"] update: Drop Timeout event: "
3447 "Dropped [" << cong_ctl_dropped_bytes <<
"] bytes = [" << cong_ctl_dropped_pkts <<
"] packets.");
3450 sock->m_snd_cong_ctl->on_drop_timeout(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
3451 sock->m_snd_last_loss_event_when = Fine_clock::now();
3454 sock->m_snd_stats.drop_timeout();
3455 sock->m_snd_stats.dropped_data(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
3458 log_snd_window(sock);
3463 drop_timer->start_contemporaneous_events();
3471 if (packet_marked_to_drop_or_drop_all == 0)
3474 drop_timer->on_no_packets_in_flight_any_longer();
3478 drop_timer->on_packet_no_longer_in_flight(packet_marked_to_drop_or_drop_all);
3483 drop_timer->end_contemporaneous_events();
3490 if ((!could_send_before_drops) || (
rexmit_on && (!had_rexmit_data_before_drops)))
3492 send_worker(sock,
false);
3502 using boost::ratio_subtract;
3503 using boost::ratio_string;
3504 using boost::chrono::round;
3505 using boost::chrono::milliseconds;
3506 using boost::chrono::microseconds;
3507 using boost::chrono::seconds;
3536 if (srtt == Fine_duration::zero())
3544 "srtt = [" << round<milliseconds>(srtt) <<
" = " << srtt <<
"]; "
3545 "rtt_var = [" << round<milliseconds>(rtt_var) <<
" = " << rtt_var <<
"]; "
3546 "rtt = [" << rtt <<
"].");
3564 using Alpha = ratio<1, 8>;
3565 using One_minus_alpha = ratio_subtract<ratio<1>, Alpha>;
3566 using Beta = ratio<1, 4>;
3567 using One_minus_beta = ratio_subtract<ratio<1>, Beta>;
3572 if (abs_srtt_minus_rtt.count() < 0)
3574 abs_srtt_minus_rtt = -abs_srtt_minus_rtt;
3579 = rtt_var * One_minus_beta::num / One_minus_beta::den
3580 + abs_srtt_minus_rtt * Beta::num / Beta::den;
3582 = srtt * One_minus_alpha::num / One_minus_alpha::den
3583 + rtt * Alpha::num / Alpha::den;
3587 "srtt = [" << round<milliseconds>(srtt) <<
" = " << srtt <<
"]; "
3588 "rtt_var = [" << round<milliseconds>(rtt_var) <<
" = " << rtt_var <<
"]; "
3589 "rtt = [" << rtt <<
"]; "
3590 "prev_srtt = [" << prev_srtt <<
"]; "
3591 "prev_rtt_var = [" << prev_rtt_var <<
"]; "
3592 "alpha = " << (ratio_string<Alpha, char>::prefix()) <<
"; "
3593 "(1 - alpha) = " << (ratio_string<One_minus_alpha, char>::prefix()) <<
"; "
3594 "beta = " << (ratio_string<Beta, char>::prefix()) <<
"; "
3595 "(1 - beta) = " << (ratio_string<One_minus_beta, char>::prefix()) <<
"; "
3596 "|srtt - rtt| = [" << abs_srtt_minus_rtt <<
"].");
3613 const Fine_duration clock_resolution_at_least = microseconds(500);
3615 const Fine_duration ceiling = sock->opt(sock->m_opts.m_dyn_drop_timeout_ceiling);
3616 const unsigned int k = 4;
3620 const Fine_duration srtt_plus_var_term = srtt + max(clock_resolution_at_least, rtt_var_k);
3621 dto = max(srtt_plus_var_term, floor);
3622 dto = min(dto, ceiling);
3626 "dto = [" << round<milliseconds>(dto) <<
" = " << dto <<
"]; "
3627 "rtt_var * k = [" << rtt_var_k <<
"]; "
3628 "srtt + max(G, rtt_var * k) = [" << srtt_plus_var_term <<
"]; "
3629 "k = [" << k <<
"]; "
3630 "floor = [" << floor <<
"]; ceiling = [" << ceiling <<
"]; "
3631 "clock_resolution = [" << clock_resolution_at_least <<
"]; "
3632 "prev_dto = [" << prev_dto <<
"].");
3640 using boost::algorithm::join;
3648 const auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
3649 const auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
3650 const size_t num_flying_pkts = snd_flying_pkts_by_seq.size();
3654 if (snd_flying_pkts_by_seq.empty())
3658 "Send window state for [" << sock <<
"]: cong_wnd "
3659 "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) <<
"]; "
3660 "sent+acked/dropped "
3661 "[" << sock->m_snd_init_seq_num <<
", " << sock->m_snd_next_seq_num <<
") "
3662 "unsent [" << sock->m_snd_next_seq_num <<
", ...).");
3673 (
"Send window state for [" << sock <<
"]: cong_wnd "
3674 "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) <<
"]; "
3675 "sent+acked/dropped [" << sock->m_snd_init_seq_num <<
", " << snd_flying_pkts_by_seq.begin()->first <<
") "
3676 "in-flight [" << sock->m_snd_flying_bytes <<
"] bytes: " << num_flying_pkts <<
":{...} "
3677 "unsent [" << sock->m_snd_next_seq_num <<
", ...).");
3684 const bool rexmit_on = sock->rexmit_on();
3686 vector<string> pkt_strs;
3687 pkt_strs.reserve(num_flying_pkts);
3689 pkt_it_it != snd_flying_pkts_by_seq.end();
3693 get_seq_num_range(pkt_it_it->second, &start, &end);
3697 String_ostream pkt_str_os;
3698 pkt_str_os.os() <<
'[' << start;
3701 pkt_str_os.os() <<
'[' << int(sent_pkt->m_packet->m_rexmit_id) <<
'/' << sent_pkt->m_sent_when.back().m_order_num
3706 pkt_str_os.os() <<
", ";
3708 pkt_str_os.os() << end <<
")<" << sent_pkt->m_acks_after_me <<
"acks" << flush;
3710 pkt_strs.push_back(pkt_str_os.str());
3715 "Send window state for [" << sock <<
"]: cong_wnd "
3716 "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) <<
"]; "
3717 "sent+acked/dropped [" << sock->m_snd_init_seq_num <<
", " << snd_flying_pkts_by_seq.begin()->first <<
") "
3719 "[" << sock->m_snd_flying_bytes <<
"] bytes: " << num_flying_pkts <<
":{" << join(pkt_strs,
" ") <<
3720 "} unsent [" << sock->m_snd_next_seq_num <<
", ...).");
3730 vector<string> pkt_strs_time;
3731 pkt_strs_time.reserve(num_flying_pkts);
3734 pkt_it != snd_flying_pkts_by_when.const_past_newest();
3739 get_seq_num_range(prior(pkt_it.base()), &start, &end);
3745 start,
'[',
int(sent_pkt->m_packet->m_rexmit_id),
'/',
3746 sent_pkt->m_sent_when.back().m_order_num,
"], ", end,
")<",
3747 sent_pkt->m_acks_after_me,
"acks");
3748 pkt_strs_time.push_back(pkt_str);
3752 if (pkt_strs_time != pkt_strs)
3756 "Sorted by time sent: {" << join(pkt_strs_time,
" ") <<
"}.");
3765 if (flying_packets.empty())
3772 const Peer_socket::Sent_pkt_by_seq_num_map::value_type& highest_val = *(prior(flying_packets.end()));
3776 advance_seq_num(&seq_num, highest_val.second->second->m_size);
3791 get_seq_num_range(pkt_it, &seq_num, &seq_num_end);
3793 if (sock->rexmit_on())
3796 (
"On [" << sock <<
"] erasing packet [" << seq_num <<
", " << seq_num_end <<
") "
3797 "order_num [" << order_num <<
"] rexmit_id [" <<
int(sent_pkt.
m_packet->m_rexmit_id) <<
"] from "
3798 "snd_flying_pkts* and friends.");
3803 (
"On [" << sock <<
"] erasing packet [" << seq_num <<
", " << seq_num_end <<
") "
3804 "order_num [" << order_num <<
"] from snd_flying_pkts* and friends.");
3809 snd_flying_pkts_updated(sock, pkt_it, boost::next(pkt_it),
false);
3812 sock->m_snd_flying_pkts_by_seq_num.erase(pkt_it->first);
3813 sock->m_snd_flying_pkts_by_sent_when.erase(pkt_it);
3823 using std::make_pair;
3827 auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
3830 const auto insert_result =
3832 snd_flying_pkts_by_when.insert(make_pair(seq_num, sent_pkt));
3836 assert(insert_result.second);
3837 assert(insert_result.first == pkt_it);
3839 snd_flying_pkts_updated(sock, pkt_it, boost::next(pkt_it),
true);
3843 const auto insert_result_by_seq =
3845 sock->m_snd_flying_pkts_by_seq_num.insert(make_pair(seq_num, pkt_it));
3848 assert(insert_result_by_seq.second);
3869 get_seq_num_range(pkt_it, 0, &seq_num_end);
3870 if (sock->rexmit_on())
3873 (
"On [" << sock <<
"] pushing packet [" << seq_num <<
", " << seq_num_end <<
") "
3874 "rexmit_id [" <<
int(sent_pkt->m_packet->m_rexmit_id) <<
"] onto snd_flying_pkts and friends.");
3879 (
"On [" << sock <<
"] pushing packet [" << seq_num <<
", " << seq_num_end <<
") "
3880 "onto snd_flying_pkts and friends.");
3891 if (pkt_begin == pkt_end)
3897 const auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
3898 size_t& snd_flying_bytes = sock->m_snd_flying_bytes;
3902 && (pkt_begin == snd_flying_pkts_by_when.const_newest())
3903 && (pkt_end == snd_flying_pkts_by_when.const_past_oldest()))
3905 snd_flying_bytes = 0;
3909 size_t delta_bytes = 0;
3910 for ( ; pkt_begin != pkt_end; ++pkt_begin)
3912 delta_bytes += pkt_begin->second->m_size;
3914 added ? (snd_flying_bytes += delta_bytes) : (snd_flying_bytes -= delta_bytes);
3918 "In-flight [" << sock->bytes_blocks_str(snd_flying_bytes) <<
"].");
3923 bool defer_delta_check)
3928 get_seq_num_range(pkt_it, &seq_num, &seq_num_end);
3930 const unsigned int rexmit_id = pkt.
m_packet->m_rexmit_id;
3931 FLOW_LOG_TRACE(
"On [" << sock <<
"] attempting to queue for retransmission "
3932 "[" << seq_num <<
", " << seq_num_end <<
"] which has been "
3933 "retransmitted [" << rexmit_id <<
"] times so far.");
3934 if (rexmit_id == sock->opt(sock->m_opts.m_st_max_rexmissions_per_packet))
3936 rst_and_close_connection_immediately(socket_id(sock), sock,
3947 return connect_with_metadata(to, boost::asio::buffer(&S_DEFAULT_CONN_METADATA,
sizeof(S_DEFAULT_CONN_METADATA)),
3952 const boost::asio::const_buffer& serialized_metadata,
3956 namespace bind_ns = util::bind_ns;
3958 bind_ns::cref(to), bind_ns::cref(serialized_metadata), _1, sock_opts);
3961 namespace bind_ns = util::bind_ns;
3964 using bind_ns::bind;
3991 [&]() { connect_worker(to, serialized_metadata, sock_opts, &sock); });
3995 if (sock->m_disconnect_cause)
3997 *err_code = sock->m_disconnect_cause;
4009 using boost::asio::buffer;
4010 using boost::asio::ip::address;
4018 auto& sock = *sock_ptr;
4025 const bool opts_ok = sock_validate_options(*sock_opts, 0, &err_code);
4028 sock.reset(sock_create(*sock_opts));
4033 sock->m_disconnect_cause = err_code;
4050 sock_non_ptr = sock_create(
m_opts.m_dyn_sock_opts);
4052 sock.reset(sock_non_ptr);
4057 sock->m_active_connect =
true;
4058 sock->m_node =
this;
4060 sock->m_remote_endpoint = to;
4062 sock->m_serialized_metadata.assign_copy(serialized_metadata);
4071 sock->m_snd_cong_ctl.reset
4080 bool ip_addr_any_error =
false;
4084 if (addr.to_v4() == util::Ip_address_v4::any())
4086 ip_addr_any_error =
true;
4089 else if (addr.is_v6())
4091 if (addr.to_v6() == util::Ip_address_v6::any())
4093 ip_addr_any_error =
true;
4097 if (ip_addr_any_error)
4100 Error_code* err_code = &sock->m_disconnect_cause;
4108 sock->m_local_port = m_ports.reserve_ephemeral_port(&sock->m_disconnect_cause);
4117 FLOW_LOG_INFO(
"NetFlow worker thread starting active-connect of [" << sock <<
"].");
4126 FLOW_LOG_WARNING(
"Cannot add [" << sock <<
"], because such a connection already exists. "
4127 "This is an ephemeral port collision and "
4128 "constitutes either a bug or an extremely unlikely condition.");
4131 Error_code* err_code = &sock->m_disconnect_cause;
4136 m_ports.return_port(sock->m_local_port, &return_err_code);
4137 assert(!return_err_code);
4148 setup_connection_timers(socket_id, sock,
true);
4153 init_seq_num = m_seq_num_generator.generate_init_seq_num();
4157 init_seq_num.
set_metadata(
'L', init_seq_num + 1, sock->max_block_size());
4159 sock->m_snd_next_seq_num = init_seq_num + 1;
4162 auto syn = create_syn(sock);
4165 if (!async_sock_low_lvl_packet_send_paced(sock,
4167 &sock->m_disconnect_cause))
4173 m_ports.return_port(sock->m_local_port, &return_err_code);
4174 assert(!return_err_code);
4177 cancel_timers(sock);
4185 m_socks[socket_id] = sock;
4194 return sync_connect_with_metadata(to, Fine_duration::max(),
4195 boost::asio::buffer(&S_DEFAULT_CONN_METADATA,
sizeof(S_DEFAULT_CONN_METADATA)),
4196 err_code, sock_opts);
4200 const boost::asio::const_buffer& serialized_metadata,
4203 return sync_connect_with_metadata(to, Fine_duration::max(), serialized_metadata, err_code, opts);
4207 const boost::asio::const_buffer& serialized_metadata,
4210 namespace bind_ns = util::bind_ns;
4212 bind_ns::cref(to), bind_ns::cref(max_wait), bind_ns::cref(serialized_metadata),
4216 using util::bind_ns::bind;
4243 event_set->close(&dummy_prevents_throw);
4246 const auto sock = connect_with_metadata(to, serialized_metadata, err_code, sock_opts);
4260 &dummy_prevents_throw);
4264 result = event_set->sync_wait(max_wait, err_code);
4279 sock->close_abruptly(&dummy_prevents_throw);
4285 const bool ready = event_set->events_detected(err_code);
4308 *err_code = sock->m_disconnect_cause;
4318 sock->close_abruptly(&dummy_prevents_throw);
4327 using boost::chrono::microseconds;
4328 using boost::chrono::duration_cast;
4329 using boost::weak_ptr;
4333 Fine_duration rexmit_from_now = sock->opt(sock->m_opts.m_st_connect_retransmit_period);
4340 ++sock->m_init_rexmit_count;
4350 sock->m_init_rexmit_scheduled_task
4353 sock_observer = weak_ptr<Peer_socket>(sock)]
4356 auto sock = sock_observer.lock();
4359 handle_connection_rexmit_timer_event(socket_id, sock);
4367 sock->m_connection_timeout_scheduled_task
4369 sock->opt(sock->m_opts.m_st_connect_retransmit_timeout),
4370 true, &m_task_engine,
4372 sock_observer = weak_ptr<Peer_socket>(sock)]
4377 auto sock = sock_observer.lock();
4384 FLOW_LOG_INFO(
"Connection handshake timeout timer [" << sock <<
"] has been triggered; was on "
4385 "attempt [" << (sock->m_init_rexmit_count + 1) <<
"].");
4412 FLOW_LOG_INFO(
"Connection handshake retransmit timer [" << sock <<
"] triggered; was on "
4413 "attempt [" << (sock->m_init_rexmit_count + 1) <<
"].");
4424 if (sock->m_active_connect)
4468 sock->m_rcv_delayed_ack_timer.cancel();
4469 sock->m_snd_pacing_data.m_slice_timer.cancel();
4471 if (sock->m_init_rexmit_scheduled_task)
4476 if (sock->m_connection_timeout_scheduled_task)
4481 if (sock->m_rcv_in_rcv_wnd_recovery)
4484 sock->m_rcv_in_rcv_wnd_recovery =
false;
4487 if (sock->m_snd_drop_timer)
4490 sock->m_snd_drop_timer->done();
4494 sock->m_snd_drop_timer.reset();
4500 sock->m_snd_drop_timeout = sock->opt(sock->m_opts.m_st_init_drop_timeout);
4507 const auto on_timer = [
this,
socket_id, sock](
bool drop_all_packets)
4521 const Function<
size_t (
size_t max_data_size)>& snd_buf_feed_func,
4524 using boost::asio::post;
4544 if (sock->m_disconnect_cause)
4578 const size_t sent = snd_buf_feed_func(sock->max_block_size_multiple(sock->m_opts.m_st_snd_buf_max_size));
4581 sock->m_snd_stats.buffer_fed(sock->m_snd_buf.data_size());
4702 if ((!was_deqable) && (sent != 0))
4715 using boost::any_cast;
4763 switch (sock->m_int_state)
4778 "in state [" << sock->m_int_state <<
"] "
4779 "closed before asynchronous send_worker() could proceed.");
4785 "in state [" << sock->m_int_state <<
"] "
4786 "somehow had send() called on it.");
4794 using boost::asio::buffer;
4797 using boost::ratio_string;
4798 using boost::chrono::milliseconds;
4799 using boost::chrono::round;
4800 using boost::shared_ptr;
4842 using Idle_timeout_dto_factor = ratio<110, 100>;
4844 = sock->m_snd_drop_timeout * Idle_timeout_dto_factor::num / Idle_timeout_dto_factor::den;
4845 const Fine_duration since_last_send = Fine_clock::now() - sock->m_snd_last_data_sent_when;
4847 if ((sock->m_snd_last_data_sent_when !=
Fine_time_pt()) && (since_last_send > idle_timeout))
4850 FLOW_LOG_INFO(
"Idle timeout triggered for [" << sock <<
"]; "
4851 "last activity [" << round<milliseconds>(since_last_send) <<
"] ago "
4852 "exceeds idle timeout [" << round<milliseconds>(idle_timeout) <<
"] "
4853 "= " << (ratio_string<Idle_timeout_dto_factor, char>::prefix()) <<
" x "
4854 "[" << round<milliseconds>(sock->m_snd_drop_timeout) <<
"].");
4855 sock->m_snd_cong_ctl->on_idle_timeout();
4856 sock->m_snd_stats.idle_timeout();
4865 "Initial check: can_send() is false.");
4876 const bool rexmit_on = sock->rexmit_on();
4885 "Initial check: can_send() is true, but no data to send.");
4892 list<Peer_socket::Sent_packet::Ptr>& rexmit_q = sock->m_snd_rexmit_q;
4893 size_t& rexmit_q_size = sock->m_snd_rexmit_q_size;
4900 "Initial check: Will send from rexmit queue of size [" << rexmit_q_size <<
"] and/or "
4901 "Send buffer with total size [" << snd_buf.
data_size() <<
"].");
4908 shared_ptr<Data_packet> data;
4910 bool rexmit =
false;
4918 if (rexmit_q.empty())
4923 data = Low_lvl_packet::create_uninit_packet<Data_packet>(
get_logger());
4924 data->m_rexmit_id = 0;
4935 assert(!data->m_data.empty());
4938 data->m_seq_num = snd_next_seq_num;
4962 sent_pkt = rexmit_q.front();
4965 rexmit_q.pop_front();
4968 data = sent_pkt->m_packet;
4971 ++data->m_rexmit_id;
4974 sent_pkt->m_sent_when.push_back(sent_when);
4977 sent_pkt->m_acks_after_me = 0;
5006 sock->m_snd_stats.data_sent(data->m_data.size(), rexmit);
5012 "can_send() == [" <<
can_send(sock) <<
"]; "
5013 "snd_deqable() == [" <<
snd_deqable(sock) <<
"].");
5066 const size_t pipe_taken = sock->m_snd_flying_bytes;
5067 const size_t cong_wnd = sock->m_snd_cong_ctl->congestion_window_bytes();
5068 const size_t& rcv_wnd = sock->m_snd_remote_rcv_wnd;
5070 const size_t pipe_total = min(cong_wnd, rcv_wnd);
5073 = (pipe_taken < pipe_total) && ((pipe_total - pipe_taken) >= sock->max_block_size());
5075 FLOW_LOG_TRACE(
"cong_ctl [" << sock <<
"] info: can_send = [" << can <<
"]; "
5076 "pipe_taken = [" << sock->bytes_blocks_str(pipe_taken) <<
"]; "
5077 "cong_wnd = [" << sock->bytes_blocks_str(cong_wnd) <<
"]; "
5078 "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) <<
"].");
5084 const Function<
size_t ()>& rcv_buf_consume_func,
5087 using boost::asio::post;
5115 const bool no_bytes_available = sock->m_rcv_buf.empty();
5116 const size_t bytes_consumed = rcv_buf_consume_func();
5118 if (bytes_consumed != 0)
5126 "has successfully returned [" << bytes_consumed <<
"] bytes.");
5135 if (sock->m_rcv_buf.empty()
5151 return bytes_consumed;
5160 "has successfully returned no bytes because still not fully connected.");
5182 if (!no_bytes_available)
5186 "has data to return, but the provided buffer size is too small.");
5194 "returning no data because Receive buffer empty.");
5235 using boost::any_cast;
5325 FLOW_LOG_INFO(
'[' << sock <<
"] Receive buffer space freed, "
5326 "but state is now [" << sock->m_int_state <<
"]; ignoring.");
5331 if (sock->m_rcv_in_rcv_wnd_recovery)
5337 FLOW_LOG_TRACE(
'[' << sock <<
"] Receive buffer space freed, but "
5338 "we are already in rcv_wnd recovery mode. Nothing to do.");
5347 const size_t& last_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd;
5349 if (rcv_wnd <= last_rcv_wnd)
5353 FLOW_LOG_TRACE(
'[' << sock <<
"] Receive buffer space freed, but "
5354 "free space [" << sock->bytes_blocks_str(rcv_wnd) <<
"] <= prev "
5355 "free space [" << sock->bytes_blocks_str(last_rcv_wnd) <<
"]. Nothing to do.");
5360 const size_t diff = rcv_wnd - last_rcv_wnd;
5361 const unsigned int pct = sock->opt(sock->m_opts.m_st_rcv_buf_max_size_to_advertise_percent);
5362 const size_t max_rcv_buf_size = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size);
5363 const size_t min_inc = max_rcv_buf_size * pct / 100;
5369 "freed is [" << sock->bytes_blocks_str(diff) <<
"] since last advertisement; "
5370 "< threshold [" << pct <<
"%] x "
5371 "[" << sock->bytes_blocks_str(max_rcv_buf_size) <<
"] = "
5372 "[" << sock->bytes_blocks_str(min_inc) <<
"]. Not advertising rcv_wnd yet.");
5379 "freed is [" << sock->bytes_blocks_str(diff) <<
"] since last advertisement; "
5380 "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) <<
"]; "
5381 ">= threshold [" << pct <<
"%] x "
5382 "[" << sock->bytes_blocks_str(max_rcv_buf_size) <<
"] = "
5383 "[" << sock->bytes_blocks_str(min_inc) <<
"]. Sending unsolicited rcv_wnd-advertising ACK "
5384 "and entering rcv_wnd recovery.");
5387 sock->m_rcv_in_rcv_wnd_recovery =
true;
5389 sock->m_rcv_wnd_recovery_start_time = Fine_clock::now();
5392 sock->m_rcv_stats.rcv_wnd_recovery_event_start();
5399 using boost::chrono::milliseconds;
5400 using boost::chrono::round;
5401 using boost::weak_ptr;
5408 auto ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
5409 ack->m_rcv_wnd = rcv_wnd;
5411 sock->m_rcv_last_sent_rcv_wnd = rcv_wnd;
5423 sock->m_rcv_stats.sent_low_lvl_ack_packet(
true);
5427 const Fine_duration fire_when_from_now = sock->opt(sock->m_opts.m_dyn_rcv_wnd_recovery_timer_period);
5430 "[" << round<milliseconds>(fire_when_from_now) <<
"] from now.");
5439 sock->m_rcv_wnd_recovery_scheduled_task
5441 [
this, sock_observer = weak_ptr<Peer_socket>(sock)](
bool)
5445 auto sock = sock_observer.lock();
5452 const Fine_duration since_recovery_started = Fine_clock::now() - sock->m_rcv_wnd_recovery_start_time;
5453 if (since_recovery_started > sock->opt(sock->m_opts.m_dyn_rcv_wnd_recovery_max_period))
5458 FLOW_LOG_INFO(
'[' << sock <<
"]: still no new DATA arrived since last rcv_wnd advertisement; "
5459 "Time since entering recovery [" << round<milliseconds>(since_recovery_started) <<
"] expired. "
5460 "Ending rcv_wnd recovery.");
5461 sock->m_rcv_in_rcv_wnd_recovery =
false;
5464 sock->m_rcv_stats.rcv_wnd_recovery_event_finish(
false);
5476 FLOW_LOG_INFO(
'[' << sock <<
"]: still no new DATA arrived since last rcv_wnd advertisement; "
5477 "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) <<
"]; "
5478 "time since entering recovery [" << round<milliseconds>(since_recovery_started) <<
"]. "
5479 "Sending unsolicited rcv_wnd-advertising ACK and continuing rcv_wnd recovery.");
5487 using boost::chrono::milliseconds;
5488 using boost::chrono::round;
5495 if (!sock->m_rcv_in_rcv_wnd_recovery)
5503 FLOW_LOG_INFO(
'[' << sock <<
"]: Canceling rcv_wnd recovery; "
5504 "Time since entering recovery "
5505 "[" << round<milliseconds>(Fine_clock::now() - sock->m_rcv_wnd_recovery_start_time) <<
"].");
5507 sock->m_rcv_in_rcv_wnd_recovery =
false;
5509 const bool canceled =
5515 sock->m_rcv_stats.rcv_wnd_recovery_event_finish(
true);
5520 using std::numeric_limits;
5524 if (!sock->opt(sock->m_opts.m_st_rcv_flow_control_on))
5529 return numeric_limits<size_t>::max();
5534 size_t rcv_buf_size;
5537 rcv_buf_size = sock->m_rcv_buf.data_size();
5541 if (sock->rexmit_on())
5543 rcv_buf_size += sock->m_rcv_reassembly_q_data_size;
5546 const size_t max_rcv_buf_size = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size);
5548 return (max_rcv_buf_size > rcv_buf_size) ? (max_rcv_buf_size - rcv_buf_size) : 0;
5570 "was completely closed before asynchronous "
5571 "receive_emptied_rcv_buf_while_disconnecting() could proceed.");
5589 "is gracefully closing, and Receive buffer is empty, but graceful close itself not yet finished.");
5595 if (!sock->m_rcv_buf.empty())
5600 "is gracefully closing, but Receive buffer has data again.");
5607 "is gracefully closing, and Receive buffer is now empty. Ready to permanently close.");
5617 using boost::adopt_lock;
5661 *err_code = sock->m_disconnect_cause;
5684 const Error_code& err_code,
bool defer_delta_check)
5686 using boost::lexical_cast;
5697 FLOW_LOG_INFO(
"Closing and destroying [" << sock <<
"] abruptly.");
5702 FLOW_LOG_INFO(
"Closing and destroying [" << sock <<
"] after graceful close.");
5722 sock->m_info_on_close.m_disconnect_cause = err_code;
5747 const auto erased = 1 ==
5753 if (!sock->m_active_connect)
5761 Port_to_server_map::const_iterator port_to_server_it =
m_servs.find(sock->m_local_port);
5762 if (port_to_server_it !=
m_servs.end())
5777 if (sock->m_active_connect)
5781 assert(!return_err_code);
5796 if (inserted_rd || inserted_wr)
5804 const Error_code& err_code,
bool defer_delta_check)
5815 auto syn = Low_lvl_packet::create_uninit_packet<Syn_packet>(
get_logger());
5817 syn->m_init_seq_num = sock->m_snd_init_seq_num;
5821 syn->m_serialized_metadata =
static_cast<const Blob&
>(sock->m_serialized_metadata);
5828 auto syn_ack = Low_lvl_packet::create_uninit_packet<Syn_ack_packet>(
get_logger());
5830 syn_ack->m_init_seq_num = sock->m_snd_init_seq_num;
5832 syn_ack->m_packed.m_security_token = sock->m_security_token;
5834 syn_ack->m_packed.m_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd;
5840 boost::shared_ptr<const Syn_ack_packet>& syn_ack)
5843 auto syn_ack_ack = Low_lvl_packet::create_uninit_packet<Syn_ack_ack_packet>(
get_logger());
5846 syn_ack_ack->m_packed.m_security_token = syn_ack->m_packed.m_security_token;
5848 syn_ack_ack->m_packed.m_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd =
sock_rcv_wnd(sock);
5859 using boost::chrono::milliseconds;
5860 using boost::chrono::duration_cast;
5861 using std::make_pair;
5863 using std::numeric_limits;
5870 vector<Peer_socket::Individual_ack::Ptr>& pending_acks = sock->m_rcv_pending_acks;
5872 if (sys_err_code == boost::asio::error::operation_aborted)
5875 "pending acknowledgment count [" << pending_acks.size() <<
"].");
5880 FLOW_LOG_TRACE(
"Delayed [ACK] timer [" << sock <<
"] triggered, or ACK forced; "
5881 "pending acknowledgment count [" << pending_acks.size() <<
"].");
5894 FLOW_LOG_TRACE(
"Delayed [ACK] timer [" << sock <<
"] triggered, "
5895 "but socket already in inapplicable state [" << sock->m_int_state <<
"]. Ignoring.");
5900 if (pending_acks.empty())
5905 "but socket has no pending acknowledgments. This is likely an internal bug. Ignoring.");
5932 const size_t& rcv_wnd = sock->m_rcv_last_sent_rcv_wnd =
sock_rcv_wnd(sock);
5934 auto ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
5935 ack->m_rcv_wnd = rcv_wnd;
5940 if (sock->rexmit_on())
5962 sock->m_rcv_stats.sent_low_lvl_ack_packet(
false);
5965 ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
5966 ack->m_rcv_wnd = rcv_wnd;
5994 if (delay.count() < 0)
5999 "delay for packet [" << seq_num <<
", ...) is "
6000 "negative: [" << delay <<
"]; using zero.");
6001 delay = Fine_duration::zero();
6016 if (uint64_t(pkt_delay.count()) > uint64_t(MAX_DELAY_VALUE))
6021 "delay for packet [" << seq_num <<
", ...) is [" << pkt_delay <<
"]; overflow; "
6022 "using max value [" << MAX_DELAY_VALUE <<
"] units.");
6028 if (sock->rexmit_on())
6030 ack->m_rcv_acked_packets_rexmit_on_out.push_back
6032 ind_ack->m_rexmit_id,
6037 ack->m_rcv_acked_packets_rexmit_off_out.push_back
6041 size_est_so_far += size_est_inc;
6044 sock->m_rcv_stats.sent_individual_ack();
6048 if ((size_est_so_far != 0)
6051 defer_delta_check)))
6057 sock->m_rcv_stats.sent_low_lvl_ack_packet(
false);
6060 pending_acks.clear();
6063 sock->m_rcv_stats.current_pending_to_ack_packets(0);
6071 return Socket_id{ sock->remote_endpoint(), sock->local_port() };
6077 return !(sock->m_snd_rexmit_q.empty() && sock->m_snd_buf.empty());
6093 return sock->m_snd_buf.data_size() + sock->max_block_size()
6094 <= sock->opt(sock->m_opts.m_st_snd_buf_max_size);
6100 return !sock->m_rcv_buf.empty();
6108 sock->m_int_state <<
"] to [" << new_state <<
"].");
6109 sock->m_int_state = new_state;
6118 sock->m_state = state;
6121 sock->m_open_sub_state = open_sub_state;
6138 sock->m_disconnect_cause = disconnect_cause;
6160 assert(sock->m_disconnect_cause);
6170 sock->m_rcv_buf.clear();
6171 sock->m_snd_buf.clear();
6172 sock->m_rcv_packets_with_gaps.clear();
6173 sock->m_rcv_reassembly_q_data_size = 0;
6174 sock->m_snd_flying_pkts_by_sent_when.clear();
6175 sock->m_snd_flying_pkts_by_seq_num.clear();
6176 sock->m_snd_rexmit_q.clear();
6177 sock->m_serialized_metadata.make_zero();
6178 sock->m_rcv_syn_rcvd_data_q.clear();
6179 sock->m_rcv_pending_acks.clear();
6180 sock->m_rcv_acked_packets.clear();
6181 sock->m_snd_pacing_data.m_packet_q.clear();
6186 sock->m_snd_cong_ctl.reset();
6188 sock->m_snd_bandwidth_estimator.reset();
6221 sock->m_opts = opts;
6231#define VALIDATE_STATIC_OPTION(ARG_opt) \
6232 validate_static_option(opts.ARG_opt, prev_opts->ARG_opt, #ARG_opt, err_code)
6233#define VALIDATE_CHECK(ARG_check) \
6234 validate_option_check(ARG_check, #ARG_check, err_code)
6256 using boost::chrono::seconds;
6257 using std::numeric_limits;
6268 const bool static_ok
6269 = VALIDATE_STATIC_OPTION(m_st_max_block_size) &&
6270 VALIDATE_STATIC_OPTION(m_st_connect_retransmit_period) &&
6271 VALIDATE_STATIC_OPTION(m_st_connect_retransmit_timeout) &&
6272 VALIDATE_STATIC_OPTION(m_st_snd_buf_max_size) &&
6273 VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size) &&
6274 VALIDATE_STATIC_OPTION(m_st_rcv_flow_control_on) &&
6275 VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size_slack_percent) &&
6276 VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size_to_advertise_percent) &&
6277 VALIDATE_STATIC_OPTION(m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent) &&
6278 VALIDATE_STATIC_OPTION(m_st_delayed_ack_timer_period) &&
6279 VALIDATE_STATIC_OPTION(m_st_max_full_blocks_before_ack_send) &&
6280 VALIDATE_STATIC_OPTION(m_st_rexmit_on) &&
6281 VALIDATE_STATIC_OPTION(m_st_max_rexmissions_per_packet) &&
6282 VALIDATE_STATIC_OPTION(m_st_init_drop_timeout) &&
6283 VALIDATE_STATIC_OPTION(m_st_snd_pacing_enabled) &&
6284 VALIDATE_STATIC_OPTION(m_st_snd_bandwidth_est_sample_period_floor) &&
6285 VALIDATE_STATIC_OPTION(m_st_cong_ctl_strategy) &&
6286 VALIDATE_STATIC_OPTION(m_st_cong_ctl_init_cong_wnd_blocks) &&
6287 VALIDATE_STATIC_OPTION(m_st_cong_ctl_max_cong_wnd_blocks) &&
6288 VALIDATE_STATIC_OPTION(m_st_cong_ctl_cong_wnd_on_drop_timeout_blocks) &&
6289 VALIDATE_STATIC_OPTION(m_st_cong_ctl_classic_wnd_decay_percent) &&
6290 VALIDATE_STATIC_OPTION(m_st_drop_packet_exactly_after_drop_timeout) &&
6291 VALIDATE_STATIC_OPTION(m_st_drop_all_on_drop_timeout) &&
6292 VALIDATE_STATIC_OPTION(m_st_out_of_order_ack_restarts_drop_timer);
6303 const bool checks_ok
6334#undef VALIDATE_CHECK
6335#undef VALIDATE_STATIC_OPTION
6342 using boost::adopt_lock;
6384 using boost::lexical_cast;
6389 stats->
m_rcv = sock->m_rcv_stats.stats();
6390 stats->
m_snd = sock->m_snd_stats.stats();
6412 = sock->m_rcv_syn_rcvd_data_q.empty() ? 0 : sock->m_rcv_syn_rcvd_data_cumulative_size;
6428 = util::to_mbit_per_sec<Send_bandwidth_estimator::Time_unit>
6429 (sock->m_snd_bandwidth_estimator->bandwidth_bytes_per_time());
6446 FLOW_LOG_INFO(
"[=== Socket state for [" << sock <<
"]. ===\n" << stats);
6453 FLOW_LOG_INFO(
"=== Socket state for [" << sock <<
"]. ===]");
6475 *seq_num += data_size;
6478template<
typename Packet_map_iter>
6485 *seq_num_start = seq_num_start_cref;
6489 *seq_num_end = seq_num_start_cref;
6497 return ++sock->m_snd_last_order_num;
6503 return sock_create_forward_plus_ctor_args<Peer_socket>(opts);
6513 <<
"NetFlow_socket "
6515 "@" <<
static_cast<const void*
>(sock))
6516 : (os <<
"NetFlow_socket@null");
6524#define STATE_TO_CASE_STATEMENT(ARG_state) \
6525 case Peer_socket::Int_state::S_##ARG_state: \
6526 return os << #ARG_state
6535 STATE_TO_CASE_STATEMENT(CLOSED);
6536 STATE_TO_CASE_STATEMENT(SYN_SENT);
6537 STATE_TO_CASE_STATEMENT(SYN_RCVD);
6538 STATE_TO_CASE_STATEMENT(ESTABLISHED);
6541#undef STATE_TO_CASE_STATEMENT
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
static Congestion_control_strategy * create_strategy(Strategy_choice strategy_choice, log::Logger *logger_ptr, Peer_socket::Const_ptr sock)
Factory method that, given an enum identifying the desired strategy, allocates the appropriate Conges...
static Ptr create_drop_timer(log::Logger *logger_ptr, util::Task_engine *node_task_engine, Fine_duration *sock_drop_timeout, Peer_socket::Const_ptr &&sock, const Function< void(const Error_code &err_code)> &timer_failure, const Function< void(bool drop_all_packets)> &timer_fired)
Constructs Drop_timer and returns a ref-counted pointer wrapping it.
@ S_PEER_SOCKET_WRITABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
@ S_PEER_SOCKET_READABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
void snd_flying_pkts_updated(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_const_iter pkt_begin, const Peer_socket::Sent_pkt_ordered_by_when_const_iter &pkt_end, bool added)
Updates Peer_socket::m_snd_flying_bytes according to an operation (add packets, remove packets) calle...
bool categorize_individual_ack(const Socket_id &socket_id, Peer_socket::Ptr sock, Ack_packet::Individual_ack::Const_ptr ack, bool *dupe_or_late, Peer_socket::Sent_pkt_ordered_by_when_iter *acked_pkt_it)
Helper of perform_accumulated_on_recv_tasks() that categorizes the given accumulated individual ackno...
void handle_data_to_established(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet, bool syn_rcvd_qd_packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
bool sock_is_writable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->send() with at least some arguments would return either non...
Peer_socket_info sock_info(Peer_socket::Const_ptr sock)
Implementation of sock->info() for socket sock in all cases except when sock->state() == Peer_socket:...
void receive_wnd_updated(Peer_socket::Ptr sock)
Placed by receive() onto W if it has dequeued data from Receive buffer and given it to the user,...
void sock_track_new_data_after_gap_rexmit_off(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, size_t data_size, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
bool sock_data_to_reassembly_q_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
static bool ensure_sock_open(Socket_ptr sock, Error_code *err_code)
Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so,...
void send_worker(Peer_socket::Ptr sock, bool defer_delta_check)
Thread W implemention of send(): synchronously or asynchronously send the contents of sock->m_snd_buf...
void handle_accumulated_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any incoming acknowledgments and rcv_wnd u...
void async_rcv_wnd_recovery(Peer_socket::Ptr sock, size_t rcv_wnd)
receive_wnd_updated() helper that continues rcv_wnd recovery: that is, sends unsolicited ACK with a r...
void log_accumulated_acks(Peer_socket::Const_ptr sock) const
Helper of handle_accumulated_acks() that logs the about-to-be-handled accumulated individual acknowle...
void sock_free_memory(Peer_socket::Ptr sock)
Helper that clears all non-O(1)-space data structures stored inside sock.
void sock_load_info_struct(Peer_socket::Const_ptr sock, Peer_socket_info *stats) const
Given a Peer_socket, copies all stats info (as available via Peer_socket::info()) from various struct...
void log_snd_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages thats show the detailed state of the sending sequence number space.
void send_worker_check_state(Peer_socket::Ptr sock)
Helper placed by send() onto W to invoke send_worker() but ensures that the socket has not entered so...
size_t m_low_lvl_max_buf_size
OS-reported m_low_lvl_sock UDP receive buffer maximum size, obtained right after we OS-set that setti...
Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock, const Function< Non_blocking_func_ret_type()> &non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Error_code *err_code)
Implementation of core blocking transfer methods, namely Peer_socket::sync_send(),...
size_t sock_max_packets_after_unrecvd_packet(Peer_socket::Const_ptr sock) const
Computes and returns the max size for Peer_socket::m_rcv_packets_with_gaps for sock.
Peer_socket::Sent_pkt_ordered_by_when_iter categorize_pkts_as_dropped_on_acks(Peer_socket::Ptr sock, const boost::unordered_set< Peer_socket::order_num_t > &flying_now_acked_pkts)
Helper of perform_accumulated_on_recv_tasks() that determines the range of In-flight packets that sho...
void rcv_get_first_gap_info(Peer_socket::Const_ptr sock, bool *first_gap_exists, Sequence_number *seq_num_after_first_gap)
Helper for handle_data_to_established() that gets simple info about Peer_socket::m_rcv_packets_with_g...
bool snd_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data either in Peer_socket::m_snd_rexmit_q of sock (if re...
void cancel_timers(Peer_socket::Ptr sock)
Cancel any timers and scheduled tasks active in the given socket.
void sock_rcv_buf_now_readable(Peer_socket::Ptr sock, bool syn_rcvd_qd_packet)
Helper for handle_data_to_established() that assumes the given's socket Receive buffer is currently r...
void snd_flying_pkts_erase_one(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_iter pkt_it)
Erases (for example if considered Acknowledged or Dropped) a packet struct from the "scoreboard" (Pee...
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
bool sock_validate_options(const Peer_socket_options &opts, const Peer_socket_options *prev_opts, Error_code *err_code) const
Analogous to validate_options() but checks per-socket options instead of per-Node options.
void handle_accumulated_pending_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any additional individual outgoing acknowl...
void receive_wnd_recovery_data_received(Peer_socket::Ptr sock)
Pertaining to the async_rcv_wnd_recovery() mechanism, this handles the event that we have received an...
static Peer_socket::order_num_t sock_get_new_snd_order_num(Peer_socket::Ptr sock)
Returns the "order number" to use for Peer_socket::Sent_packet::Sent_when structure corresponding to ...
Peer_socket::Ptr sync_connect_impl(const Remote_endpoint &to, const Fine_duration &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code, const Peer_socket_options *opts)
Implementation core of sync_connect*() that gets rid of templated or missing arguments thereof.
size_t max_block_size() const
The maximum number of bytes of user data per received or sent block on connections generated from thi...
void snd_flying_pkts_push_one(Peer_socket::Ptr sock, const Sequence_number &seq_num, Peer_socket::Sent_packet::Ptr sent_pkt)
Adds a new packet struct (presumably representing packet to be sent shortly) to the "scoreboard" (Pee...
Syn_packet::Ptr create_syn(Peer_socket::Const_ptr sock)
Helper that creates a new SYN packet object to the extent that is suitable for immediately passing to...
void close_abruptly(Peer_socket::Ptr sock, Error_code *err_code)
Implementation of non-blocking sock->close_abruptly() for socket sock in all cases except when sock->...
static void get_seq_num_range(const Packet_map_iter &packet_it, Sequence_number *seq_num_start, Sequence_number *seq_num_end)
Given an iterator into a Peer_socket::Sent_pkt_by_sent_when_map or Peer_socket::Recv_pkt_map,...
Peer_socket::Ptr sync_connect_with_metadata(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
A combination of sync_connect() and connect_with_metadata() (blocking connect, with supplied metadata...
Syn_ack_packet::Ptr create_syn_ack(Peer_socket::Const_ptr sock)
Like create_syn() but for SYN_ACK.
virtual Peer_socket * sock_create(const Peer_socket_options &opts)
Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
bool snd_buf_enqable(Peer_socket::Const_ptr sock) const
Return true if and only if there is enough free space in Peer_socket::m_snd_buf of sock to enqueue an...
bool can_send(Peer_socket::Const_ptr sock) const
Answers the perennial question of congestion and flow control: assuming there is a DATA packet to sen...
void sock_slide_rcv_next_seq_num(Peer_socket::Ptr sock, size_t slide_size, bool reassembly_in_progress)
Helper for handle_data_to_established() that aims to register a set of received DATA packet data as i...
void sock_log_detail(Peer_socket::Const_ptr sock) const
Logs a verbose state report for the given socket.
static void advance_seq_num(Sequence_number *seq_num, boost::shared_ptr< const Data_packet > data)
Assuming *seq_num points to the start of data.m_data, increments *seq_num to point to the datum just ...
void async_low_lvl_ack_send(Peer_socket::Ptr sock, bool defer_delta_check, const Error_code &sys_err_code=Error_code())
Sends a low-level ACK packet, with all accumulated in Peer_socket::m_rcv_pending_acks of sock individ...
static Sequence_number snd_past_last_flying_datum_seq_num(Peer_socket::Const_ptr sock)
Obtain the sequence number for the datum just past the last (latest) In-flight (i....
Peer_socket::Ptr connect(const Remote_endpoint &to, Error_code *err_code=0, const Peer_socket_options *opts=0)
Initiates an active connect to the specified remote Flow server.
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
void serv_peer_socket_closed(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that a Server_socket-contained (i.e., currently un-established, or established but not yet ac...
bool rcv_buf_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data in Peer_socket::m_rcv_buf of sock to give the user s...
void async_acknowledge_packet(Peer_socket::Ptr sock, const Sequence_number &seq_num, unsigned int rexmit_id, size_t data_size)
Causes an acknowledgment of the given received packet to be included in a future Ack_packet sent to t...
Socket_id_to_socket_map m_socks
The peer-to-peer connections this Node is currently tracking.
Peer_socket::Options_lock Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
void handle_syn_ack_to_syn_sent(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK packet delivered to the given peer ...
size_t send(Peer_socket::Ptr sock, const Function< size_t(size_t max_data_size)> &snd_buf_feed_func, Error_code *err_code)
Implementation of non-blocking sock->send() for socket sock in all cases except when sock->state() ==...
void sock_set_int_state(Peer_socket::Ptr sock, Peer_socket::Int_state new_state)
Sets internal state of given socket to the given state and logs a TRACE message about it.
bool sock_is_readable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->receive() with at least some arguments would return either ...
bool async_sock_low_lvl_packet_send_or_close_immediately(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet, bool defer_delta_check)
Similar to async_sock_low_lvl_packet_send_paced() except it also calls close_connection_immediately(s...
bool sock_data_to_rcv_buf_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to pass the payload of the given DATA packet to the...
bool sock_set_options(Peer_socket::Ptr sock, const Peer_socket_options &opts, Error_code *err_code)
Thread W implementation of sock->set_options().
bool running() const
Returns true if and only if the Node is operating.
Port_to_server_map m_servs
The server sockets this Node is currently tracking.
Event_set::Ev_type_to_socks_map m_sock_events
All sockets that have been detected to be "ready" (by the Event_set doc header definition) at any poi...
static const uint8_t S_DEFAULT_CONN_METADATA
Type and value to supply as user-supplied metadata in SYN, if user chooses to use [[a]sync_]connect()...
void setup_drop_timer(const Socket_id &socket_id, Peer_socket::Ptr sock)
Creates a new Drop Timer and saves it to sock->m_snd_drop_timer.
void handle_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Ack_packet > ack)
Handles a just-deserialized, just-demultiplexed, low-level ACK packet delivered to the given peer soc...
Peer_socket::Ptr sync_connect(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0, const Peer_socket_options *opts=0)
The blocking (synchronous) version of connect().
void handle_syn_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed, duplicate (equal to already-received SYN_ACK) low-le...
void setup_connection_timers(const Socket_id &socket_id, Peer_socket::Ptr sock, bool initial)
Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some a...
void log_rcv_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages that show the detailed state of the receiving sequence number space.
size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const
Computes and returns the currently correct rcv_wnd value; that is the amount of space free in Receive...
void connect_worker(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts, Peer_socket::Ptr *sock)
Thread W implementation of connect().
bool drop_pkts_on_acks(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &last_dropped_pkt_it, size_t *cong_ctl_dropped_pkts, size_t *cong_ctl_dropped_bytes, size_t *dropped_pkts, size_t *dropped_bytes, std::vector< Peer_socket::order_num_t > *pkts_marked_to_drop)
Helper of perform_accumulated_on_recv_tasks() that acts on the determination made by categorize_pkts_...
static const Peer_socket::Sent_packet::ack_count_t S_MAX_LATER_ACKS_BEFORE_CONSIDERING_DROPPED
For a given unacknowledged sent packet P, the maximum number of times any individual packet with high...
bool async_low_lvl_syn_ack_ack_send_or_close_immediately(const Peer_socket::Ptr &sock, boost::shared_ptr< const Syn_ack_packet > &syn_ack)
Helper to create, fully fill out, and asynchronously send via async_sock_low_lvl_packet_send_or_close...
Error_code sock_categorize_data_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, bool *dupe, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that categorizes the DATA packet received as either illegal; ...
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
void sock_set_state(Peer_socket::Ptr sock, Peer_socket::State state, Peer_socket::Open_sub_state open_sub_state=Peer_socket::Open_sub_state::S_CONNECTED)
Sets Peer_socket::m_state and Peer_socket::m_open_sub_state.
void receive_emptied_rcv_buf_while_disconnecting(Peer_socket::Ptr sock)
Placed by receive() onto W during a graceful close, after the Receive buffer had been emptied by the ...
void sock_disconnect_detected(Peer_socket::Ptr sock, const Error_code &disconnect_cause, bool close)
Records that thread W shows underlying connection is broken (graceful termination,...
size_t receive(Peer_socket::Ptr sock, const Function< size_t()> &rcv_buf_consume_func, Error_code *err_code)
Implementation of non-blocking sock->receive() for socket sock in all cases except when sock->state()...
void handle_connection_rexmit_timer_event(const Socket_id &socket_id, Peer_socket::Ptr sock)
Handles the triggering of the retransmit timer wait set up by setup_connection_timers(); it will re-s...
Node_options m_opts
This Node's global set of options.
void close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
A thread W method that handles the transition of the given socket from OPEN (any sub-state) to CLOSED...
void sock_disconnect_completed(Peer_socket::Ptr sock)
While in S_OPEN+S_DISCONNECTING state (i.e., after beginning a graceful close with sock_disconnect_de...
Fine_duration compute_rtt_on_ack(Peer_socket::Sent_packet::Const_ptr flying_pkt, const Fine_time_pt &time_now, Ack_packet::Individual_ack::Const_ptr ack, const Peer_socket::Sent_packet::Sent_when **sent_when) const
Helper of perform_accumulated_on_recv_tasks() that computes the RTT implied by a given individual ack...
Peer_socket::Ptr connect_with_metadata(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
Same as connect() but sends, as part of the connection handshake, the user-supplied metadata,...
void new_round_trip_time_sample(Peer_socket::Ptr sock, Fine_duration round_trip_time)
Handles a just-computed new RTT (round trip time) measurement for an individual packet earlier sent: ...
bool ok_to_rexmit_or_close(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &pkt_it, bool defer_delta_check)
Checks whether the given sent packet has been retransmitted the maximum number of allowed times; if s...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
Port_space m_ports
Flow port space for both client and server sockets. All threads may access this.
void rst_and_close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
Asynchronously send RST to the other side of the given socket and close_connection_immediately().
void drop_timer_action(Peer_socket::Ptr sock, bool drop_all_packets)
Handles a Drop_timer (Peer_socket::m_snd_drop_timer) event in ESTABLISHED state by dropping the speci...
A class that keeps a Peer_socket_receive_stats data store, includes methods to conveniently accumulat...
void good_data_accepted_packet(size_t data)
Indicates good_data_packet(), and these data are not dropped (so either delivered into Receive buffer...
void good_data_dropped_reassembly_q_overflow_packet(size_t data)
Indicates good_data_packet(), but these data are dropped due to insufficient Receive reassembly queue...
void presumed_dropped_data(size_t data)
Indicates that one or more unreceived data packets have been considered Dropped due to the number of ...
void good_data_delivered_packet(size_t data)
Indicates good_data_accepted_packet(), and these data are delivered into Receive buffer (either immed...
void late_or_dupe_to_send_ack_packet(size_t data)
Indicates that late_or_dupe_data_packet() and therefore an individual acknowledgment for this packet ...
void total_data_packet(size_t data)
Indicates one DATA packet has been received on socket.
void good_to_send_ack_packet(size_t data)
Indicates that good_data_delivered_packet() and therefore an individual acknowledgment for this packe...
void good_data_packet(size_t data)
Indicates total_data_packet(), and these data are new and acceptable into Receive buffer assuming the...
void error_data_packet(size_t data)
Indicates total_data_packet(), but there is some error about the sequence numbers so that they are no...
void buffer_fed(size_t size)
Indicates the Receive buffer was enqueued with data from network (so its data_size() increased).
void good_data_first_qd_packet(size_t data)
Indicates good_data_accepted_packet(), and these data are, upon receipt, queued for reassembly (not i...
void good_data_dropped_buf_overflow_packet(size_t data)
Indicates good_data_packet(), but these data are dropped due to insufficient Receive buffer space.
void late_or_dupe_data_packet(size_t data)
Indicates total_data_packet(), but the arrived data have either already been received before or (more...
A peer (non-server) socket operating over the Flow network protocol, with optional stream-of-bytes an...
State m_state
See state().
size_t get_connect_metadata(const boost::asio::mutable_buffer &buffer, Error_code *err_code=0) const
Obtains the serialized connect metadata, as supplied by the user during the connection handshake.
size_t max_block_size_multiple(const size_t &opt_val_ref, const unsigned int *inflate_pct_val_ptr=0) const
Returns the smallest multiple of max_block_size() that is >= the given option value,...
bool sync_send_reactor_pattern_impl(const Fine_time_pt &wait_until, Error_code *err_code)
Helper similar to sync_send_impl() but for the null_buffers versions of sync_send().
std::map< Sequence_number, Sent_pkt_ordered_by_when_iter > Sent_pkt_by_seq_num_map
Short-hand for m_snd_flying_pkts_by_seq_num type; see that data member.
bool sync_receive_reactor_pattern_impl(const Fine_time_pt &wait_until, Error_code *err_code)
Helper similar to sync_receive_impl() but for the null_buffers versions of sync_receive().
Remote_endpoint m_remote_endpoint
See remote_endpoint(). Should be set before user gets access to *this and not changed afterwards.
size_t sync_receive(const Mutable_buffer_sequence &target, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0)
Blocking (synchronous) version of receive().
util::Blob m_serialized_metadata
If !m_active_connect, this contains the serialized metadata that the user supplied on the other side ...
size_t node_sync_send(const Function< size_t(size_t max_data_size)> &snd_buf_feed_func_or_empty, const Fine_time_pt &wait_until, Error_code *err_code)
This is to sync_send() as node_send() is to send().
Error_code m_disconnect_cause
The Error_code causing disconnection (if one has occurred or is occurring) on this socket; otherwise ...
Peer_socket(log::Logger *logger_ptr, util::Task_engine *task_engine, const Peer_socket_options &opts)
Constructs object; initializes most values to well-defined (0, empty, etc.) but not necessarily meani...
Sequence_number m_rcv_init_seq_num
The Initial Sequence Number (ISN) contained in the original Syn_packet or Syn_ack_packet we received.
const Remote_endpoint & remote_endpoint() const
Intended other side of the connection (regardless of success, failure, or current State).
State
State of a Peer_socket.
@ S_OPEN
Future reads or writes may be possible. A socket in this state may be Writable or Readable.
@ S_CLOSED
Neither future reads nor writes are possible, AND Node has disowned the Peer_socket.
Open_sub_state
The sub-state of a Peer_socket when state is State::S_OPEN.
@ S_CONNECTED
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
@ S_CONNECTING
This Peer_socket was created through an active connect (Node::connect() and the like),...
@ S_DISCONNECTING
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
size_t sync_send(const Const_buffer_sequence &data, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0)
Blocking (synchronous) version of send().
~Peer_socket() override
Boring virtual destructor. Note that deletion is to be handled exclusively via shared_ptr,...
Error_code disconnect_cause() const
The error code that perviously caused state() to become State::S_CLOSED, or success code if state is ...
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
flow_port_t local_port() const
The local Flow-protocol port chosen by the Node (if active or passive open) or user (if passive open)...
flow_port_t m_local_port
See local_port(). Should be set before user gets access to *this and not changed afterwards.
friend class Send_bandwidth_estimator
Stats modules have const access to all socket internals.
bool set_options(const Peer_socket_options &opts, Error_code *err_code=0)
Dynamically replaces the current options set (options()) with the given options set.
size_t node_send(const Function< size_t(size_t max_data_size)> &snd_buf_feed_func, Error_code *err_code)
Non-template helper for template send() that forwards the send() logic to Node::send().
bool rexmit_on() const
Whether retransmission is enabled on this connection.
size_t node_sync_receive(const Function< size_t()> &rcv_buf_consume_func_or_empty, const Fine_time_pt &wait_until, Error_code *err_code)
This is to sync_receive() as node_receive() is to receive().
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex.
Int_state
The state of the socket (and the connection from this end's point of view) for the internal state mac...
@ S_ESTABLISHED
Public state is OPEN+CONNECTED; in our opinion the connection is established.
@ S_SYN_SENT
Public state is OPEN+CONNECTING; user requested active connect; we sent SYN and are awaiting response...
@ S_CLOSED
Closed (dead or new) socket.
@ S_SYN_RCVD
Public state is OPEN+CONNECTING; other side requested passive connect via SYN; we sent SYN_ACK and ar...
util::Lock_guard< Options_mutex > Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
void close_abruptly(Error_code *err_code=0)
Acts as if fatal error error::Code::S_USER_CLOSED_ABRUPTLY has been discovered on the connection.
size_t max_block_size() const
The maximum number of bytes of user data per received or sent packet on this connection.
Node * node() const
Node that produced this Peer_socket.
Peer_socket_info info() const
Returns a structure containing the most up-to-date stats about this connection.
Recvd_pkt_map::iterator Recvd_pkt_iter
Short-hand for m_rcv_packets_with_gaps iterator type.
Mutex m_mutex
This object's mutex.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
Sent_pkt_by_seq_num_map::const_iterator Sent_pkt_ordered_by_seq_const_iter
Short-hand for m_snd_flying_pkts_by_seq_num const iterator type.
Peer_socket_info m_info_on_close
This is the final set of stats collected at the time the socket was moved to S_CLOSED m_state.
bool ensure_open(Error_code *err_code) const
Helper that is equivalent to Node::ensure_sock_open(this, err_code).
Sent_pkt_by_sent_when_map::const_iterator Sent_pkt_ordered_by_when_const_iter
Short-hand for m_snd_flying_pkts_by_sent_when const iterator type.
Opt_type opt(const Opt_type &opt_val_ref) const
Analogous to Node::opt() but for per-socket options.
std::string bytes_blocks_str(size_t bytes) const
Helper that, given a byte count, returns a string with that byte count and the number of max_block_si...
Peer_socket_options m_opts
This socket's per-socket set of options.
Peer_socket_options options() const
Copies this socket's option set and returns that copy.
Options_mutex m_opts_mutex
The mutex protecting m_opts.
std::map< Sequence_number, boost::shared_ptr< Received_packet > > Recvd_pkt_map
Short-hand for m_rcv_packets_with_gaps type; see that data member.
Recvd_pkt_map::const_iterator Recvd_pkt_const_iter
Short-hand for m_rcv_packets_with_gaps const iterator type.
Open_sub_state m_open_sub_state
See state().
size_t node_receive(const Function< size_t()> &rcv_buf_consume_func, Error_code *err_code)
Non-template helper for template receive() that forwards the receive() logic to Node::receive().
State state(Open_sub_state *open_sub_state=0) const
Current State of the socket.
void return_port(flow_port_t port, Error_code *err_code)
Return a previously reserved port (of any type).
An internal net_flow sequence number identifying a piece of data.
void set_metadata(char num_line_id=0, const Sequence_number &zero_point=Sequence_number(), seq_num_delta_t multiple_size=0)
Updates the full set of metadata (used at least for convenient convention-based logging but not actua...
uint64_t seq_num_t
Raw sequence number type.
Internal net_flow class that implements a socket buffer, as used by Peer_socket for Send and Receive ...
void consume_buf_move(util::Blob *target_buf, size_t max_data_size)
Consumes (removes from the front of the internal byte buffer and returns them to the caller) a byte s...
size_t data_size() const
The total number of bytes of application-layer data stored in this object.
Properties of various container types.
typename Value_list::const_reverse_iterator Const_reverse_iterator
Type for reverse iterator pointing into an immutable structure of this type.
typename Value_list::reverse_iterator Reverse_iterator
Type for reverse iterator pointing into a mutable structure of this type.
std::pair< Iterator, bool > insert(Value const &key_and_mapped)
Attempts to insert the given key/mapped-value pair into the map.
static Ptr ptr_cast(const From_ptr &ptr_to_cast)
Provides syntactic-sugary way to perform a static_pointer_cast<> from a compatible smart pointer type...
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
Similar to ostringstream but allows fast read-only access directly into the std::string being written...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(ARG_ret_type, ARG_method_name,...)
Narrow-use macro that implements the error code/exception semantics expected of most public-facing Fl...
#define FLOW_ERROR_LOG_ERROR(ARG_val)
Logs a warning about the given error code using FLOW_LOG_WARNING().
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
#define FLOW_ERROR_EMIT_ERROR_LOG_INFO(ARG_val)
Identical to FLOW_ERROR_EMIT_ERROR(), but the message logged has flow::log::Sev::S_INFO severity inst...
#define FLOW_LOG_DATA(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_WITHOUT_CHECKING(ARG_sev, ARG_stream_fragment)
Identical to FLOW_LOG_WITH_CHECKING() but foregoes the filter (Logger::should_log()) check.
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_WITH_CHECKING(ARG_sev, ARG_stream_fragment)
Logs a message of the specified severity into flow::log::Logger *get_logger() with flow::log::Compone...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
void asio_exec_ctx_post(log::Logger *logger_ptr, Execution_context *exec_ctx, Synchronicity synchronicity, Task &&task)
An extension of boost.asio's post() and dispatch() free function templates, this free function templa...
bool exec_void_and_throw_on_error(const Func &func, Error_code *err_code, util::String_view context)
Equivalent of exec_and_throw_on_error() for operations with void return type.
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
@ S_INFO
Message indicates a not-"bad" condition that is not frequent enough to be of severity Sev::S_TRACE.
@ S_CONN_TIMEOUT
Other side did not complete connection handshake within the allowed time; perhaps no one is listening...
@ S_USER_CLOSED_ABRUPTLY
User code on this side abruptly closed connection; other side may be informed of this.
@ S_CONN_RESET_TOO_MANY_REXMITS
Connection reset because a packet has been retransmitted too many times.
@ S_SEQ_NUM_IMPLIES_CONNECTION_COLLISION
Other side has sent packet with sequence number that implies a port collision between two connections...
@ S_SEQ_NUM_ARITHMETIC_FAILURE
Other side has sent packets with inconsistent sequence numbers.
@ S_CONN_METADATA_TOO_LARGE
During connection user supplied metadata that is too large.
@ S_CANNOT_CONNECT_TO_IP_ANY
Cannot ask to connect to "any" IP address. Use specific IP address.
@ S_WAIT_USER_TIMEOUT
A blocking (sync_) or background-blocking (async_) operation timed out versus user-supplied time limi...
@ S_WAIT_INTERRUPTED
A blocking (sync_) or background-blocking (async_) operation was interrupted, such as by a signal.
@ S_INTERNAL_ERROR_SYSTEM_ERROR_ASIO_TIMER
Internal error: System error: Something went wrong with boost.asio timer subsystem.
@ S_EVENT_SET_CLOSED
Attempted operation on an event set, when that event set was closed.
@ S_INTERNAL_ERROR_PORT_COLLISION
Internal error: Ephemeral port double reservation allowed.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
uint16_t flow_port_t
Logical Flow port type (analogous to a UDP/TCP port in spirit but in no way relevant to UDP/TCP).
const flow_port_t S_PORT_ANY
Special Flow port value used to indicate "invalid port" or "please pick a random available ephemeral ...
std::ostream & operator<<(std::ostream &os, const Congestion_control_selector::Strategy_choice &strategy_choice)
Serializes a Peer_socket_options::Congestion_control_strategy_choice enum to a standard ostream – the...
bool key_exists(const Container &container, const typename Container::key_type &key)
Returns true if and only if the given key is present at least once in the given associative container...
Auto_cleanup setup_auto_cleanup(const Cleanup_func &func)
Provides a way to execute arbitrary (cleanup) code at the exit of the current block.
std::string buffers_dump_string(const Const_buffer_sequence &data, const std::string &indentation, size_t bytes_per_line)
Identical to buffers_to_ostream() but returns an std::string instead of writing to a given ostream.
bool subtract_with_floor(Minuend *minuend, const Subtrahend &subtrahend, const Minuend &floor)
Performs *minuend -= subtrahend, subject to a floor of floor.
Integer ceil_div(Integer dividend, Integer divisor)
Returns the result of the given non-negative integer divided by a positive integer,...
bool in_open_open_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, given as a (low,...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
bool scheduled_task_fired(log::Logger *logger_ptr, Scheduled_task_const_handle task)
Returns whether a previously scheduled (by schedule_task_from_now() or similar) task has already fire...
bool in_open_closed_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, given as a (low,...
void ostream_op_to_string(std::string *target_str, T const &... ostream_args)
Writes to the specified string, as if the given arguments were each passed, via << in sequence,...
Fine_duration scheduled_task_fires_from_now_or_canceled(log::Logger *logger_ptr, Scheduled_task_const_handle task)
Returns how long remains until a previously scheduled (by schedule_task_from_now() or similar) task f...
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
bool in_closed_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, inclusive.
boost::shared_ptr< void > Auto_cleanup
Helper type for setup_auto_cleanup().
bool scheduled_task_cancel(log::Logger *logger_ptr, Scheduled_task_handle task)
Attempts to prevent the execution of a previously scheduled (by schedule_task_from_now() or similar) ...
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
unsigned char uint8_t
Byte. Best way to represent a byte of binary data. This is 8 bits on all modern systems.
Specifies the outgoing (pre-serialization) acknowledgment of a single received Data_packet,...
Equivalent of Individual_ack_rexmit_off but for sockets with retransmission enabled.
Specifies the incoming (post-deserialization) acknowledgment of a single received Data_packet.
boost::shared_ptr< const Individual_ack > Const_ptr
Short-hand for ref-counted pointer to immutable objects of this class.
uint64_t ack_delay_t
Type used to store the ACK delay for a given individual acknowledged packet.
Fine_duration Ack_delay_time_unit
Ack_delay_time_unit(1) is the duration corresponding to the ack_delay_t value 1; and proportionally f...
uint32_t rcv_wnd_t
Type used to store the size of m_rcv_wnd member in a couple of different packet types.
uint8_t rexmit_id_t
Type used to store the retransmission count in DATA and ACK packets.
The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to a port in th...
Metadata describing the data sent in the acknowledgment of an individual received packet.
boost::shared_ptr< const Individual_ack > Const_ptr
Short-hand for ref-counted pointer to immutable objects of this class.
boost::shared_ptr< Individual_ack > Ptr
Short-hand for ref-counted pointer to mutable objects of this class.
Metadata (and data, if retransmission is on) for a packet that has been received (and,...
const size_t m_size
Number of bytes in the Data_packet::m_data field of that packet.
Received_packet(log::Logger *logger_ptr, size_t size, util::Blob *src_data)
Constructs object by storing size of data and, if so instructed, the data themselves.
util::Blob m_data
Byte sequence equal to that of Data_packet::m_data of the packet.
Data store to keep timing related info when a packet is sent out.
const order_num_t m_order_num
Order number of the packet.
size_t m_sent_cwnd_bytes
The congestion window size (in bytes) that is used when the packet is sent out.
Fine_time_pt m_sent_time
The timestamp when the packet is sent out.
Metadata (and data, if retransmission is on) for a packet that has been sent one (if retransmission i...
Sent_packet(bool rexmit_on, boost::shared_ptr< Data_packet > packet, const Sent_when &sent_when)
Constructs object with the given values and m_acks_after_me at zero.
std::vector< Sent_when > m_sent_when
Time stamps, order numbers, and other info at the times when the different attempts (including origin...
const size_t m_size
Number of bytes in the Data_packet::m_data field of the sent packet.
const boost::shared_ptr< Data_packet > m_packet
If retransmission is on, this is the DATA packet itself that was sent; otherwise null.
uint16_t ack_count_t
Type used for m_acks_after_me.
ack_count_t m_acks_after_me
The number of times any packet with m_sent_when.back().m_order_num > this->m_sent_when....
A data store that keeps stats about the a Peer_socket connection.
Peer_socket_send_stats m_snd
Stats for outgoing direction of traffic. As opposed to the other m_snd_* members, this typically accu...
Node_options m_node_opts
Per-node options currently set on the socket's Node.
size_t m_low_lvl_max_buf_size
The UDP receive buffer maximum size, as reported by an appropriate call to the appropriate getsockopt...
size_t m_rcv_buf_size
The number of bytes in the internal Receive buffer.
size_t m_rcv_wnd_last_advertised
The last rcv_wnd (receive window) size sent to sender (not necessarily received; packets can be lost)...
Fine_duration m_snd_pacing_slice_period
In pacing, the duration of the current pacing time slice.
size_t m_rcv_reassembly_q_data_size
If rexmit_on is false then 0; otherwise the total DATA payload in the reassembly queue of the socket.
size_t m_snd_pacing_bytes_allowed_this_slice
This many bytes worth of DATA packets may still be sent, at this time, within the time slice defined ...
Peer_socket_options m_sock_opts
Per-socket options currently set on the socket.
size_t m_snd_buf_size
The number of bytes in the internal Send buffer.
size_t m_rcv_syn_rcvd_data_cumulative_size
Total size of DATA payload queued while waiting for SYN_ACK_ACK in SYN_RCVD state.
size_t m_rcv_syn_rcvd_data_q_size
Number of DATA packets queued while waiting for SYN_ACK_ACK in SYN_RCVD state.
std::string m_int_state_str
The internal state of the socket, rendered into string (e.g., "SYN_RECEIVED" or "ESTABLISHED").
Fine_time_pt m_snd_pacing_slice_start
In pacing, the time point marking the beginning of the current pacing time slice.
size_t m_snd_cong_ctl_in_flight_count
In congestion control, the current sent data packets that have been neither acknowledged nor consider...
size_t m_snd_cong_ctl_in_flight_bytes
In congestion control, the current sent data bytes that have been neither acknowledged nor considered...
double m_snd_est_bandwidth_mbit_per_sec
Estimate of the currently available (to this connection) outgoing bandwidth, in megabits per second.
size_t m_rcv_wnd
Receive window size = max Receive buffer space minus space taken. Infinity if flow control disabled.
size_t m_rcv_packets_with_gaps
Number of DATA packets tracked in structure tracking all valid received packets such at least one pac...
size_t m_snd_cong_ctl_wnd_bytes
In congestion control, the current congestion window (number of outgoing data bytes allowed In-flight...
Fine_duration m_snd_smoothed_round_trip_time
Estimated current round trip time of packets, computed as a smooth value over the past individual RTT...
Error_code m_disconnect_cause
If the socket is closing or closed, this is the reason for the closure; otherwise the default-constru...
size_t m_snd_cong_ctl_wnd_count_approx
In congestion control, the approximate equivalent of m_snd_cong_ctl_in_flight_bytes as a full packet ...
size_t m_snd_rcv_wnd
The receive window (rcv_wnd a/k/a free Receive buffer space) value of the peer socket on the other si...
bool m_is_active_connect
true if this is the "client" socket (connect()ed); false otherwise (accept()ed).
size_t m_snd_pacing_packet_q_size
In pacing, number of packets currently queued to be sent out by the pacing module.
Fine_duration m_snd_round_trip_time_variance
RTTVAR used for m_snd_smoothed_round_trip_time calculation; it is the current RTT variance.
Peer_socket_receive_stats m_rcv
Stats for incoming direction of traffic. As opposed to the other m_rcv_* members, this typically accu...
Fine_duration m_snd_drop_timeout
Drop Timeout: how long a given packet must remain unacknowledged to be considered dropped due to Drop...
A set of low-level options affecting a single Peer_socket.
Fine_duration m_st_init_drop_timeout
Once socket enters ESTABLISHED state, this is the value for Peer_socket::m_snd_drop_timeout until the...
unsigned int m_st_max_rexmissions_per_packet
If retransmission is enabled and a given packet is retransmitted this many times and has to be retran...
size_t m_st_rcv_buf_max_size
Maximum number of bytes that the Receive buffer can hold.
size_t m_st_cong_ctl_max_cong_wnd_blocks
The constant that determines the CWND limit in Congestion_control_classic_data::congestion_window_at_...
Fine_duration m_st_snd_bandwidth_est_sample_period_floor
When estimating the available send bandwidth, each sample must be compiled over at least this long of...
unsigned int m_st_cong_ctl_cong_avoidance_increment_blocks
The multiple of max-block-size by which to increment CWND in congestion avoidance mode after receivin...
size_t m_st_cong_ctl_cong_wnd_on_drop_timeout_blocks
On Drop Timeout, set congestion window to this value times max-block-size.
size_t m_st_cong_ctl_init_cong_wnd_blocks
The initial size of the congestion window, given in units of max-block-size-sized blocks.
bool m_st_rexmit_on
Whether to enable reliability via retransmission.
size_t m_st_snd_buf_max_size
Maximum number of bytes that the Send buffer can hold.
Fine_duration m_st_connect_retransmit_period
How often to resend SYN or SYN_ACK while SYN_ACK or SYN_ACK_ACK, respectively, has not been received.
Fine_duration m_dyn_rcv_wnd_recovery_timer_period
When the mode triggered by rcv-buf-max-size-to-advertise-percent being exceeded is in effect,...
Fine_duration m_st_connect_retransmit_timeout
How long from the first SYN or SYN_ACK to allow for connection handshake before aborting connection.
size_t m_st_max_full_blocks_before_ack_send
If there are at least this many TIMES max-block-size bytes' worth of individual acknowledgments to be...
Fine_duration m_st_delayed_ack_timer_period
The maximum amount of time to delay sending ACK with individual packet's acknowledgment since receivi...
unsigned int m_dyn_drop_timeout_backoff_factor
Whenever the Drop Timer fires, upon the requisite Dropping of packet(s), the DTO (Drop Timeout) is se...
size_t m_st_max_block_size
The size of block that we will strive to (and will, assuming at least that many bytes are available i...
unsigned int m_st_cong_ctl_classic_wnd_decay_percent
In classic congestion control, RFC 5681 specifies the window should be halved on loss; this option al...
unsigned int m_st_rcv_buf_max_size_to_advertise_percent
% of rcv-buf-max-size that has to be freed, since the last receive window advertisement,...
unsigned int m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent
The limit on the size of Peer_socket::m_rcv_packets_with_gaps, expressed as what percentage the maxim...
Fine_duration m_dyn_drop_timeout_ceiling
Ceiling to impose on the Drop Timeout.
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...
util::Udp_endpoint m_udp_endpoint
UDP address (IP address/UDP port) where the Node identified by this endpoint bound its low-level UDP ...
#define FLOW_UTIL_WHERE_AM_I_STR()
Same as FLOW_UTIL_WHERE_AM_I() but evaluates to an std::string.