26#include <boost/algorithm/string.hpp> 
   27#include <boost/tuple/tuple.hpp> 
   42  m_active_connect(false), 
 
   43  m_state(
State::S_CLOSED), 
 
   46  m_rcv_buf(logger_ptr, 0), 
 
   48  m_snd_buf(logger_ptr, max_block_size()),
 
   49  m_serialized_metadata(logger_ptr),
 
   52  m_rcv_syn_rcvd_data_cumulative_size(0), 
 
   53  m_rcv_reassembly_q_data_size(0),
 
   54  m_rcv_pending_acks_size_at_recv_handler_start(0),
 
   55  m_snd_pending_rcv_wnd(0), 
 
   56  m_rcv_last_sent_rcv_wnd(0),
 
   57  m_rcv_in_rcv_wnd_recovery(false),
 
   58  m_rcv_delayed_ack_timer(*task_engine),
 
   59  m_snd_flying_bytes(0),
 
   60  m_snd_last_order_num(0),
 
   61  m_snd_rexmit_q_size(0),
 
   62  m_snd_remote_rcv_wnd(0),
 
   63  m_snd_smoothed_round_trip_time(0),
 
   64  m_round_trip_time_variance(0),
 
   65  m_snd_drop_timeout(0),
 
   66  m_snd_pacing_data(task_engine),
 
   68  m_init_rexmit_count(0)
 
   71  FLOW_LOG_TRACE(
"Peer_socket [" << 
static_cast<void*
>(
this) << 
"] created.");
 
  109  return sync_send(
nullptr, Fine_duration::max(), err_code);
 
  120  const Function<size_t (
size_t)> empty_snd_buf_feed_func;
 
  121  assert(empty_snd_buf_feed_func.empty());
 
  135  const Ptr sock = shared_from_this();
 
  142  return m_node->
send(sock, snd_buf_feed_func, err_code);
 
  149  using boost::adopt_lock;
 
  154  const Ptr sock = shared_from_this();
 
  180                snd_buf_feed_func_or_empty.empty()
 
  183                                          { 
return m_node->
send(sock, snd_buf_feed_func_or_empty, err_code); }),
 
  185                wait_until, err_code);
 
  190  return sync_receive(
nullptr, Fine_duration::max(), err_code);
 
  201  const Function<size_t ()> empty_rcv_buf_consume_func;
 
  202  assert(empty_rcv_buf_consume_func.empty());
 
  216  const Ptr sock = shared_from_this();
 
  223  return m_node->
receive(sock, rcv_buf_consume_func, err_code);
 
  230  using boost::adopt_lock;
 
  235  const Ptr sock = shared_from_this();
 
  253                rcv_buf_consume_func_or_empty.empty()
 
  256                                          { 
return m_node->
receive(sock, rcv_buf_consume_func_or_empty, err_code); }),
 
  258                wait_until, err_code);
 
  275  const Ptr sock = shared_from_this();
 
  298  const Ptr sock = shared_from_this();
 
  326  const Const_ptr sock = shared_from_this();
 
  349                                            const unsigned int* inflate_pct_val_ptr)
 const 
  355  const unsigned int inflate_pct = inflate_pct_val_ptr ? (*inflate_pct_val_ptr) : 0;
 
  421  os.os() << bytes << 
'~' << (bytes / block);
 
  422  if ((bytes % block) != 0)
 
  431                                      boost::shared_ptr<Data_packet> packet,
 
  433  m_size(packet->m_data.size()),
 
  434  m_sent_when({ sent_when }),
 
  436  m_packet(
rexmit_on ? packet : boost::shared_ptr<Data_packet>()) 
 
  450    m_data = std::move(*src_data); 
 
  466                                      boost::shared_ptr<const Syn_ack_packet> syn_ack)
 
  474  FLOW_LOG_INFO(
"NetFlow worker thread continuing active-connect of [" << sock << 
"].  " 
  475                "Received [" << syn_ack->m_type_ostream_manip << 
"] with " 
  476                "ISN [" << syn_ack->m_init_seq_num << 
"]; " 
  477                "security token [" << syn_ack->m_packed.m_security_token << 
"].");
 
  481  async_low_lvl_syn_ack_ack_send(sock, syn_ack);
 
  488  sock->m_rcv_init_seq_num = syn_ack->m_init_seq_num;
 
  489  sock->m_rcv_next_seq_num = sock->m_rcv_init_seq_num + 1;
 
  502  setup_drop_timer(socket_id, sock);
 
  505  sock->m_snd_remote_rcv_wnd = syn_ack->m_packed.m_rcv_wnd;
 
  517    event_set_all_check_delta(
true);
 
  525                                         boost::shared_ptr<const Syn_ack_packet> syn_ack)
 
  533  FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
  535                "received duplicate [" << syn_ack->m_type_ostream_manip << 
"] with " 
  536                "ISN [" << syn_ack->m_init_seq_num << 
"]; " 
  537                "security token [" << syn_ack->m_packed.m_security_token << 
"].  " 
  538                "Could be from packet loss.");
 
  542  async_low_lvl_syn_ack_ack_send(sock, syn_ack);
 
  547                                      boost::shared_ptr<Data_packet> packet,
 
  548                                      bool syn_rcvd_qd_packet)
 
  602  const bool rexmit_on = sock->rexmit_on();
 
  605  auto& data = packet->m_data; 
 
  606  assert(!data.empty()); 
 
  608  const size_t data_size = data.size();
 
  616  FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock << 
"].  " 
  617                 "Received [" << packet->m_type_ostream_manip << 
"] with " 
  618                 "sequence number [" << seq_num << 
"]; data size [" << data_size << 
"].");
 
  623  log_rcv_window(sock); 
 
  639  const Error_code cat_result = sock_categorize_data_to_established(sock, packet, &dupe, &slide, &slide_size);
 
  649    rst_and_close_connection_immediately(socket_id, sock, cat_result, 
true);
 
  682    async_acknowledge_packet(sock, seq_num, packet->m_rexmit_id, data_size); 
 
  698    if (!sock_data_to_rcv_buf_unless_overflow(sock, packet))
 
  722    sock_rcv_buf_now_readable(sock, syn_rcvd_qd_packet);
 
  730    async_acknowledge_packet(sock, seq_num, 0, data_size); 
 
  739      sock_track_new_data_after_gap_rexmit_off(sock, packet, data_size, &slide, &slide_size);
 
  751      sock_slide_rcv_next_seq_num(sock, slide_size, 
false);
 
  766      if (!sock_data_to_rcv_buf_unless_overflow(sock, packet))
 
  783      sock_slide_rcv_next_seq_num(sock, slide_size, 
true);
 
  787      sock_rcv_buf_now_readable(sock, syn_rcvd_qd_packet);
 
  789    else if (!sock_data_to_reassembly_q_unless_overflow(sock, packet)) 
 
  803    async_acknowledge_packet(sock, seq_num, packet->m_rexmit_id, data_size); 
 
  807  log_rcv_window(sock);
 
  811                                                     boost::shared_ptr<const Data_packet> packet,
 
  812                                                     bool* dupe, 
bool* slide, 
size_t* slide_size)
 
  814  assert(dupe && slide && slide_size);
 
  826  const auto& data = packet->m_data;
 
  831  advance_seq_num(&seq_num_end, data.size());
 
  834  bool first_gap_exists;
 
  837  rcv_get_first_gap_info(sock, &first_gap_exists, &seq_num_after_first_gap);
 
  851                     "Received [" << packet->m_type_ostream_manip << 
"] with " 
  852                     "sequence number [" << seq_num << 
"]; data size [" << data.size() << 
"]; " 
  853                     "sequence number precedes " 
  854                     "ISN [" << sock->m_rcv_init_seq_num << 
"].");
 
  860  if (seq_num < rcv_next_seq_num)
 
  867    if (seq_num_end > rcv_next_seq_num)
 
  871                       "Received [" << packet->m_type_ostream_manip << 
"] with " 
  872                       "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
  873                       "data size [" << data.size() << 
"]; " 
  874                       "straddle first unreceived " 
  875                       "sequence number [" << rcv_next_seq_num << 
"].");
 
  882    FLOW_LOG_TRACE(
"Duplicate packet before first unreceived sequence number [" << rcv_next_seq_num << 
"].");
 
  895  if (seq_num == rcv_next_seq_num)
 
  901    if (first_gap_exists && (seq_num_end > seq_num_after_first_gap))
 
  905                       "Received [" << packet->m_type_ostream_manip << 
"] with " 
  906                       "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
  907                       "data size [" << data.size() << 
"]; " 
  908                       "supposed gap-filling data " 
  909                       "straddle the boundary of packet [" << seq_num_after_first_gap << 
", ...).");
 
  915    FLOW_LOG_TRACE(
"Packet filled first [" << data.size() << 
"] unreceived sequence numbers " 
  916                   "starting with [" << rcv_next_seq_num << 
"].");
 
  920    *slide_size = size_t(seq_num_end - seq_num);
 
  921    assert(*slide_size == data.size());
 
  926  assert(seq_num > rcv_next_seq_num);
 
  968  if (next_packet == rcv_packets_with_gaps.end())
 
  976    if (first_gap_exists)
 
  980      get_seq_num_range(last_packet, 0, &seq_num_last_end);
 
  982      if (seq_num_last_end > seq_num) 
 
  988                         "Received [" << packet->m_type_ostream_manip << 
"] with " 
  989                         "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
  990                         "data size [" << data.size() << 
"]; " 
  991                         "supposed middle gap-filling packet data " 
  992                         "straddle the boundary of last packet [..., " << seq_num_last_end << 
").");
 
 1000      FLOW_LOG_TRACE(
"New packet is newest packet after unreceived gap; " 
 1001                     "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1002                     "first unreceived packet [" << rcv_next_seq_num << 
"].");
 
 1008      FLOW_LOG_TRACE(
"New packet forms gap; sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1009                     "first unreceived packet [" << rcv_next_seq_num << 
"].");
 
 1019  get_seq_num_range(next_packet, &seq_num_next_start, &seq_num_next_end);
 
 1021  if (seq_num_next_start == seq_num)
 
 1026    if (seq_num_next_end != seq_num_end)
 
 1032                       "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1033                       "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1034                       "data size [" << data.size() << 
"]; " 
 1035                       "do not match supposed " 
 1036                       "duplicate packet [" << seq_num << 
", " << seq_num_next_end << 
").");
 
 1047                   "sequence numbers [" << seq_num << 
", " << seq_num_end << 
").");
 
 1053  assert(seq_num_next_start > seq_num); 
 
 1065  if (seq_num_end > seq_num_next_start) 
 
 1071                     "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1072                     "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1073                     "data size [" << data.size() << 
"]; " 
 1074                     "supposed middle gap-filling packet data " 
 1075                     "straddle the left boundary of packet " 
 1076                     "[" << seq_num_next_start << 
", " << seq_num_next_end << 
").");
 
 1082  if (next_packet == rcv_packets_with_gaps.begin())
 
 1084    FLOW_LOG_TRACE(
"New packet partially fills first gap without sliding window; " 
 1085                   "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1086                   "first unreceived packet [" << rcv_next_seq_num << 
"].");
 
 1092  get_seq_num_range(prev_packet, &seq_num_prev_start, &seq_num_prev_end);
 
 1094  if (seq_num_prev_end > seq_num) 
 
 1100                     "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1101                     "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1102                     "data size [" << data.size() << 
"]; " 
 1103                     "supposed middle gap-filling packet data " 
 1104                     "straddle the right boundary of packet " 
 1105                     "[" << seq_num_prev_start << 
", " << seq_num_prev_end << 
").");
 
 1112                 "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1113                 "first unreceived packet [" << rcv_next_seq_num << 
"].");
 
 1119                                                boost::shared_ptr<Data_packet> packet)
 
 1128  Blob& data = packet->m_data; 
 
 1130  const size_t data_size = data.size();
 
 1146    if ((sock->m_rcv_buf.data_size() + data_size)
 
 1147        > sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size,
 
 1148                                        &sock->m_opts.m_st_rcv_buf_max_size_slack_percent))
 
 1156      FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 1157                    "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1158                    "sequence numbers [" << packet->m_seq_num << 
", " << (packet->m_seq_num + data_size) << 
"); " 
 1159                    "data size [" << data_size << 
"]; " 
 1160                    "dropping because Receive buffer full.");
 
 1173  const size_t written =
 
 1175      sock->m_rcv_buf.feed_buf_move(&data, std::numeric_limits<size_t>::max());
 
 1177    assert(written == data_size);
 
 1179    buf_size = sock->m_rcv_buf.data_size();
 
 1190  receive_wnd_recovery_data_received(sock);
 
 1218  if ((!syn_rcvd_qd_packet) &&
 
 1222    event_set_all_check_delta(
true);
 
 1230                                                    boost::shared_ptr<const Data_packet> packet,
 
 1232                                                    bool* slide, 
size_t* slide_size)
 
 1234  using std::make_pair;
 
 1250  const size_t max_packets_after_unrecvd_packet = sock_max_packets_after_unrecvd_packet(sock);
 
 1257  const auto insert_result =
 
 1259    rcv_packets_with_gaps.insert
 
 1263  assert(!sock->rexmit_on());
 
 1264  assert(insert_result.second); 
 
 1267  bool first_gap_exists;
 
 1271  rcv_get_first_gap_info(sock, &first_gap_exists, &seq_num_after_first_gap);
 
 1272  assert(first_gap_exists);
 
 1281  if (rcv_packets_with_gaps.size() == max_packets_after_unrecvd_packet + 1)
 
 1285    *slide_size = size_t(seq_num_after_first_gap - sock->m_rcv_next_seq_num);
 
 1291    FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 1292                  "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1293                  "sequence numbers [" << packet->m_seq_num << 
", " << (packet->m_seq_num + data_size) << 
"); " 
 1294                  "exceeded max gapped packet list size [" << max_packets_after_unrecvd_packet << 
"]; " 
 1295                  "assuming Dropped; " 
 1296                  "will fake receiving all [" << slide_size << 
"] sequence numbers in the first unreceived gap.");
 
 1301    assert(rcv_packets_with_gaps.size() <= max_packets_after_unrecvd_packet);
 
 1306                                                     boost::shared_ptr<Data_packet> packet)
 
 1308  using std::make_pair;
 
 1317  auto& data = packet->m_data; 
 
 1319  const size_t data_size = data.size();
 
 1324  size_t max_packets_after_unrecvd_packet = sock_max_packets_after_unrecvd_packet(sock);
 
 1372  size_t max_packets_in_reassembly_q
 
 1373    = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size,
 
 1374                                    &sock->m_opts.m_st_rcv_buf_max_size_slack_percent);
 
 1376  size_t rcv_buf_size;
 
 1379    rcv_buf_size = sock->m_rcv_buf.data_size(); 
 
 1384  max_packets_in_reassembly_q /= sock->max_block_size();
 
 1388  max_packets_in_reassembly_q += rcv_packets_with_gaps.size();
 
 1391  if (max_packets_in_reassembly_q < max_packets_after_unrecvd_packet)
 
 1393    max_packets_after_unrecvd_packet = max_packets_in_reassembly_q;
 
 1398    FLOW_LOG_TRACE(
"Unexpected Receive buffer limits: safety net [" << max_packets_after_unrecvd_packet << 
"] <= " 
 1399                   "real limit [" << max_packets_in_reassembly_q << 
"], but the opposite is typical.  " 
 1400                   "See details just below."); 
 
 1403  if (rcv_packets_with_gaps.size() + 1 > max_packets_after_unrecvd_packet)
 
 1417                     "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1418                     "sequence numbers [" << packet->m_seq_num << 
", " << (packet->m_seq_num + data_size) << 
"); " 
 1419                     "exceeded max gapped packet list size [" << max_packets_after_unrecvd_packet << 
"]; " 
 1420                     "dropping packet.");
 
 1425  FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock << 
"].  " 
 1426                 "Enqueueing [" << packet->m_type_ostream_manip << 
"] payload onto reassembly queue with " 
 1427                 "sequence numbers [" << packet->m_seq_num << 
", " << (packet->m_seq_num + data_size) << 
") " 
 1428                 "of size [" << data_size << 
"]; " 
 1429                 "successfully fit into max gapped packet list size [" << max_packets_after_unrecvd_packet << 
"]; " 
 1430                 "could have fit [" << (max_packets_after_unrecvd_packet - rcv_packets_with_gaps.size()) << 
"] more.");
 
 1434  const auto insert_result =
 
 1436    rcv_packets_with_gaps.insert
 
 1439  sock->m_rcv_reassembly_q_data_size += data_size;
 
 1440  assert(insert_result.second); 
 
 1451  receive_wnd_recovery_data_received(sock);
 
 1469  rcv_next_seq_num += slide_size; 
 
 1472                 "[" << (rcv_next_seq_num - slide_size) << 
"] to " 
 1473                 "[" << rcv_next_seq_num << 
"].");
 
 1485  size_t total_written = 0;
 
 1488  for (end_contig_it = start_contig_it;
 
 1492       (end_contig_it != rcv_packets_with_gaps.end()) && (end_contig_it->first == rcv_next_seq_num);
 
 1497    if (reassembly_in_progress)
 
 1517        written = sock->m_rcv_buf.feed_buf_move(&rcvd_packet.
m_data, std::numeric_limits<size_t>::max());
 
 1519        buf_size = sock->m_rcv_buf.data_size();
 
 1521      total_written += written;
 
 1527      assert(written != 0);
 
 1530    advance_seq_num(&rcv_next_seq_num, rcvd_packet.
m_size);
 
 1533                   "[" << rcv_next_seq_num << 
"]; packet subsumed by this move.");
 
 1537  rcv_packets_with_gaps.erase(start_contig_it, end_contig_it); 
 
 1538  sock->m_rcv_reassembly_q_data_size -= total_written;
 
 1548  return uint64_t(sock->opt(sock->m_opts.m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent)) *
 
 1549         uint64_t(sock->opt(sock->m_opts.m_st_rcv_buf_max_size)) /
 
 1550         uint64_t(sock->max_block_size()) /
 
 1558  *first_gap_exists = !sock->m_rcv_packets_with_gaps.empty();
 
 1560  if (*first_gap_exists)
 
 1562    *seq_num_after_first_gap = sock->m_rcv_packets_with_gaps.begin()->first;
 
 1575  sock->m_rcv_stats.total_to_send_ack_packet(data_size);
 
 1577  const size_t acks_pending_before_this = sock->m_rcv_pending_acks.size();
 
 1579  static_assert(std::is_aggregate_v<Peer_socket::Individual_ack>,
 
 1580                "We want it to be direct-initializable.");
 
 1581  static_assert((!std::is_copy_constructible_v<Peer_socket::Individual_ack>)
 
 1582                  && (!std::is_copy_assignable_v<Peer_socket::Individual_ack>),
 
 1583                "We want it to be noncopyable but rather passed-around via its ::Ptr.");
 
 1589  sock->m_rcv_pending_acks.push_back
 
 1618  if (m_socks_with_accumulated_pending_acks.insert(sock).second)
 
 1624    sock->m_rcv_pending_acks_size_at_recv_handler_start = acks_pending_before_this;
 
 1631  using boost::chrono::milliseconds;
 
 1632  using boost::chrono::microseconds;
 
 1633  using boost::chrono::duration_cast;
 
 1634  using boost::chrono::round;
 
 1642  vector<Peer_socket::Individual_ack::Ptr>& pending_acks = sock->m_rcv_pending_acks;
 
 1647    FLOW_LOG_TRACE(
"Was about to perform accumulated acknowledgment tasks on [" << sock << 
"] but skipping because " 
 1648                   "state is now [" << sock->m_int_state << 
"].");
 
 1653  assert(!pending_acks.empty());
 
 1698  const Fine_duration delayed_ack_timer_period = sock->opt(sock->m_opts.m_st_delayed_ack_timer_period);
 
 1700  bool force_ack = delayed_ack_timer_period == Fine_duration::zero(); 
 
 1705      (
"Delayed [ACK] feature disabled on [" << sock << 
"]; forcing immediate [ACK].  " 
 1706       "Receive window state: [" << sock->m_rcv_init_seq_num << 
", " << sock->m_rcv_next_seq_num << 
") " 
 1707       "| " << sock->m_rcv_packets_with_gaps.size() << 
":{...}.");
 
 1709  else if (!sock->m_rcv_packets_with_gaps.empty())
 
 1721    for (
size_t ack_idx = sock->m_rcv_pending_acks_size_at_recv_handler_start;
 
 1722         ack_idx != pending_acks.size(); ++ack_idx)
 
 1724      ack = pending_acks[ack_idx];
 
 1725      if (ack->m_seq_num > sock->m_rcv_next_seq_num)
 
 1735        (
"On [" << sock << 
"] " 
 1736         "received out-of-order packet [" << ack->m_seq_num << 
", size " << ack->m_data_size << 
", " 
 1737         "rexmit " << ack->m_rexmit_id << 
"]; " 
 1738         "forcing immediate [ACK].  " 
 1739         "Receive window state: [" << sock->m_rcv_init_seq_num << 
", " << sock->m_rcv_next_seq_num << 
") " 
 1740         "| " << sock->m_rcv_packets_with_gaps.size() << 
":{...}.");
 
 1748      = sock->opt(sock->m_opts.m_st_max_full_blocks_before_ack_send) * sock->max_block_size();
 
 1752      bytes += ack->m_data_size;
 
 1763                     "accumulated at least [" << limit << 
"] bytes to acknowledge; " 
 1764                     "forcing immediate [ACK].");
 
 1790    if (sock->m_rcv_pending_acks_size_at_recv_handler_start != 0)
 
 1793                     "canceling delayed [ACK] timer due to forcing " 
 1794                     "immediate [ACK]; would have fired " 
 1795                     "in [" << round<milliseconds>(sock->m_rcv_delayed_ack_timer.expiry() - Fine_clock::now()) << 
"] " 
 1798      if (sock->m_rcv_delayed_ack_timer.cancel() == 0)
 
 1804                      "tried to cancel delayed [ACK] timer while " 
 1805                      "forcing [ACK], but it was already just about to fire.");
 
 1813      async_low_lvl_ack_send(sock);
 
 1814      assert(pending_acks.empty());
 
 1836    if (sock->m_rcv_pending_acks_size_at_recv_handler_start == 0)
 
 1840      sock->m_rcv_delayed_ack_timer.expires_after(delayed_ack_timer_period);
 
 1843                     "scheduled delayed [ACK] timer to fire " 
 1844                     "in [" << round<milliseconds>(delayed_ack_timer_period) << 
"].");
 
 1847      sock->m_rcv_delayed_ack_timer.async_wait([
this, socket_id, sock](
const Error_code& sys_err_code)
 
 1849        async_low_lvl_ack_send(sock, sys_err_code);
 
 1857  sock->m_rcv_stats.current_pending_to_ack_packets(pending_acks.size());
 
 1864  using boost::algorithm::join;
 
 1879      (
"Receive window state for [" << sock << 
"]: " 
 1880       "[" << sock->m_rcv_init_seq_num << 
", " << sock->m_rcv_next_seq_num << 
") " 
 1881       "| " << rcv_packets_with_gaps.size() << 
":{...}.");
 
 1892  vector<string> pkt_strs;
 
 1893  pkt_strs.reserve(rcv_packets_with_gaps.size());
 
 1895  const size_t MAX_TO_SHOW = 100;
 
 1896  bool skipped_some = 
false;
 
 1900       pkt_it != rcv_packets_with_gaps.end();
 
 1903    const bool last_iteration = (count == rcv_packets_with_gaps.size() - 1);
 
 1905    if ((!skipped_some) && (count > MAX_TO_SHOW) && (!last_iteration))
 
 1908      skipped_some = 
true;
 
 1919      if (!last_iteration)
 
 1926      pkt_str = 
"[...skipped...] ";
 
 1931    get_seq_num_range(pkt_it, &start, &end);
 
 1934    pkt_strs.push_back(pkt_str);
 
 1941     "Receive window state for [" << sock << 
"]: " 
 1942       "[" << sock->m_rcv_init_seq_num << 
", " << sock->m_rcv_next_seq_num << 
") " 
 1943       "| " << rcv_packets_with_gaps.size() << 
":{" << join(pkt_strs, 
" ") << 
"}.");
 
 1947                                     boost::shared_ptr<const Ack_packet> ack)
 
 2001  sock->m_snd_pending_rcv_wnd = ack->m_rcv_wnd;
 
 2004  sock->m_rcv_acked_packets.insert(sock->m_rcv_acked_packets.end(), 
 
 2005                                   ack->m_rcv_acked_packets.begin(), ack->m_rcv_acked_packets.end());
 
 2006  m_socks_with_accumulated_acks.insert(sock); 
 
 2008  FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock << 
"].  " 
 2009                 "Received and accumulated [" << ack->m_type_ostream_manip << 
"] with " 
 2010                 "[" << ack->m_rcv_acked_packets.size() << 
"] individual acknowledgments " 
 2011                 "and rcv_wnd = [" << ack->m_rcv_wnd << 
"]; total for this socket in this " 
 2012                 "receive handler is [" << sock->m_rcv_acked_packets.size() << 
"] individual acknowledgments.");
 
 2014  sock->m_snd_stats.received_low_lvl_ack_packet(ack->m_rcv_acked_packets.empty());
 
 2022  using boost::unordered_set;
 
 2023  using boost::chrono::round;
 
 2024  using boost::chrono::milliseconds;
 
 2025  using boost::chrono::seconds;
 
 2035  log_accumulated_acks(sock);
 
 2039  using Acks = vector<Ack_packet::Individual_ack::Ptr>;
 
 2040  Acks& acked_packets = sock->m_rcv_acked_packets;
 
 2053    FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 2054                  "Accumulated [ACK] packets with [" << acked_packets.size() << 
"] " 
 2055                  "individual acknowledgments, but state is now [" << sock->m_int_state << 
"]; ignoring ACKs forever.");
 
 2199  const bool rexmit_on = sock->rexmit_on();
 
 2200  auto& snd_stats = sock->m_snd_stats;
 
 2201  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 2204  auto& pkts_marked_to_drop = sock->m_snd_temp_pkts_marked_to_drop;
 
 2205  pkts_marked_to_drop.clear();
 
 2208  const bool could_send_before_acks = can_send(sock);
 
 2210  const bool had_rexmit_data_before_acks = !sock->m_snd_rexmit_q.empty();
 
 2215  unordered_set<Peer_socket::order_num_t> flying_now_acked_pkts;
 
 2218  size_t clean_acked_bytes = 0;
 
 2219  size_t clean_acked_packets = 0;
 
 2223  using Clean_acked_packet = tuple<Fine_duration, size_t, size_t>;
 
 2224  vector<Clean_acked_packet> clean_acked_packet_events;
 
 2225  clean_acked_packet_events.reserve(min(acked_packets.size(), snd_flying_pkts_by_when.size())); 
 
 2243    const bool error_ack = !categorize_individual_ack(socket_id, sock, ack, &dupe_or_late, &flying_pkt_it);
 
 2256    if (flying_pkt_it != snd_flying_pkts_by_when.past_oldest())
 
 2259      flying_pkt = flying_pkt_it->second;
 
 2260      round_trip_time = compute_rtt_on_ack(flying_pkt, time_now, ack, &sent_when); 
 
 2269    assert(!dupe_or_late);
 
 2271    assert(flying_pkt_it != snd_flying_pkts_by_when.past_oldest());
 
 2275    new_round_trip_time_sample(sock, round_trip_time);
 
 2285    const size_t bytes_acked = flying_pkt->m_size;
 
 2287    clean_acked_packet_events.emplace_back(round_trip_time, bytes_acked, cwnd_bytes);
 
 2290    snd_flying_pkts_erase_one(sock, flying_pkt_it);
 
 2293    clean_acked_bytes += bytes_acked;
 
 2294    ++clean_acked_packets;
 
 2319    flying_now_acked_pkts.insert(sent_when->
m_order_num);
 
 2328    = categorize_pkts_as_dropped_on_acks(sock, flying_now_acked_pkts);
 
 2334  size_t dropped_pkts;
 
 2335  size_t dropped_bytes;
 
 2336  size_t cong_ctl_dropped_bytes;
 
 2337  size_t cong_ctl_dropped_pkts;
 
 2338  if (!drop_pkts_on_acks(sock, last_dropped_pkt_it,
 
 2339                         &cong_ctl_dropped_pkts, &cong_ctl_dropped_bytes,
 
 2340                         &dropped_pkts, &dropped_bytes, &pkts_marked_to_drop))
 
 2361  if (dropped_pkts != 0)
 
 2364    FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 2365                  "Considering Dropped: [" << dropped_bytes << 
"] bytes = [" << dropped_pkts << 
"] packets.");
 
 2367    if (cong_ctl_dropped_pkts != 0) 
 
 2370      assert(cong_ctl_dropped_bytes != 0); 
 
 2372      FLOW_LOG_INFO(
"cong_ctl [" << sock << 
"] update: loss event: " 
 2373                    "Dropped [" << cong_ctl_dropped_bytes << 
"] bytes " 
 2374                    "= [" << cong_ctl_dropped_pkts << 
"] packets.");
 
 2376      sock->m_snd_cong_ctl->on_loss_event(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
 
 2377      sock->m_snd_last_loss_event_when = Fine_clock::now();
 
 2384    assert(dropped_pkts == 0);
 
 2385    assert(cong_ctl_dropped_pkts == 0);
 
 2388  if (clean_acked_packets != 0)
 
 2390    assert(clean_acked_bytes != 0); 
 
 2391    assert(!clean_acked_packet_events.empty());
 
 2394    for (
const auto& [rtt, bytes, cwnd_bytes] : clean_acked_packet_events)
 
 2396      FLOW_LOG_TRACE(
"cong_ctl [" << sock << 
"] update: clean individual acknowledgment: " 
 2397                     "[" << sock->bytes_blocks_str(bytes) << 
"] with RTT [" << round<milliseconds>(rtt) <<
 
 2398                     "] and sent_cwnd_bytes [" << cwnd_bytes << 
"].");
 
 2400      sock->m_snd_cong_ctl->on_individual_ack(rtt, bytes, cwnd_bytes);
 
 2403    FLOW_LOG_TRACE(
"cong_ctl/bw_est [" << sock << 
"] update: clean acknowledgments: " 
 2404                   "[" << sock->bytes_blocks_str(clean_acked_bytes) << 
"] = " 
 2405                   "[" << clean_acked_packets << 
"] packets.");
 
 2408    sock->m_snd_bandwidth_estimator->on_acks(clean_acked_bytes);
 
 2409    sock->m_snd_cong_ctl->on_acks(clean_acked_bytes, clean_acked_packets);
 
 2417  if (dropped_pkts != 0)
 
 2420    snd_stats.dropped_data(dropped_bytes, dropped_pkts);
 
 2422    const seconds MIN_TIME_BETWEEN_LOGS(1);
 
 2423    const Fine_duration since_last_loss_sock_log = Fine_clock::now() - m_last_loss_sock_log_when;
 
 2425    if (since_last_loss_sock_log > MIN_TIME_BETWEEN_LOGS)
 
 2427      FLOW_LOG_INFO(
"Will log socket state on loss, because last such loss-driven logging was " 
 2428                    "[" << round<milliseconds>(since_last_loss_sock_log) << 
" >" 
 2429                    " " << MIN_TIME_BETWEEN_LOGS << 
"] ago.");
 
 2430      sock_log_detail(sock);
 
 2431      m_last_loss_sock_log_when = Fine_clock::now();
 
 2435      FLOW_LOG_INFO(
"Will NOT log socket state on loss, because last such loss-driven logging was " 
 2436                    "[" << round<milliseconds>(since_last_loss_sock_log) << 
" <=" 
 2437                    " " << MIN_TIME_BETWEEN_LOGS << 
"] ago.");
 
 2442  log_snd_window(sock);
 
 2452  drop_timer->start_contemporaneous_events();
 
 2454  for (
const auto pkt_order_num : flying_now_acked_pkts)
 
 2456    drop_timer->on_ack(pkt_order_num);
 
 2457    drop_timer->on_packet_no_longer_in_flight(pkt_order_num);
 
 2459  for (
const auto pkt_order_num : pkts_marked_to_drop)
 
 2461    drop_timer->on_packet_no_longer_in_flight(pkt_order_num);
 
 2464  drop_timer->end_contemporaneous_events();
 
 2468  if (sock->m_snd_pending_rcv_wnd != sock->m_snd_remote_rcv_wnd)
 
 2471                   "rcv_wnd change [" << sock->m_snd_remote_rcv_wnd << 
"] => [" << sock->m_snd_pending_rcv_wnd << 
"].");
 
 2472    sock->m_snd_remote_rcv_wnd = sock->m_snd_pending_rcv_wnd;
 
 2483    sock->m_snd_stats.updated_rcv_wnd(sock->m_snd_remote_rcv_wnd < sock->max_block_size());
 
 2492  if ((!could_send_before_acks) || (
rexmit_on && (!had_rexmit_data_before_acks)))
 
 2494    send_worker(sock, 
true);
 
 2505  assert(dupe_or_late);
 
 2506  assert(acked_pkt_it);
 
 2550  const bool rexmit_on = sock->rexmit_on();
 
 2551  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 2552  auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
 
 2553  auto& snd_stats = sock->m_snd_stats;
 
 2558  const unsigned int rexmit_id = ack->m_rexmit_id;
 
 2562  snd_stats.received_ack();
 
 2578                     "acknowledgment [" << seq_num << 
", ...) is outside (ISN, snd_next) " 
 2579                     "range (" << sock->m_snd_init_seq_num << 
", " << sock->m_snd_next_seq_num << 
").");
 
 2582    snd_stats.error_ack();
 
 2588    rst_and_close_connection_immediately(socket_id, sock,
 
 2598  *acked_pkt_it = snd_flying_pkts_by_when.find(seq_num);
 
 2599  if (*acked_pkt_it == snd_flying_pkts_by_when.past_oldest()) 
 
 2666    if (pkt_it != snd_flying_pkts_by_seq.begin())
 
 2672      get_seq_num_range(pkt_it->second, &l1, &l2);
 
 2674      assert(l1 < seq_num); 
 
 2680        snd_stats.error_ack();
 
 2684                         "acknowledgment [" << seq_num << 
", ...) is at least partially inside " 
 2685                         "packet [" << l1 << 
", " << l2 << 
").");
 
 2721    snd_stats.late_or_dupe_ack();
 
 2723    FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 2724                  "Acknowledged packet [" << seq_num << 
", ...) is duplicate or late (or invalid).  " 
 2725                  "RTT unknown.  Ignoring.");
 
 2728    *dupe_or_late = 
true;
 
 2729    assert(*acked_pkt_it == snd_flying_pkts_by_when.past_oldest()); 
 
 2733  assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest());
 
 2738  const unsigned int acked_rexmit_id = 
rexmit_on ? acked_pkt.
m_packet->m_rexmit_id : 0;
 
 2740  get_seq_num_range(*acked_pkt_it, 0, &seq_num_end);
 
 2744  if (rexmit_id > acked_rexmit_id)
 
 2748                     "Acknowledged packet [" << seq_num << 
", " << seq_num_end << 
") " 
 2749                     "rexmit_id [" << 
int(rexmit_id) << 
"] " 
 2750                     "exceeds highest sent rexmit_id [" << 
int(acked_rexmit_id) << 
"].");
 
 2759  if (rexmit_id != acked_rexmit_id)
 
 2761    assert(rexmit_id < acked_rexmit_id);
 
 2799    snd_stats.late_or_dupe_ack();
 
 2801    FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 2802                  "Acknowledged packet [" << seq_num << 
", " << seq_num_end << 
") " 
 2803                  "order_num [" << acked_pkt.
m_sent_when[rexmit_id].m_order_num << 
"] " 
 2804                  "rexmit_id [" << 
int(rexmit_id) << 
"] " 
 2805                  "is less than highest sent [" << 
int(acked_rexmit_id) << 
"].  Ignoring.");
 
 2808    *dupe_or_late = 
true;
 
 2809    assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest()); 
 
 2813  assert(rexmit_id == acked_rexmit_id);
 
 2818  snd_stats.good_ack(acked_pkt.
m_size);
 
 2821  *dupe_or_late = 
false;
 
 2822  assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest()); 
 
 2831  using boost::chrono::milliseconds;
 
 2832  using boost::chrono::round;
 
 2852  const unsigned int rexmit_id = ack->m_rexmit_id;
 
 2854  *sent_when = &(flying_pkt->m_sent_when[rexmit_id]);
 
 2863  const auto& ack_delay = ack->m_delay;
 
 2864  round_trip_time = time_now - (*sent_when)->m_sent_time - ack_delay;
 
 2866  if (round_trip_time.count() < 0)
 
 2875    FLOW_LOG_TRACE(
"Acknowledged packet [" << ack->m_seq_num << 
", ...) " 
 2876                   "order_num [" << order_num << 
"] has negative " 
 2877                   "RTT [" << round_trip_time << 
"]; assuming zero.  " 
 2878                   "Sent at [" << (*sent_when)->m_sent_time << 
"]; " 
 2879                   "received at [" << time_now << 
"]; " 
 2880                   "receiver-reported ACK delay [" << ack_delay << 
"].");
 
 2881    round_trip_time = Fine_duration::zero();
 
 2883  FLOW_LOG_TRACE(
"Acknowledged packet [" << ack->m_seq_num << 
", ...) " 
 2884                 "order_num [" << order_num << 
"] " 
 2885                 "has RTT [" << round<milliseconds>(round_trip_time) << 
"] " 
 2886                 "(ACK delay [" << round<milliseconds>(ack_delay) << 
"]).");
 
 2888  return round_trip_time;
 
 2893                                           const boost::unordered_set<Peer_socket::order_num_t>& flying_now_acked_pkts)
 
 2895  using std::priority_queue;
 
 2911  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 2982  priority_queue<Peer_socket::order_num_t>
 
 2983    high_ack_count_q(flying_now_acked_pkts.begin(), flying_now_acked_pkts.end());
 
 2987  ack_count_t ack_increment_after_me = 0;
 
 2991  for (last_dropped_pkt_it = snd_flying_pkts_by_when.newest();
 
 2992       last_dropped_pkt_it != snd_flying_pkts_by_when.past_oldest();
 
 2993       ++last_dropped_pkt_it) 
 
 3009    while ((!high_ack_count_q.empty()) &&
 
 3011           (high_ack_count_q.top() > cur_pkt_sent_when.
m_order_num))
 
 3014      ++ack_increment_after_me; 
 
 3017      high_ack_count_q.pop(); 
 
 3024    if (cur_sent_pkt.
m_acks_after_me > S_MAX_LATER_ACKS_BEFORE_CONSIDERING_DROPPED)
 
 3038        get_seq_num_range(last_dropped_pkt_it, &cur_pkt_seq_num, &cur_pkt_seq_num_end);
 
 3041          (
"Unacknowledged packet [" << cur_pkt_seq_num << 
", " << cur_pkt_seq_num_end << 
") " 
 3042           "order_num [" << cur_pkt_sent_when.
m_order_num << 
"] has " 
 3044           "for later packets; considering it and " 
 3045           "all unacknowledged packets sent earlier as Dropped.");
 
 3055  return last_dropped_pkt_it;
 
 3060                             size_t* cong_ctl_dropped_pkts, 
size_t* cong_ctl_dropped_bytes,
 
 3061                             size_t* dropped_pkts, 
size_t* dropped_bytes,
 
 3062                             std::vector<Peer_socket::order_num_t>* pkts_marked_to_drop)
 
 3082  const bool rexmit_on = sock->rexmit_on();
 
 3083  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 3084  auto& snd_stats = sock->m_snd_stats;
 
 3115  *dropped_pkts = snd_flying_pkts_by_when.size(); 
 
 3116  *dropped_bytes = sock->m_snd_flying_bytes; 
 
 3118  *cong_ctl_dropped_bytes = 0;
 
 3119  *cong_ctl_dropped_pkts = 0;
 
 3120  bool loss_event_finished = 
false;
 
 3135  auto& snd_rexmit_q = sock->m_snd_rexmit_q;
 
 3136  decltype(sock->m_snd_rexmit_q)::iterator snd_rexmit_q_fulcrum_it = snd_rexmit_q.end();
 
 3139  assert(pkts_marked_to_drop->empty());
 
 3141  auto pkt_it = last_dropped_pkt_it;
 
 3142  while (pkt_it != snd_flying_pkts_by_when.past_oldest())
 
 3145    auto next_pkt_it = boost::next(pkt_it);
 
 3153    if (!loss_event_finished)
 
 3158          && (sent_when.
m_sent_time < sock->m_snd_last_loss_event_when))
 
 3162        loss_event_finished = 
true;
 
 3167        *cong_ctl_dropped_bytes += sent_pkt->m_size;
 
 3168        ++(*cong_ctl_dropped_pkts);
 
 3177      if (!ok_to_rexmit_or_close(sock, pkt_it, 
true)) 
 
 3191      snd_rexmit_q_fulcrum_it = snd_rexmit_q.insert(snd_rexmit_q_fulcrum_it, sent_pkt);
 
 3192      ++sock->m_snd_rexmit_q_size;
 
 3202       "Scoreboard must not get otherwise changed when a packet is erased.");
 
 3203    pkts_marked_to_drop->push_back(sent_when.
m_order_num);
 
 3204    snd_flying_pkts_erase_one(sock, pkt_it);
 
 3206    pkt_it = next_pkt_it;
 
 3210  *dropped_pkts -= snd_flying_pkts_by_when.size(); 
 
 3211  *dropped_bytes -= sock->m_snd_flying_bytes; 
 
 3213  if (*cong_ctl_dropped_pkts != 0)
 
 3216    snd_stats.loss_event();
 
 3224  using boost::algorithm::join;
 
 3225  using boost::chrono::symbol_format;
 
 3228  using std::transform;
 
 3237  using Acks = vector<Ack::Ptr>;
 
 3238  const Acks& acked_packets = sock->m_rcv_acked_packets;
 
 3244    vector<string> ack_strs(acked_packets.size());
 
 3245    transform(acked_packets.begin(), acked_packets.end(), ack_strs.begin(),
 
 3246              [](Ack::Const_ptr ack) -> 
string 
 3248      return util::ostream_op_string(
'[', ack->m_seq_num, 
", ", int(ack->m_rexmit_id), 
", ",
 
 3252    const string ack_str = join(ack_strs, 
" ");
 
 3255                                   "Accumulated [ACK] packets with " 
 3256                                   "acknowledgments [seq_num, rexmit_id, delay]: " 
 3257                                   "[" << ack_str << 
"].");
 
 3261    FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock << 
"].  " 
 3262                   "Accumulated [ACK] packets with " 
 3263                   "[" << acked_packets.size() << 
"] individual acknowledgments.");
 
 3268    log_snd_window(sock);
 
 3284  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 3285  auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
 
 3288  assert(!snd_flying_pkts_by_when.empty());
 
 3295  log_snd_window(sock);
 
 3297  const bool rexmit_on = sock->rexmit_on();
 
 3299  const bool could_send_before_drops = can_send(sock);
 
 3301  const bool had_rexmit_data_before_drops = !sock->m_snd_rexmit_q.empty();
 
 3306  size_t cong_ctl_dropped_bytes = 0;
 
 3307  size_t cong_ctl_dropped_pkts = 0;
 
 3309  if (drop_all_packets)
 
 3311    cong_ctl_dropped_bytes = sock->m_snd_flying_bytes;
 
 3312    cong_ctl_dropped_pkts = snd_flying_pkts_by_when.size();
 
 3319           pkt_it != snd_flying_pkts_by_when.past_newest();
 
 3323        if (!ok_to_rexmit_or_close(sock, prior(pkt_it.base()), 
false)) 
 
 3334        sock->m_snd_rexmit_q.push_back(pkt_it->second); 
 
 3336      sock->m_snd_rexmit_q_size += cong_ctl_dropped_pkts;
 
 3343    snd_flying_pkts_updated(sock, snd_flying_pkts_by_when.newest(), snd_flying_pkts_by_when.past_oldest(), 
false);
 
 3344    snd_flying_pkts_by_when.clear();
 
 3345    snd_flying_pkts_by_seq.clear();
 
 3347    packet_marked_to_drop_or_drop_all = 0; 
 
 3355    cong_ctl_dropped_bytes = oldest_pkt->m_size;
 
 3356    cong_ctl_dropped_pkts = 1;
 
 3361      if (!ok_to_rexmit_or_close(sock, oldest_pkt_it, 
false)) 
 
 3368      sock->m_snd_rexmit_q.push_back(oldest_pkt); 
 
 3369      ++sock->m_snd_rexmit_q_size;
 
 3374    packet_marked_to_drop_or_drop_all = oldest_pkt->m_sent_when.back().m_order_num;
 
 3377    snd_flying_pkts_erase_one(sock, oldest_pkt_it);
 
 3402  FLOW_LOG_INFO(
"cong_ctl [" << sock << 
"] update: Drop Timeout event: " 
 3403                "Dropped [" << cong_ctl_dropped_bytes << 
"] bytes = [" << cong_ctl_dropped_pkts << 
"] packets.");
 
 3406  sock->m_snd_cong_ctl->on_drop_timeout(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
 
 3407  sock->m_snd_last_loss_event_when = Fine_clock::now();
 
 3410  sock->m_snd_stats.drop_timeout();
 
 3411  sock->m_snd_stats.dropped_data(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
 
 3414  log_snd_window(sock);
 
 3419  drop_timer->start_contemporaneous_events();
 
 3427  if (packet_marked_to_drop_or_drop_all == 0)
 
 3430    drop_timer->on_no_packets_in_flight_any_longer();
 
 3434    drop_timer->on_packet_no_longer_in_flight(packet_marked_to_drop_or_drop_all);
 
 3439  drop_timer->end_contemporaneous_events();
 
 3446  if ((!could_send_before_drops) || (
rexmit_on && (!had_rexmit_data_before_drops)))
 
 3448    send_worker(sock, 
false);
 
 3458  using boost::ratio_subtract;
 
 3459  using boost::ratio_string;
 
 3460  using boost::chrono::round;
 
 3461  using boost::chrono::milliseconds;
 
 3462  using boost::chrono::microseconds;
 
 3463  using boost::chrono::seconds;
 
 3492  if (srtt == Fine_duration::zero())
 
 3500                   "srtt = [" << round<milliseconds>(srtt) << 
" = " << srtt << 
"]; " 
 3501                   "rtt_var = [" << round<milliseconds>(rtt_var) << 
" = " << rtt_var << 
"]; " 
 3502                   "rtt = [" << rtt << 
"].");
 
 3520    using Alpha = ratio<1, 8>; 
 
 3521    using One_minus_alpha = ratio_subtract<ratio<1>, Alpha>;
 
 3522    using Beta = ratio<1, 4>; 
 
 3523    using One_minus_beta = ratio_subtract<ratio<1>, Beta>;
 
 3528    if (abs_srtt_minus_rtt.count() < 0)
 
 3530      abs_srtt_minus_rtt = -abs_srtt_minus_rtt;
 
 3535      = rtt_var * One_minus_beta::num / One_minus_beta::den
 
 3536        + abs_srtt_minus_rtt * Beta::num / Beta::den;
 
 3538      = srtt * One_minus_alpha::num / One_minus_alpha::den
 
 3539        + rtt * Alpha::num / Alpha::den;
 
 3543                   "srtt = [" << round<milliseconds>(srtt) << 
" = " << srtt << 
"]; " 
 3544                   "rtt_var = [" << round<milliseconds>(rtt_var) << 
" = " << rtt_var << 
"]; " 
 3545                   "rtt = [" << rtt << 
"]; " 
 3546                   "prev_srtt = [" << prev_srtt << 
"]; " 
 3547                   "prev_rtt_var = [" << prev_rtt_var << 
"]; " 
 3548                   "alpha = " << (ratio_string<Alpha, char>::prefix()) << 
"; " 
 3549                   "(1 - alpha) = " << (ratio_string<One_minus_alpha, char>::prefix()) << 
"; " 
 3550                   "beta = " << (ratio_string<Beta, char>::prefix()) << 
"; " 
 3551                   "(1 - beta) = " << (ratio_string<One_minus_beta, char>::prefix()) << 
"; " 
 3552                   "|srtt - rtt| = [" << abs_srtt_minus_rtt << 
"].");
 
 3569  const Fine_duration clock_resolution_at_least = microseconds(500);
 
 3571  const Fine_duration ceiling = sock->opt(sock->m_opts.m_dyn_drop_timeout_ceiling);
 
 3572  const unsigned int k = 4;
 
 3576  const Fine_duration srtt_plus_var_term = srtt + max(clock_resolution_at_least, rtt_var_k);
 
 3577  dto = max(srtt_plus_var_term, floor);
 
 3578  dto = min(dto, ceiling);
 
 3582                 "dto = [" << round<milliseconds>(dto) << 
" = " << dto << 
"]; " 
 3583                 "rtt_var * k = [" << rtt_var_k << 
"]; " 
 3584                 "srtt + max(G, rtt_var * k) = [" << srtt_plus_var_term << 
"]; " 
 3585                 "k = [" << k << 
"]; " 
 3586                 "floor = [" << floor << 
"]; ceiling = [" << ceiling << 
"]; " 
 3587                 "clock_resolution = [" << clock_resolution_at_least << 
"]; " 
 3588                 "prev_dto = [" << prev_dto << 
"].");
 
 3596  using boost::algorithm::join;
 
 3604  const auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
 
 3605  const auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 3606  const size_t num_flying_pkts = snd_flying_pkts_by_seq.size();
 
 3610  if (snd_flying_pkts_by_seq.empty())
 
 3614                           "Send window state for [" << sock << 
"]: cong_wnd " 
 3615                             "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) << 
"]; " 
 3616                             "sent+acked/dropped " 
 3617                             "[" << sock->m_snd_init_seq_num << 
", " << sock->m_snd_next_seq_num << 
") " 
 3618                             "unsent [" << sock->m_snd_next_seq_num << 
", ...).");
 
 3629      (
"Send window state for [" << sock << 
"]: cong_wnd " 
 3630       "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) << 
"]; " 
 3631       "sent+acked/dropped [" << sock->m_snd_init_seq_num << 
", " << snd_flying_pkts_by_seq.begin()->first << 
") " 
 3632       "in-flight [" << sock->m_snd_flying_bytes << 
"] bytes: " << num_flying_pkts << 
":{...} " 
 3633       "unsent [" << sock->m_snd_next_seq_num << 
", ...).");
 
 3640  const bool rexmit_on = sock->rexmit_on();
 
 3642  vector<string> pkt_strs;
 
 3643  pkt_strs.reserve(num_flying_pkts);
 
 3645       pkt_it_it != snd_flying_pkts_by_seq.end();
 
 3649    get_seq_num_range(pkt_it_it->second, &start, &end);
 
 3653    String_ostream pkt_str_os;
 
 3654    pkt_str_os.os() << 
'[' << start;
 
 3657      pkt_str_os.os() << 
'[' << int(sent_pkt->m_packet->m_rexmit_id) << 
'/' << sent_pkt->m_sent_when.back().m_order_num
 
 3662      pkt_str_os.os() << 
", ";
 
 3664    pkt_str_os.os() << end << 
")<" << sent_pkt->m_acks_after_me << 
"acks" << flush;
 
 3666    pkt_strs.push_back(pkt_str_os.str());
 
 3671     "Send window state for [" << sock << 
"]: cong_wnd " 
 3672       "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) << 
"]; " 
 3673       "sent+acked/dropped [" << sock->m_snd_init_seq_num << 
", " << snd_flying_pkts_by_seq.begin()->first << 
") " 
 3675       "[" << sock->m_snd_flying_bytes << 
"] bytes: " << num_flying_pkts << 
":{" << join(pkt_strs, 
" ") <<
 
 3676       "} unsent [" << sock->m_snd_next_seq_num << 
", ...).");
 
 3686  vector<string> pkt_strs_time;
 
 3687  pkt_strs_time.reserve(num_flying_pkts);
 
 3690       pkt_it != snd_flying_pkts_by_when.const_past_newest();
 
 3695    get_seq_num_range(prior(pkt_it.base()), &start, &end);
 
 3701                               start, 
'[', 
int(sent_pkt->m_packet->m_rexmit_id), 
'/',
 
 3702                               sent_pkt->m_sent_when.back().m_order_num, 
"], ", end, 
")<",
 
 3703                               sent_pkt->m_acks_after_me, 
"acks");
 
 3704    pkt_strs_time.push_back(pkt_str);
 
 3708  if (pkt_strs_time != pkt_strs)
 
 3712       "Sorted by time sent: {" << join(pkt_strs_time, 
" ") << 
"}.");
 
 3721  if (flying_packets.empty())
 
 3728  const Peer_socket::Sent_pkt_by_seq_num_map::value_type& highest_val = *(prior(flying_packets.end()));
 
 3732  advance_seq_num(&seq_num, highest_val.second->second->m_size);
 
 3747    get_seq_num_range(pkt_it, &seq_num, &seq_num_end);
 
 3749    if (sock->rexmit_on())
 
 3752        (
"On [" << sock << 
"] erasing packet [" << seq_num << 
", " << seq_num_end << 
") " 
 3753         "order_num [" << order_num << 
"] rexmit_id [" << 
int(sent_pkt.
m_packet->m_rexmit_id) << 
"] from " 
 3754         "snd_flying_pkts* and friends.");
 
 3759        (
"On [" << sock << 
"] erasing packet [" << seq_num << 
", " << seq_num_end << 
") " 
 3760         "order_num [" << order_num << 
"] from snd_flying_pkts* and friends.");
 
 3765  snd_flying_pkts_updated(sock, pkt_it, boost::next(pkt_it), 
false);
 
 3768  sock->m_snd_flying_pkts_by_seq_num.erase(pkt_it->first);
 
 3769  sock->m_snd_flying_pkts_by_sent_when.erase(pkt_it);
 
 3779  using std::make_pair;
 
 3783  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 3786  const auto insert_result =
 
 3788    snd_flying_pkts_by_when.insert(make_pair(seq_num, sent_pkt));
 
 3792  assert(insert_result.second); 
 
 3793  assert(insert_result.first == pkt_it); 
 
 3795  snd_flying_pkts_updated(sock, pkt_it, boost::next(pkt_it), 
true); 
 
 3799  const auto insert_result_by_seq =
 
 3801    sock->m_snd_flying_pkts_by_seq_num.insert(make_pair(seq_num, pkt_it));
 
 3804  assert(insert_result_by_seq.second);
 
 3825  get_seq_num_range(pkt_it, 0, &seq_num_end);
 
 3826  if (sock->rexmit_on())
 
 3829      (
"On [" << sock << 
"] pushing packet [" << seq_num << 
", " << seq_num_end << 
") " 
 3830       "rexmit_id [" << 
int(sent_pkt->m_packet->m_rexmit_id) << 
"] onto snd_flying_pkts and friends.");
 
 3835      (
"On [" << sock << 
"] pushing packet [" << seq_num << 
", " << seq_num_end << 
") " 
 3836       "onto snd_flying_pkts and friends.");
 
 3847  if (pkt_begin == pkt_end)
 
 3853  const auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 3854  size_t& snd_flying_bytes = sock->m_snd_flying_bytes;
 
 3858      && (pkt_begin == snd_flying_pkts_by_when.const_newest())
 
 3859      && (pkt_end == snd_flying_pkts_by_when.const_past_oldest()))
 
 3861    snd_flying_bytes = 0;
 
 3865    size_t delta_bytes = 0;
 
 3866    for ( ; pkt_begin != pkt_end; ++pkt_begin)
 
 3868      delta_bytes += pkt_begin->second->m_size;
 
 3870    added ? (snd_flying_bytes += delta_bytes) : (snd_flying_bytes -= delta_bytes);
 
 3874                 "In-flight [" << sock->bytes_blocks_str(snd_flying_bytes) << 
"].");
 
 3879                                 bool defer_delta_check)
 
 3884  get_seq_num_range(pkt_it, &seq_num, &seq_num_end);
 
 3886  const unsigned int rexmit_id = pkt.
m_packet->m_rexmit_id;
 
 3887  FLOW_LOG_TRACE(
"On [" << sock << 
"] attempting to queue for retransmission " 
 3888                 "[" << seq_num << 
", " << seq_num_end << 
"] which has been " 
 3889                 "retransmitted [" << rexmit_id << 
"] times so far.");
 
 3890  if (rexmit_id == sock->opt(sock->m_opts.m_st_max_rexmissions_per_packet))
 
 3892    rst_and_close_connection_immediately(socket_id(sock), sock,
 
 3903  return connect_with_metadata(to, boost::asio::buffer(&S_DEFAULT_CONN_METADATA, 
sizeof(S_DEFAULT_CONN_METADATA)),
 
 3908                                             const boost::asio::const_buffer& serialized_metadata,
 
 3943                     [&]() { connect_worker(to, serialized_metadata, sock_opts, &sock); });
 
 3947  if (sock->m_disconnect_cause)
 
 3949    *err_code = sock->m_disconnect_cause;
 
 3961  using boost::asio::buffer;
 
 3962  using boost::asio::ip::address;
 
 3970  auto& sock = *sock_ptr;
 
 3977    const bool opts_ok = sock_validate_options(*sock_opts, 0, &err_code);
 
 3980    sock.reset(sock_create(*sock_opts));
 
 3985      sock->m_disconnect_cause = err_code;
 
 4002      sock_non_ptr = sock_create(
m_opts.m_dyn_sock_opts);
 
 4004    sock.reset(sock_non_ptr);
 
 4009  sock->m_active_connect = 
true;
 
 4010  sock->m_node = 
this;
 
 4012  sock->m_remote_endpoint = to;
 
 4014  sock->m_serialized_metadata.assign_copy(serialized_metadata);
 
 4023  sock->m_snd_cong_ctl.reset
 
 4032  bool ip_addr_any_error = 
false;
 
 4036    if (addr.to_v4() == util::Ip_address_v4::any())
 
 4038      ip_addr_any_error = 
true;
 
 4041  else if (addr.is_v6())
 
 4043    if (addr.to_v6() == util::Ip_address_v6::any())
 
 4045      ip_addr_any_error = 
true;
 
 4049  if (ip_addr_any_error)
 
 4052    Error_code* err_code = &sock->m_disconnect_cause;
 
 4060  sock->m_local_port = m_ports.reserve_ephemeral_port(&sock->m_disconnect_cause);
 
 4069  FLOW_LOG_INFO(
"NetFlow worker thread starting active-connect of [" << sock << 
"].");
 
 4078    FLOW_LOG_WARNING(
"Cannot add [" << sock << 
"], because such a connection already exists.  " 
 4079                     "This is an ephemeral port collision and " 
 4080                     "constitutes either a bug or an extremely unlikely condition.");
 
 4083    Error_code* err_code = &sock->m_disconnect_cause;
 
 4088    m_ports.return_port(sock->m_local_port, &return_err_code);
 
 4089    assert(!return_err_code);
 
 4100  setup_connection_timers(socket_id, sock, 
true);
 
 4105  init_seq_num = m_seq_num_generator.generate_init_seq_num();
 
 4109  init_seq_num.
set_metadata(
'L', init_seq_num + 1, sock->max_block_size());
 
 4111  sock->m_snd_next_seq_num = init_seq_num + 1;
 
 4114  auto syn = create_syn(sock);
 
 4123  m_socks[socket_id] = sock;
 
 4132  return sync_connect_with_metadata(to, Fine_duration::max(),
 
 4133                                    boost::asio::buffer(&S_DEFAULT_CONN_METADATA, 
sizeof(S_DEFAULT_CONN_METADATA)),
 
 4134                                    err_code, sock_opts);
 
 4138                                                  const boost::asio::const_buffer& serialized_metadata,
 
 4141  return sync_connect_with_metadata(to, Fine_duration::max(), serialized_metadata, err_code, opts);
 
 4145                                         const boost::asio::const_buffer& serialized_metadata,
 
 4149                                     to, max_wait, serialized_metadata, _1, sock_opts);
 
 4177    event_set->close(&dummy_prevents_throw);
 
 4180  const auto sock = connect_with_metadata(to, serialized_metadata, err_code, sock_opts);
 
 4194                                                          &dummy_prevents_throw);
 
 4198  result = event_set->sync_wait(max_wait, err_code);
 
 4213    sock->close_abruptly(&dummy_prevents_throw); 
 
 4219  const bool ready = event_set->events_detected(err_code);
 
 4242      *err_code = sock->m_disconnect_cause; 
 
 4252  sock->close_abruptly(&dummy_prevents_throw);
 
 4261  using boost::chrono::microseconds;
 
 4262  using boost::chrono::duration_cast;
 
 4263  using boost::weak_ptr;
 
 4267  Fine_duration rexmit_from_now = sock->opt(sock->m_opts.m_st_connect_retransmit_period);
 
 4274    ++sock->m_init_rexmit_count;
 
 4284  sock->m_init_rexmit_scheduled_task
 
 4287                              sock_observer = weak_ptr<Peer_socket>(sock)]
 
 4290    auto sock = sock_observer.lock();
 
 4293      handle_connection_rexmit_timer_event(socket_id, sock);
 
 4301    sock->m_connection_timeout_scheduled_task
 
 4303                               sock->opt(sock->m_opts.m_st_connect_retransmit_timeout),
 
 4304                               true, &m_task_engine,
 
 4306                                sock_observer = weak_ptr<Peer_socket>(sock)]
 
 4311      auto sock = sock_observer.lock();
 
 4318      FLOW_LOG_INFO(
"Connection handshake timeout timer [" << sock << 
"] has been triggered; was on " 
 4319                    "attempt [" << (sock->m_init_rexmit_count + 1) << 
"].");
 
 4346  FLOW_LOG_INFO(
"Connection handshake retransmit timer [" << sock << 
"] triggered; was on " 
 4347                "attempt [" << (sock->m_init_rexmit_count + 1) << 
"].");
 
 4358  if (sock->m_active_connect)
 
 4397  sock->m_rcv_delayed_ack_timer.cancel();
 
 4398  sock->m_snd_pacing_data.m_slice_timer.cancel();
 
 4400  if (sock->m_init_rexmit_scheduled_task)
 
 4405  if (sock->m_connection_timeout_scheduled_task)
 
 4410  if (sock->m_rcv_in_rcv_wnd_recovery)
 
 4413    sock->m_rcv_in_rcv_wnd_recovery = 
false;
 
 4416  if (sock->m_snd_drop_timer)
 
 4419    sock->m_snd_drop_timer->done();
 
 4423    sock->m_snd_drop_timer.reset();
 
 4429  sock->m_snd_drop_timeout = sock->opt(sock->m_opts.m_st_init_drop_timeout);
 
 4436  const auto on_timer = [
this, 
socket_id, sock](
bool drop_all_packets)
 
 4450                  const Function<
size_t (
size_t max_data_size)>& snd_buf_feed_func,
 
 4453  using boost::asio::post;
 
 4473  if (sock->m_disconnect_cause) 
 
 4507  const size_t sent = snd_buf_feed_func(sock->max_block_size_multiple(sock->m_opts.m_st_snd_buf_max_size));
 
 4510  sock->m_snd_stats.buffer_fed(sock->m_snd_buf.data_size());
 
 4631  if ((!was_deqable) && (sent != 0))
 
 4644  using boost::any_cast;
 
 4692  switch (sock->m_int_state)
 
 4707                  "in state [" << sock->m_int_state << 
"] " 
 4708                  "closed before asynchronous send_worker() could proceed.");
 
 4714                     "in state [" << sock->m_int_state << 
"] " 
 4715                     "somehow had send() called on it.");
 
 4723  using boost::asio::buffer;
 
 4726  using boost::ratio_string;
 
 4727  using boost::chrono::milliseconds;
 
 4728  using boost::chrono::round;
 
 4729  using boost::shared_ptr;
 
 4771  using Idle_timeout_dto_factor = ratio<110, 100>;
 
 4773    = sock->m_snd_drop_timeout * Idle_timeout_dto_factor::num / Idle_timeout_dto_factor::den;
 
 4774  const Fine_duration since_last_send = Fine_clock::now() - sock->m_snd_last_data_sent_when;
 
 4776  if ((sock->m_snd_last_data_sent_when != 
Fine_time_pt()) && (since_last_send > idle_timeout))
 
 4779    FLOW_LOG_INFO(
"Idle timeout triggered for [" << sock << 
"]; " 
 4780                  "last activity [" << round<milliseconds>(since_last_send) << 
"] ago " 
 4781                  "exceeds idle timeout [" << round<milliseconds>(idle_timeout) << 
"] " 
 4782                  "= " << (ratio_string<Idle_timeout_dto_factor, char>::prefix()) << 
" x " 
 4783                  "[" << round<milliseconds>(sock->m_snd_drop_timeout) << 
"].");
 
 4784    sock->m_snd_cong_ctl->on_idle_timeout();
 
 4785    sock->m_snd_stats.idle_timeout();
 
 4794                   "Initial check: can_send() is false.");
 
 4805  const bool rexmit_on = sock->rexmit_on();
 
 4814                     "Initial check: can_send() is true, but no data to send.");
 
 4821    list<Peer_socket::Sent_packet::Ptr>& rexmit_q = sock->m_snd_rexmit_q;
 
 4822    size_t& rexmit_q_size = sock->m_snd_rexmit_q_size;
 
 4829                   "Initial check: Will send from rexmit queue of size [" << rexmit_q_size << 
"] and/or " 
 4830                   "Send buffer with total size [" << snd_buf.
data_size() << 
"].");
 
 4837      shared_ptr<Data_packet> data;
 
 4839      bool rexmit = 
false;
 
 4847      if (rexmit_q.empty())
 
 4852        data = Low_lvl_packet::create_uninit_packet<Data_packet>(
get_logger());
 
 4853        data->m_rexmit_id = 0; 
 
 4864        assert(!data->m_data.empty());
 
 4867        data->m_seq_num = snd_next_seq_num;
 
 4891        sent_pkt = rexmit_q.front();
 
 4894        rexmit_q.pop_front();
 
 4897        data = sent_pkt->m_packet;
 
 4900        ++data->m_rexmit_id;
 
 4903        sent_pkt->m_sent_when.push_back(sent_when);
 
 4906        sent_pkt->m_acks_after_me = 0;
 
 4930      sock->m_snd_stats.data_sent(data->m_data.size(), rexmit);
 
 4936                   "can_send() == [" << 
can_send(sock) << 
"]; " 
 4937                   "snd_deqable() == [" << 
snd_deqable(sock) << 
"].");
 
 4990  const size_t pipe_taken = sock->m_snd_flying_bytes;
 
 4991  const size_t cong_wnd = sock->m_snd_cong_ctl->congestion_window_bytes();
 
 4992  const size_t& rcv_wnd = sock->m_snd_remote_rcv_wnd; 
 
 4994  const size_t pipe_total = min(cong_wnd, rcv_wnd);
 
 4997    = (pipe_taken < pipe_total) && ((pipe_total - pipe_taken) >= sock->max_block_size());
 
 4999  FLOW_LOG_TRACE(
"cong_ctl [" << sock << 
"] info: can_send = [" << can << 
"]; " 
 5000                 "pipe_taken = [" << sock->bytes_blocks_str(pipe_taken) << 
"]; " 
 5001                 "cong_wnd = [" << sock->bytes_blocks_str(cong_wnd) << 
"]; " 
 5002                 "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) << 
"].");
 
 5008                     const Function<
size_t ()>& rcv_buf_consume_func,
 
 5011  using boost::asio::post;
 
 5039  const bool no_bytes_available = sock->m_rcv_buf.empty();
 
 5040  const size_t bytes_consumed = rcv_buf_consume_func();
 
 5042  if (bytes_consumed != 0)
 
 5050                   "has successfully returned [" << bytes_consumed << 
"] bytes.");
 
 5059    if (sock->m_rcv_buf.empty()
 
 5075    return bytes_consumed;
 
 5084                   "has successfully returned no bytes because still not fully connected.");
 
 5106  if (!no_bytes_available)
 
 5110                   "has data to return, but the provided buffer size is too small.");
 
 5118                 "returning no data because Receive buffer empty.");
 
 5159  using boost::any_cast;
 
 5249    FLOW_LOG_INFO(
'[' << sock << 
"] Receive buffer space freed, " 
 5250                  "but state is now [" << sock->m_int_state << 
"]; ignoring.");
 
 5255  if (sock->m_rcv_in_rcv_wnd_recovery)
 
 5261    FLOW_LOG_TRACE(
'[' << sock << 
"] Receive buffer space freed, but " 
 5262                   "we are already in rcv_wnd recovery mode.  Nothing to do.");
 
 5271  const size_t& last_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd;
 
 5273  if (rcv_wnd <= last_rcv_wnd)
 
 5277    FLOW_LOG_TRACE(
'[' << sock << 
"] Receive buffer space freed, but " 
 5278                   "free space [" << sock->bytes_blocks_str(rcv_wnd) << 
"] <= prev " 
 5279                   "free space [" << sock->bytes_blocks_str(last_rcv_wnd) << 
"].  Nothing to do.");
 
 5284  const size_t diff = rcv_wnd - last_rcv_wnd;
 
 5285  const unsigned int pct = sock->opt(sock->m_opts.m_st_rcv_buf_max_size_to_advertise_percent);
 
 5286  const size_t max_rcv_buf_size = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size);
 
 5287  const size_t min_inc = max_rcv_buf_size * pct / 100;
 
 5293                   "freed is [" << sock->bytes_blocks_str(diff) << 
"] since last advertisement; " 
 5294                   "< threshold [" << pct << 
"%] x " 
 5295                   "[" << sock->bytes_blocks_str(max_rcv_buf_size) << 
"] = " 
 5296                   "[" << sock->bytes_blocks_str(min_inc) << 
"].  Not advertising rcv_wnd yet.");
 
 5303                "freed is [" << sock->bytes_blocks_str(diff) << 
"] since last advertisement; " 
 5304                "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) << 
"]; " 
 5305                ">= threshold [" << pct << 
"%] x " 
 5306                "[" << sock->bytes_blocks_str(max_rcv_buf_size) << 
"] = " 
 5307                "[" << sock->bytes_blocks_str(min_inc) << 
"].  Sending unsolicited rcv_wnd-advertising ACK " 
 5308                "and entering rcv_wnd recovery.");
 
 5311  sock->m_rcv_in_rcv_wnd_recovery = 
true;
 
 5313  sock->m_rcv_wnd_recovery_start_time = Fine_clock::now();
 
 5316  sock->m_rcv_stats.rcv_wnd_recovery_event_start();
 
 5323  using boost::chrono::milliseconds;
 
 5324  using boost::chrono::round;
 
 5325  using boost::weak_ptr;
 
 5332  auto ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
 
 5333  ack->m_rcv_wnd = rcv_wnd;
 
 5335  sock->m_rcv_last_sent_rcv_wnd = rcv_wnd;
 
 5340  sock->m_rcv_stats.sent_low_lvl_ack_packet(
true);
 
 5344  const Fine_duration fire_when_from_now = sock->opt(sock->m_opts.m_dyn_rcv_wnd_recovery_timer_period);
 
 5347                "[" << round<milliseconds>(fire_when_from_now) << 
"] from now.");
 
 5356  sock->m_rcv_wnd_recovery_scheduled_task
 
 5358                             [
this, sock_observer = weak_ptr<Peer_socket>(sock)](
bool)
 
 5362    auto sock = sock_observer.lock();
 
 5369    const Fine_duration since_recovery_started = Fine_clock::now() - sock->m_rcv_wnd_recovery_start_time;
 
 5370    if (since_recovery_started > sock->opt(sock->m_opts.m_dyn_rcv_wnd_recovery_max_period))
 
 5375      FLOW_LOG_INFO(
'[' << sock << 
"]: still no new DATA arrived since last rcv_wnd advertisement; " 
 5376                    "Time since entering recovery [" << round<milliseconds>(since_recovery_started) << 
"] expired.  " 
 5377                    "Ending rcv_wnd recovery.");
 
 5378      sock->m_rcv_in_rcv_wnd_recovery = 
false;
 
 5381      sock->m_rcv_stats.rcv_wnd_recovery_event_finish(
false);
 
 5393    FLOW_LOG_INFO(
'[' << sock << 
"]: still no new DATA arrived since last rcv_wnd advertisement; " 
 5394                  "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) << 
"]; " 
 5395                  "time since entering recovery [" << round<milliseconds>(since_recovery_started) << 
"].  " 
 5396                  "Sending unsolicited rcv_wnd-advertising ACK and continuing rcv_wnd recovery.");
 
 5404  using boost::chrono::milliseconds;
 
 5405  using boost::chrono::round;
 
 5412  if (!sock->m_rcv_in_rcv_wnd_recovery)
 
 5420  FLOW_LOG_INFO(
'[' << sock << 
"]: Canceling rcv_wnd recovery; " 
 5421                "Time since entering recovery " 
 5422                "[" << round<milliseconds>(Fine_clock::now() - sock->m_rcv_wnd_recovery_start_time) << 
"].");
 
 5424  sock->m_rcv_in_rcv_wnd_recovery = 
false;
 
 5426  const bool canceled =
 
 5432  sock->m_rcv_stats.rcv_wnd_recovery_event_finish(
true);
 
 5437  using std::numeric_limits;
 
 5441  if (!sock->opt(sock->m_opts.m_st_rcv_flow_control_on))
 
 5446    return numeric_limits<size_t>::max();
 
 5451  size_t rcv_buf_size;
 
 5454    rcv_buf_size = sock->m_rcv_buf.data_size();
 
 5458  if (sock->rexmit_on())
 
 5460    rcv_buf_size += sock->m_rcv_reassembly_q_data_size; 
 
 5463  const size_t max_rcv_buf_size = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size);
 
 5465  return (max_rcv_buf_size > rcv_buf_size) ? (max_rcv_buf_size - rcv_buf_size) : 0;
 
 5487                  "was completely closed before asynchronous " 
 5488                  "receive_emptied_rcv_buf_while_disconnecting() could proceed.");
 
 5506                   "is gracefully closing, and Receive buffer is empty, but graceful close itself not yet finished.");
 
 5512  if (!sock->m_rcv_buf.empty())
 
 5517                   "is gracefully closing, but Receive buffer has data again.");
 
 5524                 "is gracefully closing, and Receive buffer is now empty.  Ready to permanently close.");
 
 5534  using boost::adopt_lock;
 
 5578      *err_code = sock->m_disconnect_cause;
 
 5601                                        const Error_code& err_code, 
bool defer_delta_check)
 
 5603  using boost::lexical_cast;
 
 5614    FLOW_LOG_INFO(
"Closing and destroying [" << sock << 
"] abruptly.");
 
 5619    FLOW_LOG_INFO(
"Closing and destroying [" << sock << 
"] after graceful close.");
 
 5639    sock->m_info_on_close.m_disconnect_cause = err_code;
 
 5664  const auto erased = 1 ==
 
 5670  if (!sock->m_active_connect)
 
 5678    Port_to_server_map::const_iterator port_to_server_it = 
m_servs.find(sock->m_local_port);
 
 5679    if (port_to_server_it != 
m_servs.end()) 
 
 5694  if (sock->m_active_connect)
 
 5698    assert(!return_err_code);
 
 5713  if (inserted_rd || inserted_wr) 
 
 5721                                                const Error_code& err_code, 
bool defer_delta_check)
 
 5732  auto syn = Low_lvl_packet::create_uninit_packet<Syn_packet>(
get_logger());
 
 5734  syn->m_init_seq_num = sock->m_snd_init_seq_num;
 
 5738  syn->m_serialized_metadata = 
static_cast<const Blob&
>(sock->m_serialized_metadata);
 
 5745  auto syn_ack = Low_lvl_packet::create_uninit_packet<Syn_ack_packet>(
get_logger());
 
 5747  syn_ack->m_init_seq_num = sock->m_snd_init_seq_num;
 
 5749  syn_ack->m_packed.m_security_token = sock->m_security_token;
 
 5751  syn_ack->m_packed.m_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd;
 
 5757                                          boost::shared_ptr<const Syn_ack_packet>& syn_ack)
 
 5760  auto syn_ack_ack = Low_lvl_packet::create_uninit_packet<Syn_ack_ack_packet>(
get_logger());
 
 5763  syn_ack_ack->m_packed.m_security_token = syn_ack->m_packed.m_security_token;
 
 5765  syn_ack_ack->m_packed.m_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd = 
sock_rcv_wnd(sock);
 
 5773  using boost::chrono::milliseconds;
 
 5774  using boost::chrono::duration_cast;
 
 5775  using std::make_pair;
 
 5777  using std::numeric_limits;
 
 5784  vector<Peer_socket::Individual_ack::Ptr>& pending_acks = sock->m_rcv_pending_acks;
 
 5786  if (sys_err_code == boost::asio::error::operation_aborted)
 
 5789                   "pending acknowledgment count [" << pending_acks.size() << 
"].");
 
 5794  FLOW_LOG_TRACE(
"Delayed [ACK] timer [" << sock << 
"] triggered, or ACK forced; " 
 5795                 "pending acknowledgment count [" << pending_acks.size() << 
"].");
 
 5808    FLOW_LOG_TRACE(
"Delayed [ACK] timer [" << sock << 
"] triggered, " 
 5809                   "but socket already in inapplicable state [" << sock->m_int_state << 
"].  Ignoring.");
 
 5814  if (pending_acks.empty())
 
 5819                     "but socket has no pending acknowledgments.  This is likely an internal bug.  Ignoring.");
 
 5846  const size_t& rcv_wnd = sock->m_rcv_last_sent_rcv_wnd = 
sock_rcv_wnd(sock);
 
 5848  auto ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
 
 5849  ack->m_rcv_wnd = rcv_wnd; 
 
 5854  if (sock->rexmit_on())
 
 5870      sock->m_rcv_stats.sent_low_lvl_ack_packet(
false);
 
 5873      ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
 
 5874      ack->m_rcv_wnd = rcv_wnd; 
 
 5902    if (delay.count() < 0)
 
 5907                       "delay for packet [" << seq_num << 
", ...) is " 
 5908                       "negative: [" << delay << 
"]; using zero.");
 
 5909      delay = Fine_duration::zero();
 
 5924    if (uint64_t(pkt_delay.count()) > uint64_t(MAX_DELAY_VALUE))
 
 5929                       "delay for packet [" << seq_num << 
", ...) is [" << pkt_delay << 
"]; overflow; " 
 5930                       "using max value [" << MAX_DELAY_VALUE << 
"] units.");
 
 5936    if (sock->rexmit_on())
 
 5938      ack->m_rcv_acked_packets_rexmit_on_out.push_back
 
 5940                                              ind_ack->m_rexmit_id,
 
 5945      ack->m_rcv_acked_packets_rexmit_off_out.push_back
 
 5949    size_est_so_far += size_est_inc;
 
 5952    sock->m_rcv_stats.sent_individual_ack();
 
 5956  if (size_est_so_far != 0)
 
 5962  sock->m_rcv_stats.sent_low_lvl_ack_packet(
false);
 
 5965  pending_acks.clear();
 
 5968  sock->m_rcv_stats.current_pending_to_ack_packets(0);
 
 5976  return Socket_id{ sock->remote_endpoint(), sock->local_port() };
 
 5982  return !(sock->m_snd_rexmit_q.empty() && sock->m_snd_buf.empty());
 
 5998  return sock->m_snd_buf.data_size() + sock->max_block_size()
 
 5999           <= sock->opt(sock->m_opts.m_st_snd_buf_max_size);
 
 6005  return !sock->m_rcv_buf.empty();
 
 6013                 sock->m_int_state << 
"] to [" << new_state << 
"].");
 
 6014  sock->m_int_state = new_state;
 
 6023  sock->m_state = state;
 
 6026    sock->m_open_sub_state = open_sub_state;
 
 6043  sock->m_disconnect_cause = disconnect_cause;
 
 6065  assert(sock->m_disconnect_cause);
 
 6075  sock->m_rcv_buf.clear();
 
 6076  sock->m_snd_buf.clear();
 
 6077  sock->m_rcv_packets_with_gaps.clear();
 
 6078  sock->m_rcv_reassembly_q_data_size = 0;
 
 6079  sock->m_snd_flying_pkts_by_sent_when.clear();
 
 6080  sock->m_snd_flying_pkts_by_seq_num.clear();
 
 6081  sock->m_snd_rexmit_q.clear();
 
 6082  sock->m_serialized_metadata.make_zero(); 
 
 6083  sock->m_rcv_syn_rcvd_data_q.clear();
 
 6084  sock->m_rcv_pending_acks.clear();
 
 6085  sock->m_rcv_acked_packets.clear();
 
 6086  sock->m_snd_pacing_data.m_packet_q.clear();
 
 6091  sock->m_snd_cong_ctl.reset();
 
 6093  sock->m_snd_bandwidth_estimator.reset();
 
 6126  sock->m_opts = opts;
 
 6136#define VALIDATE_STATIC_OPTION(ARG_opt) \ 
 6137  validate_static_option(opts.ARG_opt, prev_opts->ARG_opt, #ARG_opt, err_code) 
 6138#define VALIDATE_CHECK(ARG_check) \ 
 6139  validate_option_check(ARG_check, #ARG_check, err_code) 
 6161  using boost::chrono::seconds;
 
 6162  using std::numeric_limits;
 
 6173    const bool static_ok
 
 6174      = VALIDATE_STATIC_OPTION(m_st_max_block_size) &&
 
 6175        VALIDATE_STATIC_OPTION(m_st_connect_retransmit_period) &&
 
 6176        VALIDATE_STATIC_OPTION(m_st_connect_retransmit_timeout) &&
 
 6177        VALIDATE_STATIC_OPTION(m_st_snd_buf_max_size) &&
 
 6178        VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size) &&
 
 6179        VALIDATE_STATIC_OPTION(m_st_rcv_flow_control_on) &&
 
 6180        VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size_slack_percent) &&
 
 6181        VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size_to_advertise_percent) &&
 
 6182        VALIDATE_STATIC_OPTION(m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent) &&
 
 6183        VALIDATE_STATIC_OPTION(m_st_delayed_ack_timer_period) &&
 
 6184        VALIDATE_STATIC_OPTION(m_st_max_full_blocks_before_ack_send) &&
 
 6185        VALIDATE_STATIC_OPTION(m_st_rexmit_on) &&
 
 6186        VALIDATE_STATIC_OPTION(m_st_max_rexmissions_per_packet) &&
 
 6187        VALIDATE_STATIC_OPTION(m_st_init_drop_timeout) &&
 
 6188        VALIDATE_STATIC_OPTION(m_st_snd_pacing_enabled) &&
 
 6189        VALIDATE_STATIC_OPTION(m_st_snd_bandwidth_est_sample_period_floor) &&
 
 6190        VALIDATE_STATIC_OPTION(m_st_cong_ctl_strategy) &&
 
 6191        VALIDATE_STATIC_OPTION(m_st_cong_ctl_init_cong_wnd_blocks) &&
 
 6192        VALIDATE_STATIC_OPTION(m_st_cong_ctl_max_cong_wnd_blocks) &&
 
 6193        VALIDATE_STATIC_OPTION(m_st_cong_ctl_cong_wnd_on_drop_timeout_blocks) &&
 
 6194        VALIDATE_STATIC_OPTION(m_st_cong_ctl_classic_wnd_decay_percent) &&
 
 6195        VALIDATE_STATIC_OPTION(m_st_drop_packet_exactly_after_drop_timeout) &&
 
 6196        VALIDATE_STATIC_OPTION(m_st_drop_all_on_drop_timeout) &&
 
 6197        VALIDATE_STATIC_OPTION(m_st_out_of_order_ack_restarts_drop_timer);
 
 6208  const bool checks_ok
 
 6239#undef VALIDATE_CHECK 
 6240#undef VALIDATE_STATIC_OPTION 
 6247  using boost::adopt_lock;
 
 6289  using boost::lexical_cast;
 
 6294  stats->
m_rcv = sock->m_rcv_stats.stats();
 
 6295  stats->
m_snd = sock->m_snd_stats.stats();
 
 6317    = sock->m_rcv_syn_rcvd_data_q.empty() ? 0 : sock->m_rcv_syn_rcvd_data_cumulative_size;
 
 6333    = util::to_mbit_per_sec<Send_bandwidth_estimator::Time_unit>
 
 6334        (sock->m_snd_bandwidth_estimator->bandwidth_bytes_per_time());
 
 6351  FLOW_LOG_INFO(
"[=== Socket state for [" << sock << 
"]. ===\n" << stats);
 
 6358  FLOW_LOG_INFO(
"=== Socket state for [" << sock << 
"]. ===]");
 
 6380  *seq_num += data_size;
 
 6383template<
typename Packet_map_iter>
 
 6390    *seq_num_start = seq_num_start_cref;
 
 6394    *seq_num_end = seq_num_start_cref;
 
 6402  return ++sock->m_snd_last_order_num;
 
 6408  return sock_create_forward_plus_ctor_args<Peer_socket>(opts);
 
 6418         << 
"NetFlow_socket " 
 6420         "@" << 
static_cast<const void*
>(sock))
 
 6421      : (os << 
"NetFlow_socket@null");
 
 6429#define STATE_TO_CASE_STATEMENT(ARG_state) \ 
 6430  case Peer_socket::Int_state::S_##ARG_state: \ 
 6431    return os << #ARG_state 
 6440    STATE_TO_CASE_STATEMENT(CLOSED);
 
 6441    STATE_TO_CASE_STATEMENT(SYN_SENT);
 
 6442    STATE_TO_CASE_STATEMENT(SYN_RCVD);
 
 6443    STATE_TO_CASE_STATEMENT(ESTABLISHED);
 
 6446#undef STATE_TO_CASE_STATEMENT 
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
static Congestion_control_strategy * create_strategy(Strategy_choice strategy_choice, log::Logger *logger_ptr, Peer_socket::Const_ptr sock)
Factory method that, given an enum identifying the desired strategy, allocates the appropriate Conges...
static Ptr create_drop_timer(log::Logger *logger_ptr, util::Task_engine *node_task_engine, Fine_duration *sock_drop_timeout, Peer_socket::Const_ptr &&sock, const Function< void(const Error_code &err_code)> &timer_failure, const Function< void(bool drop_all_packets)> &timer_fired)
Constructs Drop_timer and returns a ref-counted pointer wrapping it.
@ S_PEER_SOCKET_WRITABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
@ S_PEER_SOCKET_READABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
void snd_flying_pkts_updated(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_const_iter pkt_begin, const Peer_socket::Sent_pkt_ordered_by_when_const_iter &pkt_end, bool added)
Updates Peer_socket::m_snd_flying_bytes according to an operation (add packets, remove packets) calle...
bool categorize_individual_ack(const Socket_id &socket_id, Peer_socket::Ptr sock, Ack_packet::Individual_ack::Const_ptr ack, bool *dupe_or_late, Peer_socket::Sent_pkt_ordered_by_when_iter *acked_pkt_it)
Helper of perform_accumulated_on_recv_tasks() that categorizes the given accumulated individual ackno...
void handle_data_to_established(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet, bool syn_rcvd_qd_packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
bool sock_is_writable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->send() with at least some arguments would return either non...
Peer_socket_info sock_info(Peer_socket::Const_ptr sock)
Implementation of sock->info() for socket sock in all cases except when sock->state() == Peer_socket:...
void receive_wnd_updated(Peer_socket::Ptr sock)
Placed by receive() onto W if it has dequeued data from Receive buffer and given it to the user,...
void sock_track_new_data_after_gap_rexmit_off(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, size_t data_size, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
bool sock_data_to_reassembly_q_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
static bool ensure_sock_open(Socket_ptr sock, Error_code *err_code)
Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so,...
void send_worker(Peer_socket::Ptr sock, bool defer_delta_check)
Thread W implementation of send(): synchronously or asynchronously send the contents of sock->m_snd_buf...
void handle_accumulated_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any incoming acknowledgments and rcv_wnd u...
void async_rcv_wnd_recovery(Peer_socket::Ptr sock, size_t rcv_wnd)
receive_wnd_updated() helper that continues rcv_wnd recovery: that is, sends unsolicited ACK with a r...
void log_accumulated_acks(Peer_socket::Const_ptr sock) const
Helper of handle_accumulated_acks() that logs the about-to-be-handled accumulated individual acknowle...
void sock_free_memory(Peer_socket::Ptr sock)
Helper that clears all non-O(1)-space data structures stored inside sock.
void sock_load_info_struct(Peer_socket::Const_ptr sock, Peer_socket_info *stats) const
Given a Peer_socket, copies all stats info (as available via Peer_socket::info()) from various struct...
void log_snd_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages that show the detailed state of the sending sequence number space.
void send_worker_check_state(Peer_socket::Ptr sock)
Helper placed by send() onto W to invoke send_worker() but ensures that the socket has not entered so...
size_t m_low_lvl_max_buf_size
OS-reported m_low_lvl_sock UDP receive buffer maximum size, obtained right after we OS-set that setti...
Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock, const Function< Non_blocking_func_ret_type()> &non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Error_code *err_code)
Implementation of core blocking transfer methods, namely Peer_socket::sync_send(),...
size_t sock_max_packets_after_unrecvd_packet(Peer_socket::Const_ptr sock) const
Computes and returns the max size for Peer_socket::m_rcv_packets_with_gaps for sock.
Peer_socket::Sent_pkt_ordered_by_when_iter categorize_pkts_as_dropped_on_acks(Peer_socket::Ptr sock, const boost::unordered_set< Peer_socket::order_num_t > &flying_now_acked_pkts)
Helper of perform_accumulated_on_recv_tasks() that determines the range of In-flight packets that sho...
void rcv_get_first_gap_info(Peer_socket::Const_ptr sock, bool *first_gap_exists, Sequence_number *seq_num_after_first_gap)
Helper for handle_data_to_established() that gets simple info about Peer_socket::m_rcv_packets_with_g...
bool snd_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data either in Peer_socket::m_snd_rexmit_q of sock (if re...
void cancel_timers(Peer_socket::Ptr sock)
Cancel any timers and scheduled tasks active in the given socket.
void sock_rcv_buf_now_readable(Peer_socket::Ptr sock, bool syn_rcvd_qd_packet)
Helper for handle_data_to_established() that assumes the given socket's Receive buffer is currently r...
void snd_flying_pkts_erase_one(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_iter pkt_it)
Erases (for example if considered Acknowledged or Dropped) a packet struct from the "scoreboard" (Pee...
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
bool sock_validate_options(const Peer_socket_options &opts, const Peer_socket_options *prev_opts, Error_code *err_code) const
Analogous to validate_options() but checks per-socket options instead of per-Node options.
void handle_accumulated_pending_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any additional individual outgoing acknowl...
void receive_wnd_recovery_data_received(Peer_socket::Ptr sock)
Pertaining to the async_rcv_wnd_recovery() mechanism, this handles the event that we have received an...
static Peer_socket::order_num_t sock_get_new_snd_order_num(Peer_socket::Ptr sock)
Returns the "order number" to use for Peer_socket::Sent_packet::Sent_when structure corresponding to ...
Peer_socket::Ptr sync_connect_impl(const Remote_endpoint &to, const Fine_duration &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code, const Peer_socket_options *opts)
Implementation core of sync_connect*() that gets rid of templated or missing arguments thereof.
size_t max_block_size() const
The maximum number of bytes of user data per received or sent block on connections generated from thi...
void snd_flying_pkts_push_one(Peer_socket::Ptr sock, const Sequence_number &seq_num, Peer_socket::Sent_packet::Ptr sent_pkt)
Adds a new packet struct (presumably representing packet to be sent shortly) to the "scoreboard" (Pee...
Syn_packet::Ptr create_syn(Peer_socket::Const_ptr sock)
Helper that creates a new SYN packet object to the extent that is suitable for immediately passing to...
void close_abruptly(Peer_socket::Ptr sock, Error_code *err_code)
Implementation of non-blocking sock->close_abruptly() for socket sock in all cases except when sock->...
void async_low_lvl_ack_send(Peer_socket::Ptr sock, const Error_code &sys_err_code=Error_code())
Sends a low-level ACK packet, with all accumulated in Peer_socket::m_rcv_pending_acks of sock individ...
static void get_seq_num_range(const Packet_map_iter &packet_it, Sequence_number *seq_num_start, Sequence_number *seq_num_end)
Given an iterator into a Peer_socket::Sent_pkt_by_sent_when_map or Peer_socket::Recv_pkt_map,...
Peer_socket::Ptr sync_connect_with_metadata(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
A combination of sync_connect() and connect_with_metadata() (blocking connect, with supplied metadata...
Syn_ack_packet::Ptr create_syn_ack(Peer_socket::Const_ptr sock)
Like create_syn() but for SYN_ACK.
virtual Peer_socket * sock_create(const Peer_socket_options &opts)
Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
bool snd_buf_enqable(Peer_socket::Const_ptr sock) const
Return true if and only if there is enough free space in Peer_socket::m_snd_buf of sock to enqueue an...
bool can_send(Peer_socket::Const_ptr sock) const
Answers the perennial question of congestion and flow control: assuming there is a DATA packet to sen...
void sock_slide_rcv_next_seq_num(Peer_socket::Ptr sock, size_t slide_size, bool reassembly_in_progress)
Helper for handle_data_to_established() that aims to register a set of received DATA packet data as i...
void sock_log_detail(Peer_socket::Const_ptr sock) const
Logs a verbose state report for the given socket.
static void advance_seq_num(Sequence_number *seq_num, boost::shared_ptr< const Data_packet > data)
Assuming *seq_num points to the start of data.m_data, increments *seq_num to point to the datum just ...
static Sequence_number snd_past_last_flying_datum_seq_num(Peer_socket::Const_ptr sock)
Obtain the sequence number for the datum just past the last (latest) In-flight (i....
Peer_socket::Ptr connect(const Remote_endpoint &to, Error_code *err_code=0, const Peer_socket_options *opts=0)
Initiates an active connect to the specified remote Flow server.
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
void serv_peer_socket_closed(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that a Server_socket-contained (i.e., currently un-established, or established but not yet ac...
bool rcv_buf_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data in Peer_socket::m_rcv_buf of sock to give the user s...
void async_acknowledge_packet(Peer_socket::Ptr sock, const Sequence_number &seq_num, unsigned int rexmit_id, size_t data_size)
Causes an acknowledgment of the given received packet to be included in a future Ack_packet sent to t...
Socket_id_to_socket_map m_socks
The peer-to-peer connections this Node is currently tracking.
Peer_socket::Options_lock Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
void handle_syn_ack_to_syn_sent(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK packet delivered to the given peer ...
size_t send(Peer_socket::Ptr sock, const Function< size_t(size_t max_data_size)> &snd_buf_feed_func, Error_code *err_code)
Implementation of non-blocking sock->send() for socket sock in all cases except when sock->state() ==...
void sock_set_int_state(Peer_socket::Ptr sock, Peer_socket::Int_state new_state)
Sets internal state of given socket to the given state and logs a TRACE message about it.
bool sock_is_readable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->receive() with at least some arguments would return either ...
bool sock_data_to_rcv_buf_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to pass the payload of the given DATA packet to the...
bool sock_set_options(Peer_socket::Ptr sock, const Peer_socket_options &opts, Error_code *err_code)
Thread W implementation of sock->set_options().
bool running() const
Returns true if and only if the Node is operating.
Port_to_server_map m_servs
The server sockets this Node is currently tracking.
Event_set::Ev_type_to_socks_map m_sock_events
All sockets that have been detected to be "ready" (by the Event_set doc header definition) at any poi...
static const uint8_t S_DEFAULT_CONN_METADATA
Type and value to supply as user-supplied metadata in SYN, if user chooses to use [[a]sync_]connect()...
void setup_drop_timer(const Socket_id &socket_id, Peer_socket::Ptr sock)
Creates a new Drop Timer and saves it to sock->m_snd_drop_timer.
void handle_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Ack_packet > ack)
Handles a just-deserialized, just-demultiplexed, low-level ACK packet delivered to the given peer soc...
Peer_socket::Ptr sync_connect(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0, const Peer_socket_options *opts=0)
The blocking (synchronous) version of connect().
void handle_syn_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed, duplicate (equal to already-received SYN_ACK) low-le...
void setup_connection_timers(const Socket_id &socket_id, Peer_socket::Ptr sock, bool initial)
Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some a...
void log_rcv_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages that show the detailed state of the receiving sequence number space.
size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const
Computes and returns the currently correct rcv_wnd value; that is the amount of space free in Receive...
void connect_worker(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts, Peer_socket::Ptr *sock)
Thread W implementation of connect().
bool drop_pkts_on_acks(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &last_dropped_pkt_it, size_t *cong_ctl_dropped_pkts, size_t *cong_ctl_dropped_bytes, size_t *dropped_pkts, size_t *dropped_bytes, std::vector< Peer_socket::order_num_t > *pkts_marked_to_drop)
Helper of perform_accumulated_on_recv_tasks() that acts on the determination made by categorize_pkts_...
static const Peer_socket::Sent_packet::ack_count_t S_MAX_LATER_ACKS_BEFORE_CONSIDERING_DROPPED
For a given unacknowledged sent packet P, the maximum number of times any individual packet with high...
Error_code sock_categorize_data_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, bool *dupe, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that categorizes the DATA packet received as either illegal; ...
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
void sock_set_state(Peer_socket::Ptr sock, Peer_socket::State state, Peer_socket::Open_sub_state open_sub_state=Peer_socket::Open_sub_state::S_CONNECTED)
Sets Peer_socket::m_state and Peer_socket::m_open_sub_state.
void receive_emptied_rcv_buf_while_disconnecting(Peer_socket::Ptr sock)
Placed by receive() onto W during a graceful close, after the Receive buffer had been emptied by the ...
void sock_disconnect_detected(Peer_socket::Ptr sock, const Error_code &disconnect_cause, bool close)
Records that thread W shows underlying connection is broken (graceful termination,...
size_t receive(Peer_socket::Ptr sock, const Function< size_t()> &rcv_buf_consume_func, Error_code *err_code)
Implementation of non-blocking sock->receive() for socket sock in all cases except when sock->state()...
void handle_connection_rexmit_timer_event(const Socket_id &socket_id, Peer_socket::Ptr sock)
Handles the triggering of the retransmit timer wait set up by setup_connection_timers(); it will re-s...
Node_options m_opts
This Node's global set of options.
void close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
A thread W method that handles the transition of the given socket from OPEN (any sub-state) to CLOSED...
void sock_disconnect_completed(Peer_socket::Ptr sock)
While in S_OPEN+S_DISCONNECTING state (i.e., after beginning a graceful close with sock_disconnect_de...
Fine_duration compute_rtt_on_ack(Peer_socket::Sent_packet::Const_ptr flying_pkt, const Fine_time_pt &time_now, Ack_packet::Individual_ack::Const_ptr ack, const Peer_socket::Sent_packet::Sent_when **sent_when) const
Helper of perform_accumulated_on_recv_tasks() that computes the RTT implied by a given individual ack...
void async_low_lvl_syn_ack_ack_send(const Peer_socket::Ptr &sock, boost::shared_ptr< const Syn_ack_packet > &syn_ack)
Helper to create, fully fill out, and asynchronously send via async_sock_low_lvl_packet_send_paced() ...
Peer_socket::Ptr connect_with_metadata(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
Same as connect() but sends, as part of the connection handshake, the user-supplied metadata,...
void new_round_trip_time_sample(Peer_socket::Ptr sock, Fine_duration round_trip_time)
Handles a just-computed new RTT (round trip time) measurement for an individual packet earlier sent: ...
void async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
bool ok_to_rexmit_or_close(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &pkt_it, bool defer_delta_check)
Checks whether the given sent packet has been retransmitted the maximum number of allowed times; if s...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
Port_space m_ports
Flow port space for both client and server sockets. All threads may access this.
void rst_and_close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
Asynchronously send RST to the other side of the given socket and close_connection_immediately().
void drop_timer_action(Peer_socket::Ptr sock, bool drop_all_packets)
Handles a Drop_timer (Peer_socket::m_snd_drop_timer) event in ESTABLISHED state by dropping the speci...
A class that keeps a Peer_socket_receive_stats data store, includes methods to conveniently accumulat...
void good_data_accepted_packet(size_t data)
Indicates good_data_packet(), and these data are not dropped (so either delivered into Receive buffer...
void good_data_dropped_reassembly_q_overflow_packet(size_t data)
Indicates good_data_packet(), but these data are dropped due to insufficient Receive reassembly queue...
void presumed_dropped_data(size_t data)
Indicates that one or more unreceived data packets have been considered Dropped due to the number of ...
void good_data_delivered_packet(size_t data)
Indicates good_data_accepted_packet(), and these data are delivered into Receive buffer (either immed...
void late_or_dupe_to_send_ack_packet(size_t data)
Indicates that late_or_dupe_data_packet() and therefore an individual acknowledgment for this packet ...
void total_data_packet(size_t data)
Indicates one DATA packet has been received on socket.
void good_to_send_ack_packet(size_t data)
Indicates that good_data_delivered_packet() and therefore an individual acknowledgment for this packe...
void good_data_packet(size_t data)
Indicates total_data_packet(), and these data are new and acceptable into Receive buffer assuming the...
void error_data_packet(size_t data)
Indicates total_data_packet(), but there is some error about the sequence numbers so that they are no...
void buffer_fed(size_t size)
Indicates the Receive buffer was enqueued with data from network (so its data_size() increased).
void good_data_first_qd_packet(size_t data)
Indicates good_data_accepted_packet(), and these data are, upon receipt, queued for reassembly (not i...
void good_data_dropped_buf_overflow_packet(size_t data)
Indicates good_data_packet(), but these data are dropped due to insufficient Receive buffer space.
void late_or_dupe_data_packet(size_t data)
Indicates total_data_packet(), but the arrived data have either already been received before or (more...
A peer (non-server) socket operating over the Flow network protocol, with optional stream-of-bytes an...
State m_state
See state().
size_t get_connect_metadata(const boost::asio::mutable_buffer &buffer, Error_code *err_code=0) const
Obtains the serialized connect metadata, as supplied by the user during the connection handshake.
size_t max_block_size_multiple(const size_t &opt_val_ref, const unsigned int *inflate_pct_val_ptr=0) const
Returns the smallest multiple of max_block_size() that is >= the given option value,...
bool sync_send_reactor_pattern_impl(const Fine_time_pt &wait_until, Error_code *err_code)
Helper similar to sync_send_impl() but for the nullptr_t versions of sync_send().
std::map< Sequence_number, Sent_pkt_ordered_by_when_iter > Sent_pkt_by_seq_num_map
Short-hand for m_snd_flying_pkts_by_seq_num type; see that data member.
bool sync_receive_reactor_pattern_impl(const Fine_time_pt &wait_until, Error_code *err_code)
Helper similar to sync_receive_impl() but for the nullptr_t versions of sync_receive().
Remote_endpoint m_remote_endpoint
See remote_endpoint(). Should be set before user gets access to *this and not changed afterwards.
size_t sync_receive(const Mutable_buffer_sequence &target, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0)
Blocking (synchronous) version of receive().
util::Blob m_serialized_metadata
If !m_active_connect, this contains the serialized metadata that the user supplied on the other side ...
size_t node_sync_send(const Function< size_t(size_t max_data_size)> &snd_buf_feed_func_or_empty, const Fine_time_pt &wait_until, Error_code *err_code)
This is to sync_send() as node_send() is to send().
Error_code m_disconnect_cause
The Error_code causing disconnection (if one has occurred or is occurring) on this socket; otherwise ...
Peer_socket(log::Logger *logger_ptr, util::Task_engine *task_engine, const Peer_socket_options &opts)
Constructs object; initializes most values to well-defined (0, empty, etc.) but not necessarily meani...
Sequence_number m_rcv_init_seq_num
The Initial Sequence Number (ISN) contained in the original Syn_packet or Syn_ack_packet we received.
const Remote_endpoint & remote_endpoint() const
Intended other side of the connection (regardless of success, failure, or current State).
State
State of a Peer_socket.
@ S_OPEN
Future reads or writes may be possible. A socket in this state may be Writable or Readable.
@ S_CLOSED
Neither future reads nor writes are possible, AND Node has disowned the Peer_socket.
Open_sub_state
The sub-state of a Peer_socket when state is State::S_OPEN.
@ S_CONNECTED
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
@ S_CONNECTING
This Peer_socket was created through an active connect (Node::connect() and the like),...
@ S_DISCONNECTING
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
size_t sync_send(const Const_buffer_sequence &data, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0)
Blocking (synchronous) version of send().
~Peer_socket() override
Boring virtual destructor. Note that deletion is to be handled exclusively via shared_ptr,...
Error_code disconnect_cause() const
The error code that previously caused state() to become State::S_CLOSED, or success code if state is ...
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
flow_port_t local_port() const
The local Flow-protocol port chosen by the Node (if active or passive open) or user (if passive open)...
flow_port_t m_local_port
See local_port(). Should be set before user gets access to *this and not changed afterwards.
friend class Send_bandwidth_estimator
Stats modules have const access to all socket internals.
bool set_options(const Peer_socket_options &opts, Error_code *err_code=0)
Dynamically replaces the current options set (options()) with the given options set.
size_t node_send(const Function< size_t(size_t max_data_size)> &snd_buf_feed_func, Error_code *err_code)
Non-template helper for template send() that forwards the send() logic to Node::send().
bool rexmit_on() const
Whether retransmission is enabled on this connection.
size_t node_sync_receive(const Function< size_t()> &rcv_buf_consume_func_or_empty, const Fine_time_pt &wait_until, Error_code *err_code)
This is to sync_receive() as node_receive() is to receive().
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex.
Int_state
The state of the socket (and the connection from this end's point of view) for the internal state mac...
@ S_ESTABLISHED
Public state is OPEN+CONNECTED; in our opinion the connection is established.
@ S_SYN_SENT
Public state is OPEN+CONNECTING; user requested active connect; we sent SYN and are awaiting response...
@ S_CLOSED
Closed (dead or new) socket.
@ S_SYN_RCVD
Public state is OPEN+CONNECTING; other side requested passive connect via SYN; we sent SYN_ACK and ar...
util::Lock_guard< Options_mutex > Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
void close_abruptly(Error_code *err_code=0)
Acts as if fatal error error::Code::S_USER_CLOSED_ABRUPTLY has been discovered on the connection.
size_t max_block_size() const
The maximum number of bytes of user data per received or sent packet on this connection.
Node * node() const
Node that produced this Peer_socket.
Peer_socket_info info() const
Returns a structure containing the most up-to-date stats about this connection.
Recvd_pkt_map::iterator Recvd_pkt_iter
Short-hand for m_rcv_packets_with_gaps iterator type.
Mutex m_mutex
This object's mutex.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
Sent_pkt_by_seq_num_map::const_iterator Sent_pkt_ordered_by_seq_const_iter
Short-hand for m_snd_flying_pkts_by_seq_num const iterator type.
Peer_socket_info m_info_on_close
This is the final set of stats collected at the time the socket was moved to S_CLOSED m_state.
bool ensure_open(Error_code *err_code) const
Helper that is equivalent to Node::ensure_sock_open(this, err_code).
Sent_pkt_by_sent_when_map::const_iterator Sent_pkt_ordered_by_when_const_iter
Short-hand for m_snd_flying_pkts_by_sent_when const iterator type.
Opt_type opt(const Opt_type &opt_val_ref) const
Analogous to Node::opt() but for per-socket options.
std::string bytes_blocks_str(size_t bytes) const
Helper that, given a byte count, returns a string with that byte count and the number of max_block_si...
Peer_socket_options m_opts
This socket's per-socket set of options.
Peer_socket_options options() const
Copies this socket's option set and returns that copy.
Options_mutex m_opts_mutex
The mutex protecting m_opts.
std::map< Sequence_number, boost::shared_ptr< Received_packet > > Recvd_pkt_map
Short-hand for m_rcv_packets_with_gaps type; see that data member.
Recvd_pkt_map::const_iterator Recvd_pkt_const_iter
Short-hand for m_rcv_packets_with_gaps const iterator type.
Open_sub_state m_open_sub_state
See state().
size_t node_receive(const Function< size_t()> &rcv_buf_consume_func, Error_code *err_code)
Non-template helper for template receive() that forwards the receive() logic to Node::receive().
State state(Open_sub_state *open_sub_state=0) const
Current State of the socket.
void return_port(flow_port_t port, Error_code *err_code)
Return a previously reserved port (of any type).
An internal net_flow sequence number identifying a piece of data.
void set_metadata(char num_line_id=0, const Sequence_number &zero_point=Sequence_number(), seq_num_delta_t multiple_size=0)
Updates the full set of metadata (used at least for convenient convention-based logging but not actua...
uint64_t seq_num_t
Raw sequence number type.
Internal net_flow class that implements a socket buffer, as used by Peer_socket for Send and Receive ...
void consume_buf_move(util::Blob *target_buf, size_t max_data_size)
Consumes (removes from the front of the internal byte buffer and returns them to the caller) a byte s...
size_t data_size() const
The total number of bytes of application-layer data stored in this object.
Properties of various container types.
typename Value_list::const_reverse_iterator Const_reverse_iterator
Type for reverse iterator pointing into an immutable structure of this type.
typename Value_list::reverse_iterator Reverse_iterator
Type for reverse iterator pointing into a mutable structure of this type.
std::pair< Iterator, bool > insert(Value const &key_and_mapped)
Attempts to insert the given key/mapped-value pair into the map.
static Ptr ptr_cast(const From_ptr &ptr_to_cast)
Provides syntactic-sugary way to perform a static_pointer_cast<> from a compatible smart pointer type...
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
Similar to ostringstream but allows fast read-only access directly into the std::string being written...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_ERROR_LOG_ERROR(ARG_val)
Logs a warning about the given error code using FLOW_LOG_WARNING().
#define FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(ARG_ret_type, ARG_function_name,...)
Narrow-use macro that implements the error code/exception semantics expected of most public-facing Fl...
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
#define FLOW_ERROR_EMIT_ERROR_LOG_INFO(ARG_val)
Identical to FLOW_ERROR_EMIT_ERROR(), but the message logged has flow::log::Sev::S_INFO severity inst...
#define FLOW_LOG_DATA(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_WITHOUT_CHECKING(ARG_sev, ARG_stream_fragment)
Identical to FLOW_LOG_WITH_CHECKING() but foregoes the filter (Logger::should_log()) check.
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_WITH_CHECKING(ARG_sev, ARG_stream_fragment)
Logs a message of the specified severity into flow::log::Logger *get_logger() with flow::log::Compone...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
void asio_exec_ctx_post(log::Logger *logger_ptr, Execution_context *exec_ctx, Synchronicity synchronicity, Task &&task)
An extension of boost.asio's post() and dispatch() free function templates, this free function templa...
bool exec_void_and_throw_on_error(const Func &func, Error_code *err_code, util::String_view context)
Equivalent of exec_and_throw_on_error() for operations with void return type.
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
@ S_INFO
Message indicates a not-"bad" condition that is not frequent enough to be of severity Sev::S_TRACE.
@ S_CONN_TIMEOUT
Other side did not complete connection handshake within the allowed time; perhaps no one is listening...
@ S_USER_CLOSED_ABRUPTLY
User code on this side abruptly closed connection; other side may be informed of this.
@ S_CONN_RESET_TOO_MANY_REXMITS
Connection reset because a packet has been retransmitted too many times.
@ S_SEQ_NUM_IMPLIES_CONNECTION_COLLISION
Other side has sent packet with sequence number that implies a port collision between two connections...
@ S_SEQ_NUM_ARITHMETIC_FAILURE
Other side has sent packets with inconsistent sequence numbers.
@ S_CONN_METADATA_TOO_LARGE
During connection user supplied metadata that is too large.
@ S_CANNOT_CONNECT_TO_IP_ANY
Cannot ask to connect to "any" IP address. Use specific IP address.
@ S_WAIT_USER_TIMEOUT
A blocking (sync_) or background-blocking (async_) operation timed out versus user-supplied time limi...
@ S_WAIT_INTERRUPTED
A blocking (sync_) or background-blocking (async_) operation was interrupted, such as by a signal.
@ S_EVENT_SET_CLOSED
Attempted operation on an event set, when that event set was closed.
@ S_INTERNAL_ERROR_PORT_COLLISION
Internal error: Ephemeral port double reservation allowed.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
uint16_t flow_port_t
Logical Flow port type (analogous to a UDP/TCP port in spirit but in no way relevant to UDP/TCP).
const flow_port_t S_PORT_ANY
Special Flow port value used to indicate "invalid port" or "please pick a random available ephemeral ...
std::ostream & operator<<(std::ostream &os, const Congestion_control_selector::Strategy_choice &strategy_choice)
Serializes a Peer_socket_options::Congestion_control_strategy_choice enum to a standard ostream – the...
bool key_exists(const Container &container, const typename Container::key_type &key)
Returns true if and only if the given key is present at least once in the given associative container...
Auto_cleanup setup_auto_cleanup(const Cleanup_func &func)
Provides a way to execute arbitrary (cleanup) code at the exit of the current block.
std::string buffers_dump_string(const Const_buffer_sequence &data, const std::string &indentation, size_t bytes_per_line)
Identical to buffers_to_ostream() but returns an std::string instead of writing to a given ostream.
bool subtract_with_floor(Minuend *minuend, const Subtrahend &subtrahend, const Minuend &floor)
Performs *minuend -= subtrahend, subject to a floor of floor.
Integer ceil_div(Integer dividend, Integer divisor)
Returns the result of the given non-negative integer divided by a positive integer,...
bool in_open_open_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, given as a (low,...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
bool scheduled_task_fired(log::Logger *logger_ptr, Scheduled_task_const_handle task)
Returns whether a previously scheduled (by schedule_task_from_now() or similar) task has already fire...
bool in_open_closed_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, given as a (low,...
void ostream_op_to_string(std::string *target_str, T const &... ostream_args)
Writes to the specified string, as if the given arguments were each passed, via << in sequence,...
Fine_duration scheduled_task_fires_from_now_or_canceled(log::Logger *logger_ptr, Scheduled_task_const_handle task)
Returns how long remains until a previously scheduled (by schedule_task_from_now() or similar) task f...
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
bool in_closed_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, inclusive.
boost::shared_ptr< void > Auto_cleanup
Helper type for setup_auto_cleanup().
boost::asio::io_context Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
bool scheduled_task_cancel(log::Logger *logger_ptr, Scheduled_task_handle task)
Attempts to prevent the execution of a previously scheduled (by schedule_task_from_now() or similar) ...
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
unsigned char uint8_t
Byte. Best way to represent a byte of binary data. This is 8 bits on all modern systems.
Specifies the outgoing (pre-serialization) acknowledgment of a single received Data_packet,...
Equivalent of Individual_ack_rexmit_off but for sockets with retransmission enabled.
Specifies the incoming (post-deserialization) acknowledgment of a single received Data_packet.
boost::shared_ptr< const Individual_ack > Const_ptr
Short-hand for ref-counted pointer to immutable objects of this class.
uint64_t ack_delay_t
Type used to store the ACK delay for a given individual acknowledged packet.
Fine_duration Ack_delay_time_unit
Ack_delay_time_unit(1) is the duration corresponding to the ack_delay_t value 1; and proportionally f...
uint32_t rcv_wnd_t
Type used to store the size of m_rcv_wnd member in a couple of different packet types.
uint8_t rexmit_id_t
Type used to store the retransmission count in DATA and ACK packets.
The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to a port in th...
Metadata describing the data sent in the acknowledgment of an individual received packet.
boost::shared_ptr< const Individual_ack > Const_ptr
Short-hand for ref-counted pointer to immutable objects of this class.
boost::shared_ptr< Individual_ack > Ptr
Short-hand for ref-counted pointer to mutable objects of this class.
Metadata (and data, if retransmission is on) for a packet that has been received (and,...
const size_t m_size
Number of bytes in the Data_packet::m_data field of that packet.
Received_packet(log::Logger *logger_ptr, size_t size, util::Blob *src_data)
Constructs object by storing size of data and, if so instructed, the data themselves.
util::Blob m_data
Byte sequence equal to that of Data_packet::m_data of the packet.
Data store to keep timing-related info when a packet is sent out.
const order_num_t m_order_num
Order number of the packet.
size_t m_sent_cwnd_bytes
The congestion window size (in bytes) that is used when the packet is sent out.
Fine_time_pt m_sent_time
The timestamp when the packet is sent out.
Metadata (and data, if retransmission is on) for a packet that has been sent one (if retransmission i...
Sent_packet(bool rexmit_on, boost::shared_ptr< Data_packet > packet, const Sent_when &sent_when)
Constructs object with the given values and m_acks_after_me at zero.
std::vector< Sent_when > m_sent_when
Time stamps, order numbers, and other info at the times when the different attempts (including origin...
const size_t m_size
Number of bytes in the Data_packet::m_data field of the sent packet.
const boost::shared_ptr< Data_packet > m_packet
If retransmission is on, this is the DATA packet itself that was sent; otherwise null.
uint16_t ack_count_t
Type used for m_acks_after_me.
ack_count_t m_acks_after_me
The number of times any packet with m_sent_when.back().m_order_num > this->m_sent_when....
A data store that keeps stats about a Peer_socket connection.
Peer_socket_send_stats m_snd
Stats for outgoing direction of traffic. As opposed to the other m_snd_* members, this typically accu...
Node_options m_node_opts
Per-node options currently set on the socket's Node.
size_t m_low_lvl_max_buf_size
The UDP receive buffer maximum size, as reported by an appropriate call to the appropriate getsockopt...
size_t m_rcv_buf_size
The number of bytes in the internal Receive buffer.
size_t m_rcv_wnd_last_advertised
The last rcv_wnd (receive window) size sent to sender (not necessarily received; packets can be lost)...
Fine_duration m_snd_pacing_slice_period
In pacing, the duration of the current pacing time slice.
size_t m_rcv_reassembly_q_data_size
If rexmit_on is false then 0; otherwise the total DATA payload in the reassembly queue of the socket.
size_t m_snd_pacing_bytes_allowed_this_slice
This many bytes worth of DATA packets may still be sent, at this time, within the time slice defined ...
Peer_socket_options m_sock_opts
Per-socket options currently set on the socket.
size_t m_snd_buf_size
The number of bytes in the internal Send buffer.
size_t m_rcv_syn_rcvd_data_cumulative_size
Total size of DATA payload queued while waiting for SYN_ACK_ACK in SYN_RCVD state.
size_t m_rcv_syn_rcvd_data_q_size
Number of DATA packets queued while waiting for SYN_ACK_ACK in SYN_RCVD state.
std::string m_int_state_str
The internal state of the socket, rendered into string (e.g., "SYN_RECEIVED" or "ESTABLISHED").
Fine_time_pt m_snd_pacing_slice_start
In pacing, the time point marking the beginning of the current pacing time slice.
size_t m_snd_cong_ctl_in_flight_count
In congestion control, the current sent data packets that have been neither acknowledged nor consider...
size_t m_snd_cong_ctl_in_flight_bytes
In congestion control, the current sent data bytes that have been neither acknowledged nor considered...
double m_snd_est_bandwidth_mbit_per_sec
Estimate of the currently available (to this connection) outgoing bandwidth, in megabits per second.
size_t m_rcv_wnd
Receive window size = max Receive buffer space minus space taken. Infinity if flow control disabled.
size_t m_rcv_packets_with_gaps
Number of DATA packets tracked in structure tracking all valid received packets such that at least one pac...
size_t m_snd_cong_ctl_wnd_bytes
In congestion control, the current congestion window (number of outgoing data bytes allowed In-flight...
Fine_duration m_snd_smoothed_round_trip_time
Estimated current round trip time of packets, computed as a smooth value over the past individual RTT...
Error_code m_disconnect_cause
If the socket is closing or closed, this is the reason for the closure; otherwise the default-constru...
size_t m_snd_cong_ctl_wnd_count_approx
In congestion control, the approximate equivalent of m_snd_cong_ctl_wnd_bytes as a full packet ...
size_t m_snd_rcv_wnd
The receive window (rcv_wnd a/k/a free Receive buffer space) value of the peer socket on the other si...
bool m_is_active_connect
true if this is the "client" socket (connect()ed); false otherwise (accept()ed).
size_t m_snd_pacing_packet_q_size
In pacing, number of packets currently queued to be sent out by the pacing module.
Fine_duration m_snd_round_trip_time_variance
RTTVAR used for m_snd_smoothed_round_trip_time calculation; it is the current RTT variance.
Peer_socket_receive_stats m_rcv
Stats for incoming direction of traffic. As opposed to the other m_rcv_* members, this typically accu...
Fine_duration m_snd_drop_timeout
Drop Timeout: how long a given packet must remain unacknowledged to be considered dropped due to Drop...
A set of low-level options affecting a single Peer_socket.
Fine_duration m_st_init_drop_timeout
Once socket enters ESTABLISHED state, this is the value for Peer_socket::m_snd_drop_timeout until the...
unsigned int m_st_max_rexmissions_per_packet
If retransmission is enabled and a given packet is retransmitted this many times and has to be retran...
size_t m_st_rcv_buf_max_size
Maximum number of bytes that the Receive buffer can hold.
size_t m_st_cong_ctl_max_cong_wnd_blocks
The constant that determines the CWND limit in Congestion_control_classic_data::congestion_window_at_...
Fine_duration m_st_snd_bandwidth_est_sample_period_floor
When estimating the available send bandwidth, each sample must be compiled over at least this long of...
unsigned int m_st_cong_ctl_cong_avoidance_increment_blocks
The multiple of max-block-size by which to increment CWND in congestion avoidance mode after receivin...
size_t m_st_cong_ctl_cong_wnd_on_drop_timeout_blocks
On Drop Timeout, set congestion window to this value times max-block-size.
size_t m_st_cong_ctl_init_cong_wnd_blocks
The initial size of the congestion window, given in units of max-block-size-sized blocks.
bool m_st_rexmit_on
Whether to enable reliability via retransmission.
size_t m_st_snd_buf_max_size
Maximum number of bytes that the Send buffer can hold.
Fine_duration m_st_connect_retransmit_period
How often to resend SYN or SYN_ACK while SYN_ACK or SYN_ACK_ACK, respectively, has not been received.
Fine_duration m_dyn_rcv_wnd_recovery_timer_period
When the mode triggered by rcv-buf-max-size-to-advertise-percent being exceeded is in effect,...
Fine_duration m_st_connect_retransmit_timeout
How long from the first SYN or SYN_ACK to allow for connection handshake before aborting connection.
size_t m_st_max_full_blocks_before_ack_send
If there are at least this many TIMES max-block-size bytes' worth of individual acknowledgments to be...
Fine_duration m_st_delayed_ack_timer_period
The maximum amount of time to delay sending ACK with individual packet's acknowledgment since receivi...
unsigned int m_dyn_drop_timeout_backoff_factor
Whenever the Drop Timer fires, upon the requisite Dropping of packet(s), the DTO (Drop Timeout) is se...
size_t m_st_max_block_size
The size of block that we will strive to (and will, assuming at least that many bytes are available i...
unsigned int m_st_cong_ctl_classic_wnd_decay_percent
In classic congestion control, RFC 5681 specifies the window should be halved on loss; this option al...
unsigned int m_st_rcv_buf_max_size_to_advertise_percent
% of rcv-buf-max-size that has to be freed, since the last receive window advertisement,...
unsigned int m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent
The limit on the size of Peer_socket::m_rcv_packets_with_gaps, expressed as what percentage the maxim...
Fine_duration m_dyn_drop_timeout_ceiling
Ceiling to impose on the Drop Timeout.
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...
util::Udp_endpoint m_udp_endpoint
UDP address (IP address/UDP port) where the Node identified by this endpoint bound its low-level UDP ...
#define FLOW_UTIL_WHERE_AM_I_STR()
Same as FLOW_UTIL_WHERE_AM_I() but evaluates to an std::string.