26#include <boost/algorithm/string.hpp> 
   27#include <boost/tuple/tuple.hpp> 
   42  m_active_connect(false), 
 
   43  m_state(
State::S_CLOSED), 
 
   46  m_rcv_buf(logger_ptr, 0), 
 
   48  m_snd_buf(logger_ptr, max_block_size()),
 
   49  m_serialized_metadata(logger_ptr),
 
   52  m_rcv_syn_rcvd_data_cumulative_size(0), 
 
   53  m_rcv_reassembly_q_data_size(0),
 
   54  m_rcv_pending_acks_size_at_recv_handler_start(0),
 
   55  m_snd_pending_rcv_wnd(0), 
 
   56  m_rcv_last_sent_rcv_wnd(0),
 
   57  m_rcv_in_rcv_wnd_recovery(false),
 
   58  m_rcv_delayed_ack_timer(*task_engine),
 
   59  m_snd_flying_bytes(0),
 
   60  m_snd_last_order_num(0),
 
   61  m_snd_rexmit_q_size(0),
 
   62  m_snd_remote_rcv_wnd(0),
 
   63  m_snd_smoothed_round_trip_time(0),
 
   64  m_round_trip_time_variance(0),
 
   65  m_snd_drop_timeout(0),
 
   66  m_snd_pacing_data(task_engine),
 
   68  m_init_rexmit_count(0)
 
   71  FLOW_LOG_TRACE(
"Peer_socket [" << 
static_cast<void*
>(
this) << 
"] created.");
 
  109  return sync_send(tag, Fine_duration::max(), err_code);
 
  116  namespace bind_ns = util::bind_ns;
 
  120                                     bind_ns::cref(wait_until), _1);
 
  124  const Function<size_t (
size_t)> empty_snd_buf_feed_func;
 
  125  assert(empty_snd_buf_feed_func.empty());
 
  139  const Ptr sock = shared_from_this();
 
  146  return m_node->
send(sock, snd_buf_feed_func, err_code);
 
  153  using boost::adopt_lock;
 
  158  const Ptr sock = shared_from_this();
 
  184                snd_buf_feed_func_or_empty.empty()
 
  187                                          { 
return m_node->
send(sock, snd_buf_feed_func_or_empty, err_code); }),
 
  189                wait_until, err_code);
 
  194  return sync_receive(tag, Fine_duration::max(), err_code);
 
  201  namespace bind_ns = util::bind_ns;
 
  205                                     bind_ns::cref(wait_until), _1);
 
  209  const Function<size_t ()> empty_rcv_buf_consume_func;
 
  210  assert(empty_rcv_buf_consume_func.empty());
 
  224  const Ptr sock = shared_from_this();
 
  231  return m_node->
receive(sock, rcv_buf_consume_func, err_code);
 
  238  using boost::adopt_lock;
 
  243  const Ptr sock = shared_from_this();
 
  261                rcv_buf_consume_func_or_empty.empty()
 
  264                                          { 
return m_node->
receive(sock, rcv_buf_consume_func_or_empty, err_code); }),
 
  266                wait_until, err_code);
 
  283  const Ptr sock = shared_from_this();
 
  299  namespace bind_ns = util::bind_ns;
 
  307  const Ptr sock = shared_from_this();
 
  335  const Const_ptr sock = shared_from_this();
 
  358                                            const unsigned int* inflate_pct_val_ptr)
 const 
  364  const unsigned int inflate_pct = inflate_pct_val_ptr ? (*inflate_pct_val_ptr) : 0;
 
  393  namespace bind_ns = util::bind_ns;
 
  431  os.os() << bytes << 
'~' << (bytes / block);
 
  432  if ((bytes % block) != 0)
 
  441                                      boost::shared_ptr<Data_packet> packet,
 
  443  m_size(packet->m_data.size()),
 
  444  m_sent_when({ sent_when }),
 
  446  m_packet(
rexmit_on ? packet : boost::shared_ptr<Data_packet>()) 
 
  460    m_data = std::move(*src_data); 
 
  476                                      boost::shared_ptr<const Syn_ack_packet> syn_ack)
 
  484  FLOW_LOG_INFO(
"NetFlow worker thread continuing active-connect of [" << sock << 
"].  " 
  485                "Received [" << syn_ack->m_type_ostream_manip << 
"] with " 
  486                "ISN [" << syn_ack->m_init_seq_num << 
"]; " 
  487                "security token [" << syn_ack->m_packed.m_security_token << 
"].");
 
  491  if (!async_low_lvl_syn_ack_ack_send_or_close_immediately(sock, syn_ack))
 
  503  sock->m_rcv_init_seq_num = syn_ack->m_init_seq_num;
 
  504  sock->m_rcv_next_seq_num = sock->m_rcv_init_seq_num + 1;
 
  517  setup_drop_timer(socket_id, sock);
 
  520  sock->m_snd_remote_rcv_wnd = syn_ack->m_packed.m_rcv_wnd;
 
  532    event_set_all_check_delta(
true);
 
  540                                         boost::shared_ptr<const Syn_ack_packet> syn_ack)
 
  548  FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
  550                "received duplicate [" << syn_ack->m_type_ostream_manip << 
"] with " 
  551                "ISN [" << syn_ack->m_init_seq_num << 
"]; " 
  552                "security token [" << syn_ack->m_packed.m_security_token << 
"].  " 
  553                "Could be from packet loss.");
 
  557  async_low_lvl_syn_ack_ack_send_or_close_immediately(sock, syn_ack);
 
  562                                      boost::shared_ptr<Data_packet> packet,
 
  563                                      bool syn_rcvd_qd_packet)
 
  617  const bool rexmit_on = sock->rexmit_on();
 
  620  auto& data = packet->m_data; 
 
  621  assert(!data.empty()); 
 
  623  const size_t data_size = data.size();
 
  631  FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock << 
"].  " 
  632                 "Received [" << packet->m_type_ostream_manip << 
"] with " 
  633                 "sequence number [" << seq_num << 
"]; data size [" << data_size << 
"].");
 
  638  log_rcv_window(sock); 
 
  654  const Error_code cat_result = sock_categorize_data_to_established(sock, packet, &dupe, &slide, &slide_size);
 
  664    rst_and_close_connection_immediately(socket_id, sock, cat_result, 
true);
 
  697    async_acknowledge_packet(sock, seq_num, packet->m_rexmit_id, data_size); 
 
  713    if (!sock_data_to_rcv_buf_unless_overflow(sock, packet))
 
  737    sock_rcv_buf_now_readable(sock, syn_rcvd_qd_packet);
 
  745    async_acknowledge_packet(sock, seq_num, 0, data_size); 
 
  754      sock_track_new_data_after_gap_rexmit_off(sock, packet, data_size, &slide, &slide_size);
 
  766      sock_slide_rcv_next_seq_num(sock, slide_size, 
false);
 
  781      if (!sock_data_to_rcv_buf_unless_overflow(sock, packet))
 
  798      sock_slide_rcv_next_seq_num(sock, slide_size, 
true);
 
  802      sock_rcv_buf_now_readable(sock, syn_rcvd_qd_packet);
 
  804    else if (!sock_data_to_reassembly_q_unless_overflow(sock, packet)) 
 
  818    async_acknowledge_packet(sock, seq_num, packet->m_rexmit_id, data_size); 
 
  822  log_rcv_window(sock);
 
  826                                                     boost::shared_ptr<const Data_packet> packet,
 
  827                                                     bool* dupe, 
bool* slide, 
size_t* slide_size)
 
  829  assert(dupe && slide && slide_size);
 
  841  const auto& data = packet->m_data;
 
  846  advance_seq_num(&seq_num_end, data.size());
 
  849  bool first_gap_exists;
 
  852  rcv_get_first_gap_info(sock, &first_gap_exists, &seq_num_after_first_gap);
 
  866                     "Received [" << packet->m_type_ostream_manip << 
"] with " 
  867                     "sequence number [" << seq_num << 
"]; data size [" << data.size() << 
"]; " 
  868                     "sequence number precedes " 
  869                     "ISN [" << sock->m_rcv_init_seq_num << 
"].");
 
  875  if (seq_num < rcv_next_seq_num)
 
  882    if (seq_num_end > rcv_next_seq_num)
 
  886                       "Received [" << packet->m_type_ostream_manip << 
"] with " 
  887                       "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
  888                       "data size [" << data.size() << 
"]; " 
  889                       "straddle first unreceived " 
  890                       "sequence number [" << rcv_next_seq_num << 
"].");
 
  897    FLOW_LOG_TRACE(
"Duplicate packet before first unreceived sequence number [" << rcv_next_seq_num << 
"].");
 
  910  if (seq_num == rcv_next_seq_num)
 
  916    if (first_gap_exists && (seq_num_end > seq_num_after_first_gap))
 
  920                       "Received [" << packet->m_type_ostream_manip << 
"] with " 
  921                       "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
  922                       "data size [" << data.size() << 
"]; " 
  923                       "supposed gap-filling data " 
  924                       "straddle the boundary of packet [" << seq_num_after_first_gap << 
", ...).");
 
  930    FLOW_LOG_TRACE(
"Packet filled first [" << data.size() << 
"] unreceived sequence numbers " 
  931                   "starting with [" << rcv_next_seq_num << 
"].");
 
  935    *slide_size = size_t(seq_num_end - seq_num);
 
  936    assert(*slide_size == data.size());
 
  941  assert(seq_num > rcv_next_seq_num);
 
  983  if (next_packet == rcv_packets_with_gaps.end())
 
  991    if (first_gap_exists)
 
  995      get_seq_num_range(last_packet, 0, &seq_num_last_end);
 
  997      if (seq_num_last_end > seq_num) 
 
 1003                         "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1004                         "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1005                         "data size [" << data.size() << 
"]; " 
 1006                         "supposed middle gap-filling packet data " 
 1007                         "straddle the boundary of last packet [..., " << seq_num_last_end << 
").");
 
 1015      FLOW_LOG_TRACE(
"New packet is newest packet after unreceived gap; " 
 1016                     "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1017                     "first unreceived packet [" << rcv_next_seq_num << 
"].");
 
 1023      FLOW_LOG_TRACE(
"New packet forms gap; sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1024                     "first unreceived packet [" << rcv_next_seq_num << 
"].");
 
 1034  get_seq_num_range(next_packet, &seq_num_next_start, &seq_num_next_end);
 
 1036  if (seq_num_next_start == seq_num)
 
 1041    if (seq_num_next_end != seq_num_end)
 
 1047                       "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1048                       "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1049                       "data size [" << data.size() << 
"]; " 
 1050                       "do not match supposed " 
 1051                       "duplicate packet [" << seq_num << 
", " << seq_num_next_end << 
").");
 
 1062                   "sequence numbers [" << seq_num << 
", " << seq_num_end << 
").");
 
 1068  assert(seq_num_next_start > seq_num); 
 
 1080  if (seq_num_end > seq_num_next_start) 
 
 1086                     "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1087                     "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1088                     "data size [" << data.size() << 
"]; " 
 1089                     "supposed middle gap-filling packet data " 
 1090                     "straddle the left boundary of packet " 
 1091                     "[" << seq_num_next_start << 
", " << seq_num_next_end << 
").");
 
 1097  if (next_packet == rcv_packets_with_gaps.begin())
 
 1099    FLOW_LOG_TRACE(
"New packet partially fills first gap without sliding window; " 
 1100                   "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1101                   "first unreceived packet [" << rcv_next_seq_num << 
"].");
 
 1107  get_seq_num_range(prev_packet, &seq_num_prev_start, &seq_num_prev_end);
 
 1109  if (seq_num_prev_end > seq_num) 
 
 1115                     "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1116                     "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1117                     "data size [" << data.size() << 
"]; " 
 1118                     "supposed middle gap-filling packet data " 
 1119                     "straddle the right boundary of packet " 
 1120                     "[" << seq_num_prev_start << 
", " << seq_num_prev_end << 
").");
 
 1127                 "sequence numbers [" << seq_num << 
", " << seq_num_end << 
"); " 
 1128                 "first unreceived packet [" << rcv_next_seq_num << 
"].");
 
 1134                                                boost::shared_ptr<Data_packet> packet)
 
 1143  Blob& data = packet->m_data; 
 
 1145  const size_t data_size = data.size();
 
 1161    if ((sock->m_rcv_buf.data_size() + data_size)
 
 1162        > sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size,
 
 1163                                        &sock->m_opts.m_st_rcv_buf_max_size_slack_percent))
 
 1171      FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 1172                    "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1173                    "sequence numbers [" << packet->m_seq_num << 
", " << (packet->m_seq_num + data_size) << 
"); " 
 1174                    "data size [" << data_size << 
"]; " 
 1175                    "dropping because Receive buffer full.");
 
 1188  const size_t written =
 
 1190      sock->m_rcv_buf.feed_buf_move(&data, std::numeric_limits<size_t>::max());
 
 1192    assert(written == data_size);
 
 1194    buf_size = sock->m_rcv_buf.data_size();
 
 1205  receive_wnd_recovery_data_received(sock);
 
 1233  if ((!syn_rcvd_qd_packet) &&
 
 1237    event_set_all_check_delta(
true);
 
 1245                                                    boost::shared_ptr<const Data_packet> packet,
 
 1247                                                    bool* slide, 
size_t* slide_size)
 
 1249  using std::make_pair;
 
 1265  const size_t max_packets_after_unrecvd_packet = sock_max_packets_after_unrecvd_packet(sock);
 
 1272  const auto insert_result =
 
 1274    rcv_packets_with_gaps.insert
 
 1278  assert(!sock->rexmit_on());
 
 1279  assert(insert_result.second); 
 
 1282  bool first_gap_exists;
 
 1286  rcv_get_first_gap_info(sock, &first_gap_exists, &seq_num_after_first_gap);
 
 1287  assert(first_gap_exists);
 
 1296  if (rcv_packets_with_gaps.size() == max_packets_after_unrecvd_packet + 1)
 
 1300    *slide_size = size_t(seq_num_after_first_gap - sock->m_rcv_next_seq_num);
 
 1306    FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 1307                  "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1308                  "sequence numbers [" << packet->m_seq_num << 
", " << (packet->m_seq_num + data_size) << 
"); " 
 1309                  "exceeded max gapped packet list size [" << max_packets_after_unrecvd_packet << 
"]; " 
 1310                  "assuming Dropped; " 
 1311                  "will fake receiving all [" << slide_size << 
"] sequence numbers in the first unreceived gap.");
 
 1316    assert(rcv_packets_with_gaps.size() <= max_packets_after_unrecvd_packet);
 
 1321                                                     boost::shared_ptr<Data_packet> packet)
 
 1323  using std::make_pair;
 
 1332  auto& data = packet->m_data; 
 
 1334  const size_t data_size = data.size();
 
 1339  size_t max_packets_after_unrecvd_packet = sock_max_packets_after_unrecvd_packet(sock);
 
 1387  size_t max_packets_in_reassembly_q
 
 1388    = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size,
 
 1389                                    &sock->m_opts.m_st_rcv_buf_max_size_slack_percent);
 
 1391  size_t rcv_buf_size;
 
 1394    rcv_buf_size = sock->m_rcv_buf.data_size(); 
 
 1399  max_packets_in_reassembly_q /= sock->max_block_size();
 
 1403  max_packets_in_reassembly_q += rcv_packets_with_gaps.size();
 
 1406  if (max_packets_in_reassembly_q < max_packets_after_unrecvd_packet)
 
 1408    max_packets_after_unrecvd_packet = max_packets_in_reassembly_q;
 
 1413    FLOW_LOG_TRACE(
"Unexpected Receive buffer limits: safety net [" << max_packets_after_unrecvd_packet << 
"] <= " 
 1414                   "real limit [" << max_packets_in_reassembly_q << 
"], but the opposite is typical.  " 
 1415                   "See details just below."); 
 
 1418  if (rcv_packets_with_gaps.size() + 1 > max_packets_after_unrecvd_packet)
 
 1432                     "Received [" << packet->m_type_ostream_manip << 
"] with " 
 1433                     "sequence numbers [" << packet->m_seq_num << 
", " << (packet->m_seq_num + data_size) << 
"); " 
 1434                     "exceeded max gapped packet list size [" << max_packets_after_unrecvd_packet << 
"]; " 
 1435                     "dropping packet.");
 
 1440  FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock << 
"].  " 
 1441                 "Enqueueing [" << packet->m_type_ostream_manip << 
"] payload onto reassembly queue with " 
 1442                 "sequence numbers [" << packet->m_seq_num << 
", " << (packet->m_seq_num + data_size) << 
") " 
 1443                 "of size [" << data_size << 
"]; " 
 1444                 "successfully fit into max gapped packet list size [" << max_packets_after_unrecvd_packet << 
"]; " 
 1445                 "could have fit [" << (max_packets_after_unrecvd_packet - rcv_packets_with_gaps.size()) << 
"] more.");
 
 1449  const auto insert_result =
 
 1451    rcv_packets_with_gaps.insert
 
 1454  sock->m_rcv_reassembly_q_data_size += data_size;
 
 1455  assert(insert_result.second); 
 
 1466  receive_wnd_recovery_data_received(sock);
 
 1484  rcv_next_seq_num += slide_size; 
 
 1487                 "[" << (rcv_next_seq_num - slide_size) << 
"] to " 
 1488                 "[" << rcv_next_seq_num << 
"].");
 
 1500  size_t total_written = 0;
 
 1503  for (end_contig_it = start_contig_it;
 
 1507       (end_contig_it != rcv_packets_with_gaps.end()) && (end_contig_it->first == rcv_next_seq_num);
 
 1512    if (reassembly_in_progress)
 
 1532        written = sock->m_rcv_buf.feed_buf_move(&rcvd_packet.
m_data, std::numeric_limits<size_t>::max());
 
 1534        buf_size = sock->m_rcv_buf.data_size();
 
 1536      total_written += written;
 
 1542      assert(written != 0);
 
 1545    advance_seq_num(&rcv_next_seq_num, rcvd_packet.
m_size);
 
 1548                   "[" << rcv_next_seq_num << 
"]; packet subsumed by this move.");
 
 1552  rcv_packets_with_gaps.erase(start_contig_it, end_contig_it); 
 
 1553  sock->m_rcv_reassembly_q_data_size -= total_written;
 
 1563  return uint64_t(sock->opt(sock->m_opts.m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent)) *
 
 1564         uint64_t(sock->opt(sock->m_opts.m_st_rcv_buf_max_size)) /
 
 1565         uint64_t(sock->max_block_size()) /
 
 1573  *first_gap_exists = !sock->m_rcv_packets_with_gaps.empty();
 
 1575  if (*first_gap_exists)
 
 1577    *seq_num_after_first_gap = sock->m_rcv_packets_with_gaps.begin()->first;
 
 1590  sock->m_rcv_stats.total_to_send_ack_packet(data_size);
 
 1592  const size_t acks_pending_before_this = sock->m_rcv_pending_acks.size();
 
 1598  sock->m_rcv_pending_acks.push_back
 
 1627  if (m_socks_with_accumulated_pending_acks.insert(sock).second)
 
 1633    sock->m_rcv_pending_acks_size_at_recv_handler_start = acks_pending_before_this;
 
 1640  using boost::chrono::milliseconds;
 
 1641  using boost::chrono::microseconds;
 
 1642  using boost::chrono::duration_cast;
 
 1643  using boost::chrono::round;
 
 1651  vector<Peer_socket::Individual_ack::Ptr>& pending_acks = sock->m_rcv_pending_acks;
 
 1656    FLOW_LOG_TRACE(
"Was about to perform accumulated acknowledgment tasks on [" << sock << 
"] but skipping because " 
 1657                   "state is now [" << sock->m_int_state << 
"].");
 
 1662  assert(!pending_acks.empty());
 
 1707  const Fine_duration delayed_ack_timer_period = sock->opt(sock->m_opts.m_st_delayed_ack_timer_period);
 
 1709  bool force_ack = delayed_ack_timer_period == Fine_duration::zero(); 
 
 1714      (
"Delayed [ACK] feature disabled on [" << sock << 
"]; forcing immediate [ACK].  " 
 1715       "Receive window state: [" << sock->m_rcv_init_seq_num << 
", " << sock->m_rcv_next_seq_num << 
") " 
 1716       "| " << sock->m_rcv_packets_with_gaps.size() << 
":{...}.");
 
 1718  else if (!sock->m_rcv_packets_with_gaps.empty())
 
 1730    for (
size_t ack_idx = sock->m_rcv_pending_acks_size_at_recv_handler_start;
 
 1731         ack_idx != pending_acks.size(); ++ack_idx)
 
 1733      ack = pending_acks[ack_idx];
 
 1734      if (ack->m_seq_num > sock->m_rcv_next_seq_num)
 
 1744        (
"On [" << sock << 
"] " 
 1745         "received out-of-order packet [" << ack->m_seq_num << 
", size " << ack->m_data_size << 
", " 
 1746         "rexmit " << ack->m_rexmit_id << 
"]; " 
 1747         "forcing immediate [ACK].  " 
 1748         "Receive window state: [" << sock->m_rcv_init_seq_num << 
", " << sock->m_rcv_next_seq_num << 
") " 
 1749         "| " << sock->m_rcv_packets_with_gaps.size() << 
":{...}.");
 
 1757      = sock->opt(sock->m_opts.m_st_max_full_blocks_before_ack_send) * sock->max_block_size();
 
 1761      bytes += ack->m_data_size;
 
 1772                     "accumulated at least [" << limit << 
"] bytes to acknowledge; " 
 1773                     "forcing immediate [ACK].");
 
 1799    if (sock->m_rcv_pending_acks_size_at_recv_handler_start != 0)
 
 1802                     "canceling delayed [ACK] timer due to forcing " 
 1803                     "immediate [ACK]; would have fired " 
 1804                     "in [" << round<milliseconds>(sock->m_rcv_delayed_ack_timer.expires_from_now()) << 
"] " 
 1808      const size_t num_canceled = sock->m_rcv_delayed_ack_timer.cancel(sys_err_code);
 
 1816        rst_and_close_connection_immediately(socket_id, sock,
 
 1823      if (num_canceled == 0)
 
 1829                      "tried to cancel delayed [ACK] timer while " 
 1830                      "forcing [ACK], but it was already just about to fire.");
 
 1838      async_low_lvl_ack_send(sock, 
true);
 
 1841      assert(pending_acks.empty());
 
 1863    if (sock->m_rcv_pending_acks_size_at_recv_handler_start == 0)
 
 1868      sock->m_rcv_delayed_ack_timer.expires_from_now(delayed_ack_timer_period, sys_err_code);
 
 1879        rst_and_close_connection_immediately(socket_id, sock,
 
 1887                     "scheduled delayed [ACK] timer to fire " 
 1888                     "in [" << round<milliseconds>(delayed_ack_timer_period) << 
"].");
 
 1891      sock->m_rcv_delayed_ack_timer.async_wait([
this, socket_id, sock](
const Error_code& sys_err_code)
 
 1893        async_low_lvl_ack_send(sock, 
false, sys_err_code);
 
 1901  sock->m_rcv_stats.current_pending_to_ack_packets(pending_acks.size());
 
 1908  using boost::algorithm::join;
 
 1923      (
"Receive window state for [" << sock << 
"]: " 
 1924       "[" << sock->m_rcv_init_seq_num << 
", " << sock->m_rcv_next_seq_num << 
") " 
 1925       "| " << rcv_packets_with_gaps.size() << 
":{...}.");
 
 1936  vector<string> pkt_strs;
 
 1937  pkt_strs.reserve(rcv_packets_with_gaps.size());
 
 1939  const size_t MAX_TO_SHOW = 100;
 
 1940  bool skipped_some = 
false;
 
 1944       pkt_it != rcv_packets_with_gaps.end();
 
 1947    const bool last_iteration = (count == rcv_packets_with_gaps.size() - 1);
 
 1949    if ((!skipped_some) && (count > MAX_TO_SHOW) && (!last_iteration))
 
 1952      skipped_some = 
true;
 
 1963      if (!last_iteration)
 
 1970      pkt_str = 
"[...skipped...] ";
 
 1975    get_seq_num_range(pkt_it, &start, &end);
 
 1978    pkt_strs.push_back(pkt_str);
 
 1985     "Receive window state for [" << sock << 
"]: " 
 1986       "[" << sock->m_rcv_init_seq_num << 
", " << sock->m_rcv_next_seq_num << 
") " 
 1987       "| " << rcv_packets_with_gaps.size() << 
":{" << join(pkt_strs, 
" ") << 
"}.");
 
 1991                                     boost::shared_ptr<const Ack_packet> ack)
 
 2045  sock->m_snd_pending_rcv_wnd = ack->m_rcv_wnd;
 
 2048  sock->m_rcv_acked_packets.insert(sock->m_rcv_acked_packets.end(), 
 
 2049                                   ack->m_rcv_acked_packets.begin(), ack->m_rcv_acked_packets.end());
 
 2050  m_socks_with_accumulated_acks.insert(sock); 
 
 2052  FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock << 
"].  " 
 2053                 "Received and accumulated [" << ack->m_type_ostream_manip << 
"] with " 
 2054                 "[" << ack->m_rcv_acked_packets.size() << 
"] individual acknowledgments " 
 2055                 "and rcv_wnd = [" << ack->m_rcv_wnd << 
"]; total for this socket in this " 
 2056                 "receive handler is [" << sock->m_rcv_acked_packets.size() << 
"] individual acknowledgments.");
 
 2058  sock->m_snd_stats.received_low_lvl_ack_packet(ack->m_rcv_acked_packets.empty());
 
 2066  using boost::unordered_set;
 
 2067  using boost::chrono::round;
 
 2068  using boost::chrono::milliseconds;
 
 2069  using boost::chrono::seconds;
 
 2079  log_accumulated_acks(sock);
 
 2083  using Acks = vector<Ack_packet::Individual_ack::Ptr>;
 
 2084  Acks& acked_packets = sock->m_rcv_acked_packets;
 
 2097    FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 2098                  "Accumulated [ACK] packets with [" << acked_packets.size() << 
"] " 
 2099                  "individual acknowledgments, but state is now [" << sock->m_int_state << 
"]; ignoring ACKs forever.");
 
 2243  const bool rexmit_on = sock->rexmit_on();
 
 2244  auto& snd_stats = sock->m_snd_stats;
 
 2245  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 2248  auto& pkts_marked_to_drop = sock->m_snd_temp_pkts_marked_to_drop;
 
 2249  pkts_marked_to_drop.clear();
 
 2252  const bool could_send_before_acks = can_send(sock);
 
 2254  const bool had_rexmit_data_before_acks = !sock->m_snd_rexmit_q.empty();
 
 2259  unordered_set<Peer_socket::order_num_t> flying_now_acked_pkts;
 
 2262  size_t clean_acked_bytes = 0;
 
 2263  size_t clean_acked_packets = 0;
 
 2267  using Clean_acked_packet = tuple<Fine_duration, size_t, size_t>;
 
 2268  vector<Clean_acked_packet> clean_acked_packet_events;
 
 2269  clean_acked_packet_events.reserve(min(acked_packets.size(), snd_flying_pkts_by_when.size())); 
 
 2287    const bool error_ack = !categorize_individual_ack(socket_id, sock, ack, &dupe_or_late, &flying_pkt_it);
 
 2300    if (flying_pkt_it != snd_flying_pkts_by_when.past_oldest())
 
 2303      flying_pkt = flying_pkt_it->second;
 
 2304      round_trip_time = compute_rtt_on_ack(flying_pkt, time_now, ack, &sent_when); 
 
 2313    assert(!dupe_or_late);
 
 2315    assert(flying_pkt_it != snd_flying_pkts_by_when.past_oldest());
 
 2319    new_round_trip_time_sample(sock, round_trip_time);
 
 2329    const size_t bytes_acked = flying_pkt->m_size;
 
 2331    clean_acked_packet_events.emplace_back(round_trip_time, bytes_acked, cwnd_bytes);
 
 2334    snd_flying_pkts_erase_one(sock, flying_pkt_it);
 
 2337    clean_acked_bytes += bytes_acked;
 
 2338    ++clean_acked_packets;
 
 2363    flying_now_acked_pkts.insert(sent_when->
m_order_num);
 
 2372    = categorize_pkts_as_dropped_on_acks(sock, flying_now_acked_pkts);
 
 2378  size_t dropped_pkts;
 
 2379  size_t dropped_bytes;
 
 2380  size_t cong_ctl_dropped_bytes;
 
 2381  size_t cong_ctl_dropped_pkts;
 
 2382  if (!drop_pkts_on_acks(sock, last_dropped_pkt_it,
 
 2383                         &cong_ctl_dropped_pkts, &cong_ctl_dropped_bytes,
 
 2384                         &dropped_pkts, &dropped_bytes, &pkts_marked_to_drop))
 
 2405  if (dropped_pkts != 0)
 
 2408    FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 2409                  "Considering Dropped: [" << dropped_bytes << 
"] bytes = [" << dropped_pkts << 
"] packets.");
 
 2411    if (cong_ctl_dropped_pkts != 0) 
 
 2414      assert(cong_ctl_dropped_bytes != 0); 
 
 2416      FLOW_LOG_INFO(
"cong_ctl [" << sock << 
"] update: loss event: " 
 2417                    "Dropped [" << cong_ctl_dropped_bytes << 
"] bytes " 
 2418                    "= [" << cong_ctl_dropped_pkts << 
"] packets.");
 
 2420      sock->m_snd_cong_ctl->on_loss_event(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
 
 2421      sock->m_snd_last_loss_event_when = Fine_clock::now();
 
 2428    assert(dropped_pkts == 0);
 
 2429    assert(cong_ctl_dropped_pkts == 0);
 
 2432  if (clean_acked_packets != 0)
 
 2434    assert(clean_acked_bytes != 0); 
 
 2435    assert(!clean_acked_packet_events.empty());
 
 2438    for (
const auto& [rtt, bytes, cwnd_bytes] : clean_acked_packet_events)
 
 2440      FLOW_LOG_TRACE(
"cong_ctl [" << sock << 
"] update: clean individual acknowledgment: " 
 2441                     "[" << sock->bytes_blocks_str(bytes) << 
"] with RTT [" << round<milliseconds>(rtt) <<
 
 2442                     "] and sent_cwnd_bytes [" << cwnd_bytes << 
"].");
 
 2444      sock->m_snd_cong_ctl->on_individual_ack(rtt, bytes, cwnd_bytes);
 
 2447    FLOW_LOG_TRACE(
"cong_ctl/bw_est [" << sock << 
"] update: clean acknowledgments: " 
 2448                   "[" << sock->bytes_blocks_str(clean_acked_bytes) << 
"] = " 
 2449                   "[" << clean_acked_packets << 
"] packets.");
 
 2452    sock->m_snd_bandwidth_estimator->on_acks(clean_acked_bytes);
 
 2453    sock->m_snd_cong_ctl->on_acks(clean_acked_bytes, clean_acked_packets);
 
 2461  if (dropped_pkts != 0)
 
 2464    snd_stats.dropped_data(dropped_bytes, dropped_pkts);
 
 2466    const seconds MIN_TIME_BETWEEN_LOGS(1);
 
 2467    const Fine_duration since_last_loss_sock_log = Fine_clock::now() - m_last_loss_sock_log_when;
 
 2469    if (since_last_loss_sock_log > MIN_TIME_BETWEEN_LOGS)
 
 2471      FLOW_LOG_INFO(
"Will log socket state on loss, because last such loss-driven logging was " 
 2472                    "[" << round<milliseconds>(since_last_loss_sock_log) << 
" >" 
 2473                    " " << MIN_TIME_BETWEEN_LOGS << 
"] ago.");
 
 2474      sock_log_detail(sock);
 
 2475      m_last_loss_sock_log_when = Fine_clock::now();
 
 2479      FLOW_LOG_INFO(
"Will NOT log socket state on loss, because last such loss-driven logging was " 
 2480                    "[" << round<milliseconds>(since_last_loss_sock_log) << 
" <=" 
 2481                    " " << MIN_TIME_BETWEEN_LOGS << 
"] ago.");
 
 2486  log_snd_window(sock);
 
 2496  drop_timer->start_contemporaneous_events();
 
 2498  for (
const auto pkt_order_num : flying_now_acked_pkts)
 
 2500    drop_timer->on_ack(pkt_order_num);
 
 2501    drop_timer->on_packet_no_longer_in_flight(pkt_order_num);
 
 2503  for (
const auto pkt_order_num : pkts_marked_to_drop)
 
 2505    drop_timer->on_packet_no_longer_in_flight(pkt_order_num);
 
 2508  drop_timer->end_contemporaneous_events();
 
 2512  if (sock->m_snd_pending_rcv_wnd != sock->m_snd_remote_rcv_wnd)
 
 2515                   "rcv_wnd change [" << sock->m_snd_remote_rcv_wnd << 
"] => [" << sock->m_snd_pending_rcv_wnd << 
"].");
 
 2516    sock->m_snd_remote_rcv_wnd = sock->m_snd_pending_rcv_wnd;
 
 2527    sock->m_snd_stats.updated_rcv_wnd(sock->m_snd_remote_rcv_wnd < sock->max_block_size());
 
 2536  if ((!could_send_before_acks) || (
rexmit_on && (!had_rexmit_data_before_acks)))
 
 2538    send_worker(sock, 
true);
 
 2549  assert(dupe_or_late);
 
 2550  assert(acked_pkt_it);
 
 2594  const bool rexmit_on = sock->rexmit_on();
 
 2595  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 2596  auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
 
 2597  auto& snd_stats = sock->m_snd_stats;
 
 2602  const unsigned int rexmit_id = ack->m_rexmit_id;
 
 2606  snd_stats.received_ack();
 
 2622                     "acknowledgment [" << seq_num << 
", ...) is outside (ISN, snd_next) " 
 2623                     "range (" << sock->m_snd_init_seq_num << 
", " << sock->m_snd_next_seq_num << 
").");
 
 2626    snd_stats.error_ack();
 
 2632    rst_and_close_connection_immediately(socket_id, sock,
 
 2642  *acked_pkt_it = snd_flying_pkts_by_when.find(seq_num);
 
 2643  if (*acked_pkt_it == snd_flying_pkts_by_when.past_oldest()) 
 
 2710    if (pkt_it != snd_flying_pkts_by_seq.begin())
 
 2716      get_seq_num_range(pkt_it->second, &l1, &l2);
 
 2718      assert(l1 < seq_num); 
 
 2724        snd_stats.error_ack();
 
 2728                         "acknowledgment [" << seq_num << 
", ...) is at least partially inside " 
 2729                         "packet [" << l1 << 
", " << l2 << 
").");
 
 2765    snd_stats.late_or_dupe_ack();
 
 2767    FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 2768                  "Acknowledged packet [" << seq_num << 
", ...) is duplicate or late (or invalid).  " 
 2769                  "RTT unknown.  Ignoring.");
 
 2772    *dupe_or_late = 
true;
 
 2773    assert(*acked_pkt_it == snd_flying_pkts_by_when.past_oldest()); 
 
 2777  assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest());
 
 2782  const unsigned int acked_rexmit_id = 
rexmit_on ? acked_pkt.
m_packet->m_rexmit_id : 0;
 
 2784  get_seq_num_range(*acked_pkt_it, 0, &seq_num_end);
 
 2788  if (rexmit_id > acked_rexmit_id)
 
 2792                     "Acknowledged packet [" << seq_num << 
", " << seq_num_end << 
") " 
 2793                     "rexmit_id [" << 
int(rexmit_id) << 
"] " 
 2794                     "exceeds highest sent rexmit_id [" << 
int(acked_rexmit_id) << 
"].");
 
 2803  if (rexmit_id != acked_rexmit_id)
 
 2805    assert(rexmit_id < acked_rexmit_id);
 
 2843    snd_stats.late_or_dupe_ack();
 
 2845    FLOW_LOG_INFO(
"NetFlow worker thread working on [" << sock << 
"].  " 
 2846                  "Acknowledged packet [" << seq_num << 
", " << seq_num_end << 
") " 
 2847                  "order_num [" << acked_pkt.
m_sent_when[rexmit_id].m_order_num << 
"] " 
 2848                  "rexmit_id [" << 
int(rexmit_id) << 
"] " 
 2849                  "is less than highest sent [" << 
int(acked_rexmit_id) << 
"].  Ignoring.");
 
 2852    *dupe_or_late = 
true;
 
 2853    assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest()); 
 
 2857  assert(rexmit_id == acked_rexmit_id);
 
 2862  snd_stats.good_ack(acked_pkt.
m_size);
 
 2865  *dupe_or_late = 
false;
 
 2866  assert(*acked_pkt_it != snd_flying_pkts_by_when.past_oldest()); 
 
 2875  using boost::chrono::milliseconds;
 
 2876  using boost::chrono::round;
 
 2896  const unsigned int rexmit_id = ack->m_rexmit_id;
 
 2898  *sent_when = &(flying_pkt->m_sent_when[rexmit_id]);
 
 2907  const auto& ack_delay = ack->m_delay;
 
 2908  round_trip_time = time_now - (*sent_when)->m_sent_time - ack_delay;
 
 2910  if (round_trip_time.count() < 0)
 
 2919    FLOW_LOG_TRACE(
"Acknowledged packet [" << ack->m_seq_num << 
", ...) " 
 2920                   "order_num [" << order_num << 
"] has negative " 
 2921                   "RTT [" << round_trip_time << 
"]; assuming zero.  " 
 2922                   "Sent at [" << (*sent_when)->m_sent_time << 
"]; " 
 2923                   "received at [" << time_now << 
"]; " 
 2924                   "receiver-reported ACK delay [" << ack_delay << 
"].");
 
 2925    round_trip_time = Fine_duration::zero();
 
 2927  FLOW_LOG_TRACE(
"Acknowledged packet [" << ack->m_seq_num << 
", ...) " 
 2928                 "order_num [" << order_num << 
"] " 
 2929                 "has RTT [" << round<milliseconds>(round_trip_time) << 
"] " 
 2930                 "(ACK delay [" << round<milliseconds>(ack_delay) << 
"]).");
 
 2932  return round_trip_time;
 
 2937                                           const boost::unordered_set<Peer_socket::order_num_t>& flying_now_acked_pkts)
 
 2939  using std::priority_queue;
 
 2955  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 3026  priority_queue<Peer_socket::order_num_t>
 
 3027    high_ack_count_q(flying_now_acked_pkts.begin(), flying_now_acked_pkts.end());
 
 3031  ack_count_t ack_increment_after_me = 0;
 
 3035  for (last_dropped_pkt_it = snd_flying_pkts_by_when.newest();
 
 3036       last_dropped_pkt_it != snd_flying_pkts_by_when.past_oldest();
 
 3037       ++last_dropped_pkt_it) 
 
 3053    while ((!high_ack_count_q.empty()) &&
 
 3055           (high_ack_count_q.top() > cur_pkt_sent_when.
m_order_num))
 
 3058      ++ack_increment_after_me; 
 
 3061      high_ack_count_q.pop(); 
 
 3068    if (cur_sent_pkt.
m_acks_after_me > S_MAX_LATER_ACKS_BEFORE_CONSIDERING_DROPPED)
 
 3082        get_seq_num_range(last_dropped_pkt_it, &cur_pkt_seq_num, &cur_pkt_seq_num_end);
 
 3085          (
"Unacknowledged packet [" << cur_pkt_seq_num << 
", " << cur_pkt_seq_num_end << 
") " 
 3086           "order_num [" << cur_pkt_sent_when.
m_order_num << 
"] has " 
 3088           "for later packets; considering it and " 
 3089           "all unacknowledged packets sent earlier as Dropped.");
 
 3099  return last_dropped_pkt_it;
 
 3104                             size_t* cong_ctl_dropped_pkts, 
size_t* cong_ctl_dropped_bytes,
 
 3105                             size_t* dropped_pkts, 
size_t* dropped_bytes,
 
 3106                             std::vector<Peer_socket::order_num_t>* pkts_marked_to_drop)
 
 3126  const bool rexmit_on = sock->rexmit_on();
 
 3127  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 3128  auto& snd_stats = sock->m_snd_stats;
 
 3159  *dropped_pkts = snd_flying_pkts_by_when.size(); 
 
 3160  *dropped_bytes = sock->m_snd_flying_bytes; 
 
 3162  *cong_ctl_dropped_bytes = 0;
 
 3163  *cong_ctl_dropped_pkts = 0;
 
 3164  bool loss_event_finished = 
false;
 
 3179  auto& snd_rexmit_q = sock->m_snd_rexmit_q;
 
 3180  decltype(sock->m_snd_rexmit_q)::iterator snd_rexmit_q_fulcrum_it = snd_rexmit_q.end();
 
 3183  assert(pkts_marked_to_drop->empty());
 
 3185  auto pkt_it = last_dropped_pkt_it;
 
 3186  while (pkt_it != snd_flying_pkts_by_when.past_oldest())
 
 3189    auto next_pkt_it = boost::next(pkt_it);
 
 3197    if (!loss_event_finished)
 
 3202          && (sent_when.
m_sent_time < sock->m_snd_last_loss_event_when))
 
 3206        loss_event_finished = 
true;
 
 3211        *cong_ctl_dropped_bytes += sent_pkt->m_size;
 
 3212        ++(*cong_ctl_dropped_pkts);
 
 3221      if (!ok_to_rexmit_or_close(sock, pkt_it, 
true)) 
 
 3235      snd_rexmit_q_fulcrum_it = snd_rexmit_q.insert(snd_rexmit_q_fulcrum_it, sent_pkt);
 
 3236      ++sock->m_snd_rexmit_q_size;
 
 3246       "Scoreboard must not get otherwise changed when a packet is erased.");
 
 3247    pkts_marked_to_drop->push_back(sent_when.
m_order_num);
 
 3248    snd_flying_pkts_erase_one(sock, pkt_it);
 
 3250    pkt_it = next_pkt_it;
 
 3254  *dropped_pkts -= snd_flying_pkts_by_when.size(); 
 
 3255  *dropped_bytes -= sock->m_snd_flying_bytes; 
 
 3257  if (*cong_ctl_dropped_pkts != 0)
 
 3260    snd_stats.loss_event();
 
 3268  using boost::algorithm::join;
 
 3269  using boost::chrono::symbol_format;
 
 3272  using std::transform;
 
 3281  using Acks = vector<Ack::Ptr>;
 
 3282  const Acks& acked_packets = sock->m_rcv_acked_packets;
 
 3288    vector<string> ack_strs(acked_packets.size());
 
 3289    transform(acked_packets.begin(), acked_packets.end(), ack_strs.begin(),
 
 3290              [](Ack::Const_ptr ack) -> 
string 
 3292      return util::ostream_op_string(
'[', ack->m_seq_num, 
", ", int(ack->m_rexmit_id), 
", ",
 
 3296    const string ack_str = join(ack_strs, 
" ");
 
 3299                                   "Accumulated [ACK] packets with " 
 3300                                   "acknowledgments [seq_num, rexmit_id, delay]: " 
 3301                                   "[" << ack_str << 
"].");
 
 3305    FLOW_LOG_TRACE(
"NetFlow worker thread working on [" << sock << 
"].  " 
 3306                   "Accumulated [ACK] packets with " 
 3307                   "[" << acked_packets.size() << 
"] individual acknowledgments.");
 
 3312    log_snd_window(sock);
 
 3328  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 3329  auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
 
 3332  assert(!snd_flying_pkts_by_when.empty());
 
 3339  log_snd_window(sock);
 
 3341  const bool rexmit_on = sock->rexmit_on();
 
 3343  const bool could_send_before_drops = can_send(sock);
 
 3345  const bool had_rexmit_data_before_drops = !sock->m_snd_rexmit_q.empty();
 
 3350  size_t cong_ctl_dropped_bytes = 0;
 
 3351  size_t cong_ctl_dropped_pkts = 0;
 
 3353  if (drop_all_packets)
 
 3355    cong_ctl_dropped_bytes = sock->m_snd_flying_bytes;
 
 3356    cong_ctl_dropped_pkts = snd_flying_pkts_by_when.size();
 
 3363           pkt_it != snd_flying_pkts_by_when.past_newest();
 
 3367        if (!ok_to_rexmit_or_close(sock, prior(pkt_it.base()), 
false)) 
 
 3378        sock->m_snd_rexmit_q.push_back(pkt_it->second); 
 
 3380      sock->m_snd_rexmit_q_size += cong_ctl_dropped_pkts;
 
 3387    snd_flying_pkts_updated(sock, snd_flying_pkts_by_when.newest(), snd_flying_pkts_by_when.past_oldest(), 
false);
 
 3388    snd_flying_pkts_by_when.clear();
 
 3389    snd_flying_pkts_by_seq.clear();
 
 3391    packet_marked_to_drop_or_drop_all = 0; 
 
 3399    cong_ctl_dropped_bytes = oldest_pkt->m_size;
 
 3400    cong_ctl_dropped_pkts = 1;
 
 3405      if (!ok_to_rexmit_or_close(sock, oldest_pkt_it, 
false)) 
 
 3412      sock->m_snd_rexmit_q.push_back(oldest_pkt); 
 
 3413      ++sock->m_snd_rexmit_q_size;
 
 3418    packet_marked_to_drop_or_drop_all = oldest_pkt->m_sent_when.back().m_order_num;
 
 3421    snd_flying_pkts_erase_one(sock, oldest_pkt_it);
 
 3446  FLOW_LOG_INFO(
"cong_ctl [" << sock << 
"] update: Drop Timeout event: " 
 3447                "Dropped [" << cong_ctl_dropped_bytes << 
"] bytes = [" << cong_ctl_dropped_pkts << 
"] packets.");
 
 3450  sock->m_snd_cong_ctl->on_drop_timeout(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
 
 3451  sock->m_snd_last_loss_event_when = Fine_clock::now();
 
 3454  sock->m_snd_stats.drop_timeout();
 
 3455  sock->m_snd_stats.dropped_data(cong_ctl_dropped_bytes, cong_ctl_dropped_pkts);
 
 3458  log_snd_window(sock);
 
 3463  drop_timer->start_contemporaneous_events();
 
 3471  if (packet_marked_to_drop_or_drop_all == 0)
 
 3474    drop_timer->on_no_packets_in_flight_any_longer();
 
 3478    drop_timer->on_packet_no_longer_in_flight(packet_marked_to_drop_or_drop_all);
 
 3483  drop_timer->end_contemporaneous_events();
 
 3490  if ((!could_send_before_drops) || (
rexmit_on && (!had_rexmit_data_before_drops)))
 
 3492    send_worker(sock, 
false);
 
 3502  using boost::ratio_subtract;
 
 3503  using boost::ratio_string;
 
 3504  using boost::chrono::round;
 
 3505  using boost::chrono::milliseconds;
 
 3506  using boost::chrono::microseconds;
 
 3507  using boost::chrono::seconds;
 
 3536  if (srtt == Fine_duration::zero())
 
 3544                   "srtt = [" << round<milliseconds>(srtt) << 
" = " << srtt << 
"]; " 
 3545                   "rtt_var = [" << round<milliseconds>(rtt_var) << 
" = " << rtt_var << 
"]; " 
 3546                   "rtt = [" << rtt << 
"].");
 
 3564    using Alpha = ratio<1, 8>; 
 
 3565    using One_minus_alpha = ratio_subtract<ratio<1>, Alpha>;
 
 3566    using Beta = ratio<1, 4>; 
 
 3567    using One_minus_beta = ratio_subtract<ratio<1>, Beta>;
 
 3572    if (abs_srtt_minus_rtt.count() < 0)
 
 3574      abs_srtt_minus_rtt = -abs_srtt_minus_rtt;
 
 3579      = rtt_var * One_minus_beta::num / One_minus_beta::den
 
 3580        + abs_srtt_minus_rtt * Beta::num / Beta::den;
 
 3582      = srtt * One_minus_alpha::num / One_minus_alpha::den
 
 3583        + rtt * Alpha::num / Alpha::den;
 
 3587                   "srtt = [" << round<milliseconds>(srtt) << 
" = " << srtt << 
"]; " 
 3588                   "rtt_var = [" << round<milliseconds>(rtt_var) << 
" = " << rtt_var << 
"]; " 
 3589                   "rtt = [" << rtt << 
"]; " 
 3590                   "prev_srtt = [" << prev_srtt << 
"]; " 
 3591                   "prev_rtt_var = [" << prev_rtt_var << 
"]; " 
 3592                   "alpha = " << (ratio_string<Alpha, char>::prefix()) << 
"; " 
 3593                   "(1 - alpha) = " << (ratio_string<One_minus_alpha, char>::prefix()) << 
"; " 
 3594                   "beta = " << (ratio_string<Beta, char>::prefix()) << 
"; " 
 3595                   "(1 - beta) = " << (ratio_string<One_minus_beta, char>::prefix()) << 
"; " 
 3596                   "|srtt - rtt| = [" << abs_srtt_minus_rtt << 
"].");
 
 3613  const Fine_duration clock_resolution_at_least = microseconds(500);
 
 3615  const Fine_duration ceiling = sock->opt(sock->m_opts.m_dyn_drop_timeout_ceiling);
 
 3616  const unsigned int k = 4;
 
 3620  const Fine_duration srtt_plus_var_term = srtt + max(clock_resolution_at_least, rtt_var_k);
 
 3621  dto = max(srtt_plus_var_term, floor);
 
 3622  dto = min(dto, ceiling);
 
 3626                 "dto = [" << round<milliseconds>(dto) << 
" = " << dto << 
"]; " 
 3627                 "rtt_var * k = [" << rtt_var_k << 
"]; " 
 3628                 "srtt + max(G, rtt_var * k) = [" << srtt_plus_var_term << 
"]; " 
 3629                 "k = [" << k << 
"]; " 
 3630                 "floor = [" << floor << 
"]; ceiling = [" << ceiling << 
"]; " 
 3631                 "clock_resolution = [" << clock_resolution_at_least << 
"]; " 
 3632                 "prev_dto = [" << prev_dto << 
"].");
 
 3640  using boost::algorithm::join;
 
 3648  const auto& snd_flying_pkts_by_seq = sock->m_snd_flying_pkts_by_seq_num;
 
 3649  const auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 3650  const size_t num_flying_pkts = snd_flying_pkts_by_seq.size();
 
 3654  if (snd_flying_pkts_by_seq.empty())
 
 3658                           "Send window state for [" << sock << 
"]: cong_wnd " 
 3659                             "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) << 
"]; " 
 3660                             "sent+acked/dropped " 
 3661                             "[" << sock->m_snd_init_seq_num << 
", " << sock->m_snd_next_seq_num << 
") " 
 3662                             "unsent [" << sock->m_snd_next_seq_num << 
", ...).");
 
 3673      (
"Send window state for [" << sock << 
"]: cong_wnd " 
 3674       "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) << 
"]; " 
 3675       "sent+acked/dropped [" << sock->m_snd_init_seq_num << 
", " << snd_flying_pkts_by_seq.begin()->first << 
") " 
 3676       "in-flight [" << sock->m_snd_flying_bytes << 
"] bytes: " << num_flying_pkts << 
":{...} " 
 3677       "unsent [" << sock->m_snd_next_seq_num << 
", ...).");
 
 3684  const bool rexmit_on = sock->rexmit_on();
 
 3686  vector<string> pkt_strs;
 
 3687  pkt_strs.reserve(num_flying_pkts);
 
 3689       pkt_it_it != snd_flying_pkts_by_seq.end();
 
 3693    get_seq_num_range(pkt_it_it->second, &start, &end);
 
 3697    String_ostream pkt_str_os;
 
 3698    pkt_str_os.os() << 
'[' << start;
 
 3701      pkt_str_os.os() << 
'[' << int(sent_pkt->m_packet->m_rexmit_id) << 
'/' << sent_pkt->m_sent_when.back().m_order_num
 
 3706      pkt_str_os.os() << 
", ";
 
 3708    pkt_str_os.os() << end << 
")<" << sent_pkt->m_acks_after_me << 
"acks" << flush;
 
 3710    pkt_strs.push_back(pkt_str_os.str());
 
 3715     "Send window state for [" << sock << 
"]: cong_wnd " 
 3716       "[" << sock->bytes_blocks_str(sock->m_snd_cong_ctl->congestion_window_bytes()) << 
"]; " 
 3717       "sent+acked/dropped [" << sock->m_snd_init_seq_num << 
", " << snd_flying_pkts_by_seq.begin()->first << 
") " 
 3719       "[" << sock->m_snd_flying_bytes << 
"] bytes: " << num_flying_pkts << 
":{" << join(pkt_strs, 
" ") <<
 
 3720       "} unsent [" << sock->m_snd_next_seq_num << 
", ...).");
 
 3730  vector<string> pkt_strs_time;
 
 3731  pkt_strs_time.reserve(num_flying_pkts);
 
 3734       pkt_it != snd_flying_pkts_by_when.const_past_newest();
 
 3739    get_seq_num_range(prior(pkt_it.base()), &start, &end);
 
 3745                               start, 
'[', 
int(sent_pkt->m_packet->m_rexmit_id), 
'/',
 
 3746                               sent_pkt->m_sent_when.back().m_order_num, 
"], ", end, 
")<",
 
 3747                               sent_pkt->m_acks_after_me, 
"acks");
 
 3748    pkt_strs_time.push_back(pkt_str);
 
 3752  if (pkt_strs_time != pkt_strs)
 
 3756       "Sorted by time sent: {" << join(pkt_strs_time, 
" ") << 
"}.");
 
 3765  if (flying_packets.empty())
 
 3772  const Peer_socket::Sent_pkt_by_seq_num_map::value_type& highest_val = *(prior(flying_packets.end()));
 
 3776  advance_seq_num(&seq_num, highest_val.second->second->m_size);
 
 3791    get_seq_num_range(pkt_it, &seq_num, &seq_num_end);
 
 3793    if (sock->rexmit_on())
 
 3796        (
"On [" << sock << 
"] erasing packet [" << seq_num << 
", " << seq_num_end << 
") " 
 3797         "order_num [" << order_num << 
"] rexmit_id [" << 
int(sent_pkt.
m_packet->m_rexmit_id) << 
"] from " 
 3798         "snd_flying_pkts* and friends.");
 
 3803        (
"On [" << sock << 
"] erasing packet [" << seq_num << 
", " << seq_num_end << 
") " 
 3804         "order_num [" << order_num << 
"] from snd_flying_pkts* and friends.");
 
 3809  snd_flying_pkts_updated(sock, pkt_it, boost::next(pkt_it), 
false);
 
 3812  sock->m_snd_flying_pkts_by_seq_num.erase(pkt_it->first);
 
 3813  sock->m_snd_flying_pkts_by_sent_when.erase(pkt_it);
 
 3823  using std::make_pair;
 
 3827  auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 3830  const auto insert_result =
 
 3832    snd_flying_pkts_by_when.insert(make_pair(seq_num, sent_pkt));
 
 3836  assert(insert_result.second); 
 
 3837  assert(insert_result.first == pkt_it); 
 
 3839  snd_flying_pkts_updated(sock, pkt_it, boost::next(pkt_it), 
true); 
 
 3843  const auto insert_result_by_seq =
 
 3845    sock->m_snd_flying_pkts_by_seq_num.insert(make_pair(seq_num, pkt_it));
 
 3848  assert(insert_result_by_seq.second);
 
 3869  get_seq_num_range(pkt_it, 0, &seq_num_end);
 
 3870  if (sock->rexmit_on())
 
 3873      (
"On [" << sock << 
"] pushing packet [" << seq_num << 
", " << seq_num_end << 
") " 
 3874       "rexmit_id [" << 
int(sent_pkt->m_packet->m_rexmit_id) << 
"] onto snd_flying_pkts and friends.");
 
 3879      (
"On [" << sock << 
"] pushing packet [" << seq_num << 
", " << seq_num_end << 
") " 
 3880       "onto snd_flying_pkts and friends.");
 
 3891  if (pkt_begin == pkt_end)
 
 3897  const auto& snd_flying_pkts_by_when = sock->m_snd_flying_pkts_by_sent_when;
 
 3898  size_t& snd_flying_bytes = sock->m_snd_flying_bytes;
 
 3902      && (pkt_begin == snd_flying_pkts_by_when.const_newest())
 
 3903      && (pkt_end == snd_flying_pkts_by_when.const_past_oldest()))
 
 3905    snd_flying_bytes = 0;
 
 3909    size_t delta_bytes = 0;
 
 3910    for ( ; pkt_begin != pkt_end; ++pkt_begin)
 
 3912      delta_bytes += pkt_begin->second->m_size;
 
 3914    added ? (snd_flying_bytes += delta_bytes) : (snd_flying_bytes -= delta_bytes);
 
 3918                 "In-flight [" << sock->bytes_blocks_str(snd_flying_bytes) << 
"].");
 
 3923                                 bool defer_delta_check)
 
 3928  get_seq_num_range(pkt_it, &seq_num, &seq_num_end);
 
 3930  const unsigned int rexmit_id = pkt.
m_packet->m_rexmit_id;
 
 3931  FLOW_LOG_TRACE(
"On [" << sock << 
"] attempting to queue for retransmission " 
 3932                 "[" << seq_num << 
", " << seq_num_end << 
"] which has been " 
 3933                 "retransmitted [" << rexmit_id << 
"] times so far.");
 
 3934  if (rexmit_id == sock->opt(sock->m_opts.m_st_max_rexmissions_per_packet))
 
 3936    rst_and_close_connection_immediately(socket_id(sock), sock,
 
 3947  return connect_with_metadata(to, boost::asio::buffer(&S_DEFAULT_CONN_METADATA, 
sizeof(S_DEFAULT_CONN_METADATA)),
 
 3952                                             const boost::asio::const_buffer& serialized_metadata,
 
 3956  namespace bind_ns = util::bind_ns;
 
 3958                                     bind_ns::cref(to), bind_ns::cref(serialized_metadata), _1, sock_opts);
 
 3961  namespace bind_ns = util::bind_ns;
 
 3964  using bind_ns::bind;
 
 3991                     [&]() { connect_worker(to, serialized_metadata, sock_opts, &sock); });
 
 3995  if (sock->m_disconnect_cause)
 
 3997    *err_code = sock->m_disconnect_cause;
 
 4009  using boost::asio::buffer;
 
 4010  using boost::asio::ip::address;
 
 4018  auto& sock = *sock_ptr;
 
 4025    const bool opts_ok = sock_validate_options(*sock_opts, 0, &err_code);
 
 4028    sock.reset(sock_create(*sock_opts));
 
 4033      sock->m_disconnect_cause = err_code;
 
 4050      sock_non_ptr = sock_create(
m_opts.m_dyn_sock_opts);
 
 4052    sock.reset(sock_non_ptr);
 
 4057  sock->m_active_connect = 
true;
 
 4058  sock->m_node = 
this;
 
 4060  sock->m_remote_endpoint = to;
 
 4062  sock->m_serialized_metadata.assign_copy(serialized_metadata);
 
 4071  sock->m_snd_cong_ctl.reset
 
 4080  bool ip_addr_any_error = 
false;
 
 4084    if (addr.to_v4() == util::Ip_address_v4::any())
 
 4086      ip_addr_any_error = 
true;
 
 4089  else if (addr.is_v6())
 
 4091    if (addr.to_v6() == util::Ip_address_v6::any())
 
 4093      ip_addr_any_error = 
true;
 
 4097  if (ip_addr_any_error)
 
 4100    Error_code* err_code = &sock->m_disconnect_cause;
 
 4108  sock->m_local_port = m_ports.reserve_ephemeral_port(&sock->m_disconnect_cause);
 
 4117  FLOW_LOG_INFO(
"NetFlow worker thread starting active-connect of [" << sock << 
"].");
 
 4126    FLOW_LOG_WARNING(
"Cannot add [" << sock << 
"], because such a connection already exists.  " 
 4127                     "This is an ephemeral port collision and " 
 4128                     "constitutes either a bug or an extremely unlikely condition.");
 
 4131    Error_code* err_code = &sock->m_disconnect_cause;
 
 4136    m_ports.return_port(sock->m_local_port, &return_err_code);
 
 4137    assert(!return_err_code);
 
 4148  setup_connection_timers(socket_id, sock, 
true);
 
 4153  init_seq_num = m_seq_num_generator.generate_init_seq_num();
 
 4157  init_seq_num.
set_metadata(
'L', init_seq_num + 1, sock->max_block_size());
 
 4159  sock->m_snd_next_seq_num = init_seq_num + 1;
 
 4162  auto syn = create_syn(sock);
 
 4165  if (!async_sock_low_lvl_packet_send_paced(sock,
 
 4167                                            &sock->m_disconnect_cause))
 
 4173    m_ports.return_port(sock->m_local_port, &return_err_code);
 
 4174    assert(!return_err_code);
 
 4177    cancel_timers(sock);
 
 4185  m_socks[socket_id] = sock;
 
 4194  return sync_connect_with_metadata(to, Fine_duration::max(),
 
 4195                                    boost::asio::buffer(&S_DEFAULT_CONN_METADATA, 
sizeof(S_DEFAULT_CONN_METADATA)),
 
 4196                                    err_code, sock_opts);
 
 4200                                                  const boost::asio::const_buffer& serialized_metadata,
 
 4203  return sync_connect_with_metadata(to, Fine_duration::max(), serialized_metadata, err_code, opts);
 
 4207                                         const boost::asio::const_buffer& serialized_metadata,
 
 4210  namespace bind_ns = util::bind_ns;
 
 4212                                     bind_ns::cref(to), bind_ns::cref(max_wait), bind_ns::cref(serialized_metadata),
 
 4216  using util::bind_ns::bind;
 
 4243    event_set->close(&dummy_prevents_throw);
 
 4246  const auto sock = connect_with_metadata(to, serialized_metadata, err_code, sock_opts);
 
 4260                                                          &dummy_prevents_throw);
 
 4264  result = event_set->sync_wait(max_wait, err_code);
 
 4279    sock->close_abruptly(&dummy_prevents_throw); 
 
 4285  const bool ready = event_set->events_detected(err_code);
 
 4308      *err_code = sock->m_disconnect_cause; 
 
 4318  sock->close_abruptly(&dummy_prevents_throw);
 
 4327  using boost::chrono::microseconds;
 
 4328  using boost::chrono::duration_cast;
 
 4329  using boost::weak_ptr;
 
 4333  Fine_duration rexmit_from_now = sock->opt(sock->m_opts.m_st_connect_retransmit_period);
 
 4340    ++sock->m_init_rexmit_count;
 
 4350  sock->m_init_rexmit_scheduled_task
 
 4353                              sock_observer = weak_ptr<Peer_socket>(sock)]
 
 4356    auto sock = sock_observer.lock();
 
 4359      handle_connection_rexmit_timer_event(socket_id, sock);
 
 4367    sock->m_connection_timeout_scheduled_task
 
 4369                               sock->opt(sock->m_opts.m_st_connect_retransmit_timeout),
 
 4370                               true, &m_task_engine,
 
 4372                                sock_observer = weak_ptr<Peer_socket>(sock)]
 
 4377      auto sock = sock_observer.lock();
 
 4384      FLOW_LOG_INFO(
"Connection handshake timeout timer [" << sock << 
"] has been triggered; was on " 
 4385                    "attempt [" << (sock->m_init_rexmit_count + 1) << 
"].");
 
 4412  FLOW_LOG_INFO(
"Connection handshake retransmit timer [" << sock << 
"] triggered; was on " 
 4413                "attempt [" << (sock->m_init_rexmit_count + 1) << 
"].");
 
 4424  if (sock->m_active_connect)
 
 4468  sock->m_rcv_delayed_ack_timer.cancel();
 
 4469  sock->m_snd_pacing_data.m_slice_timer.cancel();
 
 4471  if (sock->m_init_rexmit_scheduled_task)
 
 4476  if (sock->m_connection_timeout_scheduled_task)
 
 4481  if (sock->m_rcv_in_rcv_wnd_recovery)
 
 4484    sock->m_rcv_in_rcv_wnd_recovery = 
false;
 
 4487  if (sock->m_snd_drop_timer)
 
 4490    sock->m_snd_drop_timer->done();
 
 4494    sock->m_snd_drop_timer.reset();
 
 4500  sock->m_snd_drop_timeout = sock->opt(sock->m_opts.m_st_init_drop_timeout);
 
 4507  const auto on_timer = [
this, 
socket_id, sock](
bool drop_all_packets)
 
 4521                  const Function<
size_t (
size_t max_data_size)>& snd_buf_feed_func,
 
 4524  using boost::asio::post;
 
 4544  if (sock->m_disconnect_cause) 
 
 4578  const size_t sent = snd_buf_feed_func(sock->max_block_size_multiple(sock->m_opts.m_st_snd_buf_max_size));
 
 4581  sock->m_snd_stats.buffer_fed(sock->m_snd_buf.data_size());
 
 4702  if ((!was_deqable) && (sent != 0))
 
 4715  using boost::any_cast;
 
 4763  switch (sock->m_int_state)
 
 4778                  "in state [" << sock->m_int_state << 
"] " 
 4779                  "closed before asynchronous send_worker() could proceed.");
 
 4785                     "in state [" << sock->m_int_state << 
"] " 
 4786                     "somehow had send() called on it.");
 
 4794  using boost::asio::buffer;
 
 4797  using boost::ratio_string;
 
 4798  using boost::chrono::milliseconds;
 
 4799  using boost::chrono::round;
 
 4800  using boost::shared_ptr;
 
 4842  using Idle_timeout_dto_factor = ratio<110, 100>;
 
 4844    = sock->m_snd_drop_timeout * Idle_timeout_dto_factor::num / Idle_timeout_dto_factor::den;
 
 4845  const Fine_duration since_last_send = Fine_clock::now() - sock->m_snd_last_data_sent_when;
 
 4847  if ((sock->m_snd_last_data_sent_when != 
Fine_time_pt()) && (since_last_send > idle_timeout))
 
 4850    FLOW_LOG_INFO(
"Idle timeout triggered for [" << sock << 
"]; " 
 4851                  "last activity [" << round<milliseconds>(since_last_send) << 
"] ago " 
 4852                  "exceeds idle timeout [" << round<milliseconds>(idle_timeout) << 
"] " 
 4853                  "= " << (ratio_string<Idle_timeout_dto_factor, char>::prefix()) << 
" x " 
 4854                  "[" << round<milliseconds>(sock->m_snd_drop_timeout) << 
"].");
 
 4855    sock->m_snd_cong_ctl->on_idle_timeout();
 
 4856    sock->m_snd_stats.idle_timeout();
 
 4865                   "Initial check: can_send() is false.");
 
 4876  const bool rexmit_on = sock->rexmit_on();
 
 4885                     "Initial check: can_send() is true, but no data to send.");
 
 4892    list<Peer_socket::Sent_packet::Ptr>& rexmit_q = sock->m_snd_rexmit_q;
 
 4893    size_t& rexmit_q_size = sock->m_snd_rexmit_q_size;
 
 4900                   "Initial check: Will send from rexmit queue of size [" << rexmit_q_size << 
"] and/or " 
 4901                   "Send buffer with total size [" << snd_buf.
data_size() << 
"].");
 
 4908      shared_ptr<Data_packet> data;
 
 4910      bool rexmit = 
false;
 
 4918      if (rexmit_q.empty())
 
 4923        data = Low_lvl_packet::create_uninit_packet<Data_packet>(
get_logger());
 
 4924        data->m_rexmit_id = 0; 
 
 4935        assert(!data->m_data.empty());
 
 4938        data->m_seq_num = snd_next_seq_num;
 
 4962        sent_pkt = rexmit_q.front();
 
 4965        rexmit_q.pop_front();
 
 4968        data = sent_pkt->m_packet;
 
 4971        ++data->m_rexmit_id;
 
 4974        sent_pkt->m_sent_when.push_back(sent_when);
 
 4977        sent_pkt->m_acks_after_me = 0;
 
 5006      sock->m_snd_stats.data_sent(data->m_data.size(), rexmit);
 
 5012                   "can_send() == [" << 
can_send(sock) << 
"]; " 
 5013                   "snd_deqable() == [" << 
snd_deqable(sock) << 
"].");
 
 5066  const size_t pipe_taken = sock->m_snd_flying_bytes;
 
 5067  const size_t cong_wnd = sock->m_snd_cong_ctl->congestion_window_bytes();
 
 5068  const size_t& rcv_wnd = sock->m_snd_remote_rcv_wnd; 
 
 5070  const size_t pipe_total = min(cong_wnd, rcv_wnd);
 
 5073    = (pipe_taken < pipe_total) && ((pipe_total - pipe_taken) >= sock->max_block_size());
 
 5075  FLOW_LOG_TRACE(
"cong_ctl [" << sock << 
"] info: can_send = [" << can << 
"]; " 
 5076                 "pipe_taken = [" << sock->bytes_blocks_str(pipe_taken) << 
"]; " 
 5077                 "cong_wnd = [" << sock->bytes_blocks_str(cong_wnd) << 
"]; " 
 5078                 "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) << 
"].");
 
 5084                     const Function<
size_t ()>& rcv_buf_consume_func,
 
 5087  using boost::asio::post;
 
 5115  const bool no_bytes_available = sock->m_rcv_buf.empty();
 
 5116  const size_t bytes_consumed = rcv_buf_consume_func();
 
 5118  if (bytes_consumed != 0)
 
 5126                   "has successfully returned [" << bytes_consumed << 
"] bytes.");
 
 5135    if (sock->m_rcv_buf.empty()
 
 5151    return bytes_consumed;
 
 5160                   "has successfully returned no bytes because still not fully connected.");
 
 5182  if (!no_bytes_available)
 
 5186                   "has data to return, but the provided buffer size is too small.");
 
 5194                 "returning no data because Receive buffer empty.");
 
 5235  using boost::any_cast;
 
 5325    FLOW_LOG_INFO(
'[' << sock << 
"] Receive buffer space freed, " 
 5326                  "but state is now [" << sock->m_int_state << 
"]; ignoring.");
 
 5331  if (sock->m_rcv_in_rcv_wnd_recovery)
 
 5337    FLOW_LOG_TRACE(
'[' << sock << 
"] Receive buffer space freed, but " 
 5338                   "we are already in rcv_wnd recovery mode.  Nothing to do.");
 
 5347  const size_t& last_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd;
 
 5349  if (rcv_wnd <= last_rcv_wnd)
 
 5353    FLOW_LOG_TRACE(
'[' << sock << 
"] Receive buffer space freed, but " 
 5354                   "free space [" << sock->bytes_blocks_str(rcv_wnd) << 
"] <= prev " 
 5355                   "free space [" << sock->bytes_blocks_str(last_rcv_wnd) << 
"].  Nothing to do.");
 
 5360  const size_t diff = rcv_wnd - last_rcv_wnd;
 
 5361  const unsigned int pct = sock->opt(sock->m_opts.m_st_rcv_buf_max_size_to_advertise_percent);
 
 5362  const size_t max_rcv_buf_size = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size);
 
 5363  const size_t min_inc = max_rcv_buf_size * pct / 100;
 
 5369                   "freed is [" << sock->bytes_blocks_str(diff) << 
"] since last advertisement; " 
 5370                   "< threshold [" << pct << 
"%] x " 
 5371                   "[" << sock->bytes_blocks_str(max_rcv_buf_size) << 
"] = " 
 5372                   "[" << sock->bytes_blocks_str(min_inc) << 
"].  Not advertising rcv_wnd yet.");
 
 5379                "freed is [" << sock->bytes_blocks_str(diff) << 
"] since last advertisement; " 
 5380                "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) << 
"]; " 
 5381                ">= threshold [" << pct << 
"%] x " 
 5382                "[" << sock->bytes_blocks_str(max_rcv_buf_size) << 
"] = " 
 5383                "[" << sock->bytes_blocks_str(min_inc) << 
"].  Sending unsolicited rcv_wnd-advertising ACK " 
 5384                "and entering rcv_wnd recovery.");
 
 5387  sock->m_rcv_in_rcv_wnd_recovery = 
true;
 
 5389  sock->m_rcv_wnd_recovery_start_time = Fine_clock::now();
 
 5392  sock->m_rcv_stats.rcv_wnd_recovery_event_start();
 
 5399  using boost::chrono::milliseconds;
 
 5400  using boost::chrono::round;
 
 5401  using boost::weak_ptr;
 
 5408  auto ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
 
 5409  ack->m_rcv_wnd = rcv_wnd;
 
 5411  sock->m_rcv_last_sent_rcv_wnd = rcv_wnd;
 
 5423  sock->m_rcv_stats.sent_low_lvl_ack_packet(
true);
 
 5427  const Fine_duration fire_when_from_now = sock->opt(sock->m_opts.m_dyn_rcv_wnd_recovery_timer_period);
 
 5430                "[" << round<milliseconds>(fire_when_from_now) << 
"] from now.");
 
 5439  sock->m_rcv_wnd_recovery_scheduled_task
 
 5441                             [
this, sock_observer = weak_ptr<Peer_socket>(sock)](
bool)
 
 5445    auto sock = sock_observer.lock();
 
 5452    const Fine_duration since_recovery_started = Fine_clock::now() - sock->m_rcv_wnd_recovery_start_time;
 
 5453    if (since_recovery_started > sock->opt(sock->m_opts.m_dyn_rcv_wnd_recovery_max_period))
 
 5458      FLOW_LOG_INFO(
'[' << sock << 
"]: still no new DATA arrived since last rcv_wnd advertisement; " 
 5459                    "Time since entering recovery [" << round<milliseconds>(since_recovery_started) << 
"] expired.  " 
 5460                    "Ending rcv_wnd recovery.");
 
 5461      sock->m_rcv_in_rcv_wnd_recovery = 
false;
 
 5464      sock->m_rcv_stats.rcv_wnd_recovery_event_finish(
false);
 
 5476    FLOW_LOG_INFO(
'[' << sock << 
"]: still no new DATA arrived since last rcv_wnd advertisement; " 
 5477                  "rcv_wnd = [" << sock->bytes_blocks_str(rcv_wnd) << 
"]; " 
 5478                  "time since entering recovery [" << round<milliseconds>(since_recovery_started) << 
"].  " 
 5479                  "Sending unsolicited rcv_wnd-advertising ACK and continuing rcv_wnd recovery.");
 
 5487  using boost::chrono::milliseconds;
 
 5488  using boost::chrono::round;
 
 5495  if (!sock->m_rcv_in_rcv_wnd_recovery)
 
 5503  FLOW_LOG_INFO(
'[' << sock << 
"]: Canceling rcv_wnd recovery; " 
 5504                "Time since entering recovery " 
 5505                "[" << round<milliseconds>(Fine_clock::now() - sock->m_rcv_wnd_recovery_start_time) << 
"].");
 
 5507  sock->m_rcv_in_rcv_wnd_recovery = 
false;
 
 5509  const bool canceled =
 
 5515  sock->m_rcv_stats.rcv_wnd_recovery_event_finish(
true);
 
 5520  using std::numeric_limits;
 
 5524  if (!sock->opt(sock->m_opts.m_st_rcv_flow_control_on))
 
 5529    return numeric_limits<size_t>::max();
 
 5534  size_t rcv_buf_size;
 
 5537    rcv_buf_size = sock->m_rcv_buf.data_size();
 
 5541  if (sock->rexmit_on())
 
 5543    rcv_buf_size += sock->m_rcv_reassembly_q_data_size; 
 
 5546  const size_t max_rcv_buf_size = sock->max_block_size_multiple(sock->m_opts.m_st_rcv_buf_max_size);
 
 5548  return (max_rcv_buf_size > rcv_buf_size) ? (max_rcv_buf_size - rcv_buf_size) : 0;
 
 5570                  "was completely closed before asynchronous " 
 5571                  "receive_emptied_rcv_buf_while_disconnecting() could proceed.");
 
 5589                   "is gracefully closing, and Receive buffer is empty, but graceful close itself not yet finished.");
 
 5595  if (!sock->m_rcv_buf.empty())
 
 5600                   "is gracefully closing, but Receive buffer has data again.");
 
 5607                 "is gracefully closing, and Receive buffer is now empty.  Ready to permanently close.");
 
 5617  using boost::adopt_lock;
 
 5661      *err_code = sock->m_disconnect_cause;
 
 5684                                        const Error_code& err_code, 
bool defer_delta_check)
 
 5686  using boost::lexical_cast;
 
 5697    FLOW_LOG_INFO(
"Closing and destroying [" << sock << 
"] abruptly.");
 
 5702    FLOW_LOG_INFO(
"Closing and destroying [" << sock << 
"] after graceful close.");
 
 5722    sock->m_info_on_close.m_disconnect_cause = err_code;
 
 5747  const auto erased = 1 ==
 
 5753  if (!sock->m_active_connect)
 
 5761    Port_to_server_map::const_iterator port_to_server_it = 
m_servs.find(sock->m_local_port);
 
 5762    if (port_to_server_it != 
m_servs.end()) 
 
 5777  if (sock->m_active_connect)
 
 5781    assert(!return_err_code);
 
 5796  if (inserted_rd || inserted_wr) 
 
 5804                                                const Error_code& err_code, 
bool defer_delta_check)
 
 5815  auto syn = Low_lvl_packet::create_uninit_packet<Syn_packet>(
get_logger());
 
 5817  syn->m_init_seq_num = sock->m_snd_init_seq_num;
 
 5821  syn->m_serialized_metadata = 
static_cast<const Blob&
>(sock->m_serialized_metadata);
 
 5828  auto syn_ack = Low_lvl_packet::create_uninit_packet<Syn_ack_packet>(
get_logger());
 
 5830  syn_ack->m_init_seq_num = sock->m_snd_init_seq_num;
 
 5832  syn_ack->m_packed.m_security_token = sock->m_security_token;
 
 5834  syn_ack->m_packed.m_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd;
 
 5840                                                               boost::shared_ptr<const Syn_ack_packet>& syn_ack)
 
 5843  auto syn_ack_ack = Low_lvl_packet::create_uninit_packet<Syn_ack_ack_packet>(
get_logger());
 
 5846  syn_ack_ack->m_packed.m_security_token = syn_ack->m_packed.m_security_token;
 
 5848  syn_ack_ack->m_packed.m_rcv_wnd = sock->m_rcv_last_sent_rcv_wnd = 
sock_rcv_wnd(sock);
 
 5859  using boost::chrono::milliseconds;
 
 5860  using boost::chrono::duration_cast;
 
 5861  using std::make_pair;
 
 5863  using std::numeric_limits;
 
 5870  vector<Peer_socket::Individual_ack::Ptr>& pending_acks = sock->m_rcv_pending_acks;
 
 5872  if (sys_err_code == boost::asio::error::operation_aborted)
 
 5875                   "pending acknowledgment count [" << pending_acks.size() << 
"].");
 
 5880  FLOW_LOG_TRACE(
"Delayed [ACK] timer [" << sock << 
"] triggered, or ACK forced; " 
 5881                 "pending acknowledgment count [" << pending_acks.size() << 
"].");
 
 5894    FLOW_LOG_TRACE(
"Delayed [ACK] timer [" << sock << 
"] triggered, " 
 5895                   "but socket already in inapplicable state [" << sock->m_int_state << 
"].  Ignoring.");
 
 5900  if (pending_acks.empty())
 
 5905                     "but socket has no pending acknowledgments.  This is likely an internal bug.  Ignoring.");
 
 5932  const size_t& rcv_wnd = sock->m_rcv_last_sent_rcv_wnd = 
sock_rcv_wnd(sock);
 
 5934  auto ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
 
 5935  ack->m_rcv_wnd = rcv_wnd; 
 
 5940  if (sock->rexmit_on())
 
 5962      sock->m_rcv_stats.sent_low_lvl_ack_packet(
false);
 
 5965      ack = Low_lvl_packet::create_uninit_packet<Ack_packet>(
get_logger());
 
 5966      ack->m_rcv_wnd = rcv_wnd; 
 
 5994    if (delay.count() < 0)
 
 5999                       "delay for packet [" << seq_num << 
", ...) is " 
 6000                       "negative: [" << delay << 
"]; using zero.");
 
 6001      delay = Fine_duration::zero();
 
 6016    if (uint64_t(pkt_delay.count()) > uint64_t(MAX_DELAY_VALUE))
 
 6021                       "delay for packet [" << seq_num << 
", ...) is [" << pkt_delay << 
"]; overflow; " 
 6022                       "using max value [" << MAX_DELAY_VALUE << 
"] units.");
 
 6028    if (sock->rexmit_on())
 
 6030      ack->m_rcv_acked_packets_rexmit_on_out.push_back
 
 6032                                              ind_ack->m_rexmit_id,
 
 6037      ack->m_rcv_acked_packets_rexmit_off_out.push_back
 
 6041    size_est_so_far += size_est_inc;
 
 6044    sock->m_rcv_stats.sent_individual_ack();
 
 6048  if ((size_est_so_far != 0)
 
 6051                                                               defer_delta_check)))
 
 6057  sock->m_rcv_stats.sent_low_lvl_ack_packet(
false);
 
 6060  pending_acks.clear();
 
 6063  sock->m_rcv_stats.current_pending_to_ack_packets(0);
 
 6071  return Socket_id{ sock->remote_endpoint(), sock->local_port() };
 
 6077  return !(sock->m_snd_rexmit_q.empty() && sock->m_snd_buf.empty());
 
 6093  return sock->m_snd_buf.data_size() + sock->max_block_size()
 
 6094           <= sock->opt(sock->m_opts.m_st_snd_buf_max_size);
 
 6100  return !sock->m_rcv_buf.empty();
 
 6108                 sock->m_int_state << 
"] to [" << new_state << 
"].");
 
 6109  sock->m_int_state = new_state;
 
 6118  sock->m_state = state;
 
 6121    sock->m_open_sub_state = open_sub_state;
 
 6138  sock->m_disconnect_cause = disconnect_cause;
 
 6160  assert(sock->m_disconnect_cause);
 
 6170  sock->m_rcv_buf.clear();
 
 6171  sock->m_snd_buf.clear();
 
 6172  sock->m_rcv_packets_with_gaps.clear();
 
 6173  sock->m_rcv_reassembly_q_data_size = 0;
 
 6174  sock->m_snd_flying_pkts_by_sent_when.clear();
 
 6175  sock->m_snd_flying_pkts_by_seq_num.clear();
 
 6176  sock->m_snd_rexmit_q.clear();
 
 6177  sock->m_serialized_metadata.make_zero(); 
 
 6178  sock->m_rcv_syn_rcvd_data_q.clear();
 
 6179  sock->m_rcv_pending_acks.clear();
 
 6180  sock->m_rcv_acked_packets.clear();
 
 6181  sock->m_snd_pacing_data.m_packet_q.clear();
 
 6186  sock->m_snd_cong_ctl.reset();
 
 6188  sock->m_snd_bandwidth_estimator.reset();
 
 6221  sock->m_opts = opts;
 
 6231#define VALIDATE_STATIC_OPTION(ARG_opt) \ 
 6232  validate_static_option(opts.ARG_opt, prev_opts->ARG_opt, #ARG_opt, err_code) 
 6233#define VALIDATE_CHECK(ARG_check) \ 
 6234  validate_option_check(ARG_check, #ARG_check, err_code) 
 6256  using boost::chrono::seconds;
 
 6257  using std::numeric_limits;
 
 6268    const bool static_ok
 
 6269      = VALIDATE_STATIC_OPTION(m_st_max_block_size) &&
 
 6270        VALIDATE_STATIC_OPTION(m_st_connect_retransmit_period) &&
 
 6271        VALIDATE_STATIC_OPTION(m_st_connect_retransmit_timeout) &&
 
 6272        VALIDATE_STATIC_OPTION(m_st_snd_buf_max_size) &&
 
 6273        VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size) &&
 
 6274        VALIDATE_STATIC_OPTION(m_st_rcv_flow_control_on) &&
 
 6275        VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size_slack_percent) &&
 
 6276        VALIDATE_STATIC_OPTION(m_st_rcv_buf_max_size_to_advertise_percent) &&
 
 6277        VALIDATE_STATIC_OPTION(m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent) &&
 
 6278        VALIDATE_STATIC_OPTION(m_st_delayed_ack_timer_period) &&
 
 6279        VALIDATE_STATIC_OPTION(m_st_max_full_blocks_before_ack_send) &&
 
 6280        VALIDATE_STATIC_OPTION(m_st_rexmit_on) &&
 
 6281        VALIDATE_STATIC_OPTION(m_st_max_rexmissions_per_packet) &&
 
 6282        VALIDATE_STATIC_OPTION(m_st_init_drop_timeout) &&
 
 6283        VALIDATE_STATIC_OPTION(m_st_snd_pacing_enabled) &&
 
 6284        VALIDATE_STATIC_OPTION(m_st_snd_bandwidth_est_sample_period_floor) &&
 
 6285        VALIDATE_STATIC_OPTION(m_st_cong_ctl_strategy) &&
 
 6286        VALIDATE_STATIC_OPTION(m_st_cong_ctl_init_cong_wnd_blocks) &&
 
 6287        VALIDATE_STATIC_OPTION(m_st_cong_ctl_max_cong_wnd_blocks) &&
 
 6288        VALIDATE_STATIC_OPTION(m_st_cong_ctl_cong_wnd_on_drop_timeout_blocks) &&
 
 6289        VALIDATE_STATIC_OPTION(m_st_cong_ctl_classic_wnd_decay_percent) &&
 
 6290        VALIDATE_STATIC_OPTION(m_st_drop_packet_exactly_after_drop_timeout) &&
 
 6291        VALIDATE_STATIC_OPTION(m_st_drop_all_on_drop_timeout) &&
 
 6292        VALIDATE_STATIC_OPTION(m_st_out_of_order_ack_restarts_drop_timer);
 
 6303  const bool checks_ok
 
 6334#undef VALIDATE_CHECK 
 6335#undef VALIDATE_STATIC_OPTION 
 6342  using boost::adopt_lock;
 
 6384  using boost::lexical_cast;
 
 6389  stats->
m_rcv = sock->m_rcv_stats.stats();
 
 6390  stats->
m_snd = sock->m_snd_stats.stats();
 
 6412    = sock->m_rcv_syn_rcvd_data_q.empty() ? 0 : sock->m_rcv_syn_rcvd_data_cumulative_size;
 
 6428    = util::to_mbit_per_sec<Send_bandwidth_estimator::Time_unit>
 
 6429        (sock->m_snd_bandwidth_estimator->bandwidth_bytes_per_time());
 
 6446  FLOW_LOG_INFO(
"[=== Socket state for [" << sock << 
"]. ===\n" << stats);
 
 6453  FLOW_LOG_INFO(
"=== Socket state for [" << sock << 
"]. ===]");
 
 6475  *seq_num += data_size;
 
 6478template<
typename Packet_map_iter>
 
 6485    *seq_num_start = seq_num_start_cref;
 
 6489    *seq_num_end = seq_num_start_cref;
 
 6497  return ++sock->m_snd_last_order_num;
 
 6503  return sock_create_forward_plus_ctor_args<Peer_socket>(opts);
 
 6513         << 
"NetFlow_socket " 
 6515         "@" << 
static_cast<const void*
>(sock))
 
 6516      : (os << 
"NetFlow_socket@null");
 
 6524#define STATE_TO_CASE_STATEMENT(ARG_state) \ 
 6525  case Peer_socket::Int_state::S_##ARG_state: \ 
 6526    return os << #ARG_state 
 6535    STATE_TO_CASE_STATEMENT(CLOSED);
 
 6536    STATE_TO_CASE_STATEMENT(SYN_SENT);
 
 6537    STATE_TO_CASE_STATEMENT(SYN_RCVD);
 
 6538    STATE_TO_CASE_STATEMENT(ESTABLISHED);
 
 6541#undef STATE_TO_CASE_STATEMENT 
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
static Congestion_control_strategy * create_strategy(Strategy_choice strategy_choice, log::Logger *logger_ptr, Peer_socket::Const_ptr sock)
Factory method that, given an enum identifying the desired strategy, allocates the appropriate Conges...
static Ptr create_drop_timer(log::Logger *logger_ptr, util::Task_engine *node_task_engine, Fine_duration *sock_drop_timeout, Peer_socket::Const_ptr &&sock, const Function< void(const Error_code &err_code)> &timer_failure, const Function< void(bool drop_all_packets)> &timer_fired)
Constructs Drop_timer and returns a ref-counted pointer wrapping it.
@ S_PEER_SOCKET_WRITABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
@ S_PEER_SOCKET_READABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
void snd_flying_pkts_updated(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_const_iter pkt_begin, const Peer_socket::Sent_pkt_ordered_by_when_const_iter &pkt_end, bool added)
Updates Peer_socket::m_snd_flying_bytes according to an operation (add packets, remove packets) calle...
bool categorize_individual_ack(const Socket_id &socket_id, Peer_socket::Ptr sock, Ack_packet::Individual_ack::Const_ptr ack, bool *dupe_or_late, Peer_socket::Sent_pkt_ordered_by_when_iter *acked_pkt_it)
Helper of perform_accumulated_on_recv_tasks() that categorizes the given accumulated individual ackno...
void handle_data_to_established(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet, bool syn_rcvd_qd_packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
bool sock_is_writable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->send() with at least some arguments would return either non...
Peer_socket_info sock_info(Peer_socket::Const_ptr sock)
Implementation of sock->info() for socket sock in all cases except when sock->state() == Peer_socket:...
void receive_wnd_updated(Peer_socket::Ptr sock)
Placed by receive() onto W if it has dequeued data from Receive buffer and given it to the user,...
void sock_track_new_data_after_gap_rexmit_off(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, size_t data_size, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
bool sock_data_to_reassembly_q_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
static bool ensure_sock_open(Socket_ptr sock, Error_code *err_code)
Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so,...
void send_worker(Peer_socket::Ptr sock, bool defer_delta_check)
Thread W implementation of send(): synchronously or asynchronously send the contents of sock->m_snd_buf...
void handle_accumulated_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any incoming acknowledgments and rcv_wnd u...
void async_rcv_wnd_recovery(Peer_socket::Ptr sock, size_t rcv_wnd)
receive_wnd_updated() helper that continues rcv_wnd recovery: that is, sends unsolicited ACK with a r...
void log_accumulated_acks(Peer_socket::Const_ptr sock) const
Helper of handle_accumulated_acks() that logs the about-to-be-handled accumulated individual acknowle...
void sock_free_memory(Peer_socket::Ptr sock)
Helper that clears all non-O(1)-space data structures stored inside sock.
void sock_load_info_struct(Peer_socket::Const_ptr sock, Peer_socket_info *stats) const
Given a Peer_socket, copies all stats info (as available via Peer_socket::info()) from various struct...
void log_snd_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages that show the detailed state of the sending sequence number space.
void send_worker_check_state(Peer_socket::Ptr sock)
Helper placed by send() onto W to invoke send_worker() but ensures that the socket has not entered so...
size_t m_low_lvl_max_buf_size
OS-reported m_low_lvl_sock UDP receive buffer maximum size, obtained right after we OS-set that setti...
Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock, const Function< Non_blocking_func_ret_type()> &non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Error_code *err_code)
Implementation of core blocking transfer methods, namely Peer_socket::sync_send(),...
size_t sock_max_packets_after_unrecvd_packet(Peer_socket::Const_ptr sock) const
Computes and returns the max size for Peer_socket::m_rcv_packets_with_gaps for sock.
Peer_socket::Sent_pkt_ordered_by_when_iter categorize_pkts_as_dropped_on_acks(Peer_socket::Ptr sock, const boost::unordered_set< Peer_socket::order_num_t > &flying_now_acked_pkts)
Helper of perform_accumulated_on_recv_tasks() that determines the range of In-flight packets that sho...
void rcv_get_first_gap_info(Peer_socket::Const_ptr sock, bool *first_gap_exists, Sequence_number *seq_num_after_first_gap)
Helper for handle_data_to_established() that gets simple info about Peer_socket::m_rcv_packets_with_g...
bool snd_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data either in Peer_socket::m_snd_rexmit_q of sock (if re...
void cancel_timers(Peer_socket::Ptr sock)
Cancel any timers and scheduled tasks active in the given socket.
void sock_rcv_buf_now_readable(Peer_socket::Ptr sock, bool syn_rcvd_qd_packet)
Helper for handle_data_to_established() that assumes the given socket's Receive buffer is currently r...
void snd_flying_pkts_erase_one(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_iter pkt_it)
Erases (for example if considered Acknowledged or Dropped) a packet struct from the "scoreboard" (Pee...
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
bool sock_validate_options(const Peer_socket_options &opts, const Peer_socket_options *prev_opts, Error_code *err_code) const
Analogous to validate_options() but checks per-socket options instead of per-Node options.
void handle_accumulated_pending_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any additional individual outgoing acknowl...
void receive_wnd_recovery_data_received(Peer_socket::Ptr sock)
Pertaining to the async_rcv_wnd_recovery() mechanism, this handles the event that we have received an...
static Peer_socket::order_num_t sock_get_new_snd_order_num(Peer_socket::Ptr sock)
Returns the "order number" to use for Peer_socket::Sent_packet::Sent_when structure corresponding to ...
Peer_socket::Ptr sync_connect_impl(const Remote_endpoint &to, const Fine_duration &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code, const Peer_socket_options *opts)
Implementation core of sync_connect*() that gets rid of templated or missing arguments thereof.
size_t max_block_size() const
The maximum number of bytes of user data per received or sent block on connections generated from thi...
void snd_flying_pkts_push_one(Peer_socket::Ptr sock, const Sequence_number &seq_num, Peer_socket::Sent_packet::Ptr sent_pkt)
Adds a new packet struct (presumably representing packet to be sent shortly) to the "scoreboard" (Pee...
Syn_packet::Ptr create_syn(Peer_socket::Const_ptr sock)
Helper that creates a new SYN packet object to the extent that is suitable for immediately passing to...
void close_abruptly(Peer_socket::Ptr sock, Error_code *err_code)
Implementation of non-blocking sock->close_abruptly() for socket sock in all cases except when sock->...
static void get_seq_num_range(const Packet_map_iter &packet_it, Sequence_number *seq_num_start, Sequence_number *seq_num_end)
Given an iterator into a Peer_socket::Sent_pkt_by_sent_when_map or Peer_socket::Recv_pkt_map,...
Peer_socket::Ptr sync_connect_with_metadata(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
A combination of sync_connect() and connect_with_metadata() (blocking connect, with supplied metadata...
Syn_ack_packet::Ptr create_syn_ack(Peer_socket::Const_ptr sock)
Like create_syn() but for SYN_ACK.
virtual Peer_socket * sock_create(const Peer_socket_options &opts)
Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
bool snd_buf_enqable(Peer_socket::Const_ptr sock) const
Return true if and only if there is enough free space in Peer_socket::m_snd_buf of sock to enqueue an...
bool can_send(Peer_socket::Const_ptr sock) const
Answers the perennial question of congestion and flow control: assuming there is a DATA packet to sen...
void sock_slide_rcv_next_seq_num(Peer_socket::Ptr sock, size_t slide_size, bool reassembly_in_progress)
Helper for handle_data_to_established() that aims to register a set of received DATA packet data as i...
void sock_log_detail(Peer_socket::Const_ptr sock) const
Logs a verbose state report for the given socket.
static void advance_seq_num(Sequence_number *seq_num, boost::shared_ptr< const Data_packet > data)
Assuming *seq_num points to the start of data.m_data, increments *seq_num to point to the datum just ...
void async_low_lvl_ack_send(Peer_socket::Ptr sock, bool defer_delta_check, const Error_code &sys_err_code=Error_code())
Sends a low-level ACK packet, with all accumulated in Peer_socket::m_rcv_pending_acks of sock individ...
static Sequence_number snd_past_last_flying_datum_seq_num(Peer_socket::Const_ptr sock)
Obtain the sequence number for the datum just past the last (latest) In-flight (i....
Peer_socket::Ptr connect(const Remote_endpoint &to, Error_code *err_code=0, const Peer_socket_options *opts=0)
Initiates an active connect to the specified remote Flow server.
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
void serv_peer_socket_closed(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that a Server_socket-contained (i.e., currently un-established, or established but not yet ac...
bool rcv_buf_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data in Peer_socket::m_rcv_buf of sock to give the user s...
void async_acknowledge_packet(Peer_socket::Ptr sock, const Sequence_number &seq_num, unsigned int rexmit_id, size_t data_size)
Causes an acknowledgment of the given received packet to be included in a future Ack_packet sent to t...
Socket_id_to_socket_map m_socks
The peer-to-peer connections this Node is currently tracking.
Peer_socket::Options_lock Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
void handle_syn_ack_to_syn_sent(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK packet delivered to the given peer ...
size_t send(Peer_socket::Ptr sock, const Function< size_t(size_t max_data_size)> &snd_buf_feed_func, Error_code *err_code)
Implementation of non-blocking sock->send() for socket sock in all cases except when sock->state() ==...
void sock_set_int_state(Peer_socket::Ptr sock, Peer_socket::Int_state new_state)
Sets internal state of given socket to the given state and logs a TRACE message about it.
bool sock_is_readable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->receive() with at least some arguments would return either ...
bool async_sock_low_lvl_packet_send_or_close_immediately(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet, bool defer_delta_check)
Similar to async_sock_low_lvl_packet_send_paced() except it also calls close_connection_immediately(s...
bool sock_data_to_rcv_buf_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to pass the payload of the given DATA packet to the...
bool sock_set_options(Peer_socket::Ptr sock, const Peer_socket_options &opts, Error_code *err_code)
Thread W implementation of sock->set_options().
bool running() const
Returns true if and only if the Node is operating.
Port_to_server_map m_servs
The server sockets this Node is currently tracking.
Event_set::Ev_type_to_socks_map m_sock_events
All sockets that have been detected to be "ready" (by the Event_set doc header definition) at any poi...
static const uint8_t S_DEFAULT_CONN_METADATA
Type and value to supply as user-supplied metadata in SYN, if user chooses to use [[a]sync_]connect()...
void setup_drop_timer(const Socket_id &socket_id, Peer_socket::Ptr sock)
Creates a new Drop Timer and saves it to sock->m_snd_drop_timer.
void handle_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Ack_packet > ack)
Handles a just-deserialized, just-demultiplexed, low-level ACK packet delivered to the given peer soc...
Peer_socket::Ptr sync_connect(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0, const Peer_socket_options *opts=0)
The blocking (synchronous) version of connect().
void handle_syn_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed, duplicate (equal to already-received SYN_ACK) low-le...
void setup_connection_timers(const Socket_id &socket_id, Peer_socket::Ptr sock, bool initial)
Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some a...
void log_rcv_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages that show the detailed state of the receiving sequence number space.
size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const
Computes and returns the currently correct rcv_wnd value; that is the amount of space free in Receive...
void connect_worker(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts, Peer_socket::Ptr *sock)
Thread W implementation of connect().
bool drop_pkts_on_acks(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &last_dropped_pkt_it, size_t *cong_ctl_dropped_pkts, size_t *cong_ctl_dropped_bytes, size_t *dropped_pkts, size_t *dropped_bytes, std::vector< Peer_socket::order_num_t > *pkts_marked_to_drop)
Helper of perform_accumulated_on_recv_tasks() that acts on the determination made by categorize_pkts_...
static const Peer_socket::Sent_packet::ack_count_t S_MAX_LATER_ACKS_BEFORE_CONSIDERING_DROPPED
For a given unacknowledged sent packet P, the maximum number of times any individual packet with high...
bool async_low_lvl_syn_ack_ack_send_or_close_immediately(const Peer_socket::Ptr &sock, boost::shared_ptr< const Syn_ack_packet > &syn_ack)
Helper to create, fully fill out, and asynchronously send via async_sock_low_lvl_packet_send_or_close...
Error_code sock_categorize_data_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, bool *dupe, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that categorizes the DATA packet received as either illegal; ...
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
void sock_set_state(Peer_socket::Ptr sock, Peer_socket::State state, Peer_socket::Open_sub_state open_sub_state=Peer_socket::Open_sub_state::S_CONNECTED)
Sets Peer_socket::m_state and Peer_socket::m_open_sub_state.
void receive_emptied_rcv_buf_while_disconnecting(Peer_socket::Ptr sock)
Placed by receive() onto W during a graceful close, after the Receive buffer had been emptied by the ...
void sock_disconnect_detected(Peer_socket::Ptr sock, const Error_code &disconnect_cause, bool close)
Records that thread W shows underlying connection is broken (graceful termination,...
size_t receive(Peer_socket::Ptr sock, const Function< size_t()> &rcv_buf_consume_func, Error_code *err_code)
Implementation of non-blocking sock->receive() for socket sock in all cases except when sock->state()...
void handle_connection_rexmit_timer_event(const Socket_id &socket_id, Peer_socket::Ptr sock)
Handles the triggering of the retransmit timer wait set up by setup_connection_timers(); it will re-s...
Node_options m_opts
This Node's global set of options.
void close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
A thread W method that handles the transition of the given socket from OPEN (any sub-state) to CLOSED...
void sock_disconnect_completed(Peer_socket::Ptr sock)
While in S_OPEN+S_DISCONNECTING state (i.e., after beginning a graceful close with sock_disconnect_de...
Fine_duration compute_rtt_on_ack(Peer_socket::Sent_packet::Const_ptr flying_pkt, const Fine_time_pt &time_now, Ack_packet::Individual_ack::Const_ptr ack, const Peer_socket::Sent_packet::Sent_when **sent_when) const
Helper of perform_accumulated_on_recv_tasks() that computes the RTT implied by a given individual ack...
Peer_socket::Ptr connect_with_metadata(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
Same as connect() but sends, as part of the connection handshake, the user-supplied metadata,...
void new_round_trip_time_sample(Peer_socket::Ptr sock, Fine_duration round_trip_time)
Handles a just-computed new RTT (round trip time) measurement for an individual packet earlier sent: ...
bool ok_to_rexmit_or_close(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &pkt_it, bool defer_delta_check)
Checks whether the given sent packet has been retransmitted the maximum number of allowed times; if s...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
Port_space m_ports
Flow port space for both client and server sockets. All threads may access this.
void rst_and_close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
Asynchronously send RST to the other side of the given socket and close_connection_immediately().
void drop_timer_action(Peer_socket::Ptr sock, bool drop_all_packets)
Handles a Drop_timer (Peer_socket::m_snd_drop_timer) event in ESTABLISHED state by dropping the speci...
A class that keeps a Peer_socket_receive_stats data store, includes methods to conveniently accumulat...
void good_data_accepted_packet(size_t data)
Indicates good_data_packet(), and these data are not dropped (so either delivered into Receive buffer...
void good_data_dropped_reassembly_q_overflow_packet(size_t data)
Indicates good_data_packet(), but these data are dropped due to insufficient Receive reassembly queue...
void presumed_dropped_data(size_t data)
Indicates that one or more unreceived data packets have been considered Dropped due to the number of ...
void good_data_delivered_packet(size_t data)
Indicates good_data_accepted_packet(), and these data are delivered into Receive buffer (either immed...
void late_or_dupe_to_send_ack_packet(size_t data)
Indicates that late_or_dupe_data_packet() and therefore an individual acknowledgment for this packet ...
void total_data_packet(size_t data)
Indicates one DATA packet has been received on socket.
void good_to_send_ack_packet(size_t data)
Indicates that good_data_delivered_packet() and therefore an individual acknowledgment for this packe...
void good_data_packet(size_t data)
Indicates total_data_packet(), and these data are new and acceptable into Receive buffer assuming the...
void error_data_packet(size_t data)
Indicates total_data_packet(), but there is some error about the sequence numbers so that they are no...
void buffer_fed(size_t size)
Indicates the Receive buffer was enqueued with data from network (so its data_size() increased).
void good_data_first_qd_packet(size_t data)
Indicates good_data_accepted_packet(), and these data are, upon receipt, queued for reassembly (not i...
void good_data_dropped_buf_overflow_packet(size_t data)
Indicates good_data_packet(), but these data are dropped due to insufficient Receive buffer space.
void late_or_dupe_data_packet(size_t data)
Indicates total_data_packet(), but the arrived data have either already been received before or (more...
A peer (non-server) socket operating over the Flow network protocol, with optional stream-of-bytes an...
State m_state
See state().
size_t get_connect_metadata(const boost::asio::mutable_buffer &buffer, Error_code *err_code=0) const
Obtains the serialized connect metadata, as supplied by the user during the connection handshake.
size_t max_block_size_multiple(const size_t &opt_val_ref, const unsigned int *inflate_pct_val_ptr=0) const
Returns the smallest multiple of max_block_size() that is >= the given option value,...
bool sync_send_reactor_pattern_impl(const Fine_time_pt &wait_until, Error_code *err_code)
Helper similar to sync_send_impl() but for the null_buffers versions of sync_send().
std::map< Sequence_number, Sent_pkt_ordered_by_when_iter > Sent_pkt_by_seq_num_map
Short-hand for m_snd_flying_pkts_by_seq_num type; see that data member.
bool sync_receive_reactor_pattern_impl(const Fine_time_pt &wait_until, Error_code *err_code)
Helper similar to sync_receive_impl() but for the null_buffers versions of sync_receive().
Remote_endpoint m_remote_endpoint
See remote_endpoint(). Should be set before user gets access to *this and not changed afterwards.
size_t sync_receive(const Mutable_buffer_sequence &target, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0)
Blocking (synchronous) version of receive().
util::Blob m_serialized_metadata
If !m_active_connect, this contains the serialized metadata that the user supplied on the other side ...
size_t node_sync_send(const Function< size_t(size_t max_data_size)> &snd_buf_feed_func_or_empty, const Fine_time_pt &wait_until, Error_code *err_code)
This is to sync_send() as node_send() is to send().
Error_code m_disconnect_cause
The Error_code causing disconnection (if one has occurred or is occurring) on this socket; otherwise ...
Peer_socket(log::Logger *logger_ptr, util::Task_engine *task_engine, const Peer_socket_options &opts)
Constructs object; initializes most values to well-defined (0, empty, etc.) but not necessarily meani...
Sequence_number m_rcv_init_seq_num
The Initial Sequence Number (ISN) contained in the original Syn_packet or Syn_ack_packet we received.
const Remote_endpoint & remote_endpoint() const
Intended other side of the connection (regardless of success, failure, or current State).
State
State of a Peer_socket.
@ S_OPEN
Future reads or writes may be possible. A socket in this state may be Writable or Readable.
@ S_CLOSED
Neither future reads nor writes are possible, AND Node has disowned the Peer_socket.
Open_sub_state
The sub-state of a Peer_socket when state is State::S_OPEN.
@ S_CONNECTED
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
@ S_CONNECTING
This Peer_socket was created through an active connect (Node::connect() and the like),...
@ S_DISCONNECTING
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
size_t sync_send(const Const_buffer_sequence &data, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0)
Blocking (synchronous) version of send().
~Peer_socket() override
Boring virtual destructor. Note that deletion is to be handled exclusively via shared_ptr,...
Error_code disconnect_cause() const
The error code that previously caused state() to become State::S_CLOSED, or success code if state is ...
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
flow_port_t local_port() const
The local Flow-protocol port chosen by the Node (if active or passive open) or user (if passive open)...
flow_port_t m_local_port
See local_port(). Should be set before user gets access to *this and not changed afterwards.
friend class Send_bandwidth_estimator
Stats modules have const access to all socket internals.
bool set_options(const Peer_socket_options &opts, Error_code *err_code=0)
Dynamically replaces the current options set (options()) with the given options set.
size_t node_send(const Function< size_t(size_t max_data_size)> &snd_buf_feed_func, Error_code *err_code)
Non-template helper for template send() that forwards the send() logic to Node::send().
bool rexmit_on() const
Whether retransmission is enabled on this connection.
size_t node_sync_receive(const Function< size_t()> &rcv_buf_consume_func_or_empty, const Fine_time_pt &wait_until, Error_code *err_code)
This is to sync_receive() as node_receive() is to receive().
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex.
Int_state
The state of the socket (and the connection from this end's point of view) for the internal state mac...
@ S_ESTABLISHED
Public state is OPEN+CONNECTED; in our opinion the connection is established.
@ S_SYN_SENT
Public state is OPEN+CONNECTING; user requested active connect; we sent SYN and are awaiting response...
@ S_CLOSED
Closed (dead or new) socket.
@ S_SYN_RCVD
Public state is OPEN+CONNECTING; other side requested passive connect via SYN; we sent SYN_ACK and ar...
util::Lock_guard< Options_mutex > Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
void close_abruptly(Error_code *err_code=0)
Acts as if fatal error error::Code::S_USER_CLOSED_ABRUPTLY has been discovered on the connection.
size_t max_block_size() const
The maximum number of bytes of user data per received or sent packet on this connection.
Node * node() const
Node that produced this Peer_socket.
Peer_socket_info info() const
Returns a structure containing the most up-to-date stats about this connection.
Recvd_pkt_map::iterator Recvd_pkt_iter
Short-hand for m_rcv_packets_with_gaps iterator type.
Mutex m_mutex
This object's mutex.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
Sent_pkt_by_seq_num_map::const_iterator Sent_pkt_ordered_by_seq_const_iter
Short-hand for m_snd_flying_pkts_by_seq_num const iterator type.
Peer_socket_info m_info_on_close
This is the final set of stats collected at the time the socket was moved to S_CLOSED m_state.
bool ensure_open(Error_code *err_code) const
Helper that is equivalent to Node::ensure_sock_open(this, err_code).
Sent_pkt_by_sent_when_map::const_iterator Sent_pkt_ordered_by_when_const_iter
Short-hand for m_snd_flying_pkts_by_sent_when const iterator type.
Opt_type opt(const Opt_type &opt_val_ref) const
Analogous to Node::opt() but for per-socket options.
std::string bytes_blocks_str(size_t bytes) const
Helper that, given a byte count, returns a string with that byte count and the number of max_block_si...
Peer_socket_options m_opts
This socket's per-socket set of options.
Peer_socket_options options() const
Copies this socket's option set and returns that copy.
Options_mutex m_opts_mutex
The mutex protecting m_opts.
std::map< Sequence_number, boost::shared_ptr< Received_packet > > Recvd_pkt_map
Short-hand for m_rcv_packets_with_gaps type; see that data member.
Recvd_pkt_map::const_iterator Recvd_pkt_const_iter
Short-hand for m_rcv_packets_with_gaps const iterator type.
Open_sub_state m_open_sub_state
See state().
size_t node_receive(const Function< size_t()> &rcv_buf_consume_func, Error_code *err_code)
Non-template helper for template receive() that forwards the receive() logic to Node::receive().
State state(Open_sub_state *open_sub_state=0) const
Current State of the socket.
void return_port(flow_port_t port, Error_code *err_code)
Return a previously reserved port (of any type).
An internal net_flow sequence number identifying a piece of data.
void set_metadata(char num_line_id=0, const Sequence_number &zero_point=Sequence_number(), seq_num_delta_t multiple_size=0)
Updates the full set of metadata (used at least for convenient convention-based logging but not actua...
uint64_t seq_num_t
Raw sequence number type.
Internal net_flow class that implements a socket buffer, as used by Peer_socket for Send and Receive ...
void consume_buf_move(util::Blob *target_buf, size_t max_data_size)
Consumes (removes from the front of the internal byte buffer and returns them to the caller) a byte s...
size_t data_size() const
The total number of bytes of application-layer data stored in this object.
Properties of various container types.
typename Value_list::const_reverse_iterator Const_reverse_iterator
Type for reverse iterator pointing into an immutable structure of this type.
typename Value_list::reverse_iterator Reverse_iterator
Type for reverse iterator pointing into a mutable structure of this type.
std::pair< Iterator, bool > insert(Value const &key_and_mapped)
Attempts to insert the given key/mapped-value pair into the map.
static Ptr ptr_cast(const From_ptr &ptr_to_cast)
Provides syntactic-sugary way to perform a static_pointer_cast<> from a compatible smart pointer type...
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
Similar to ostringstream but allows fast read-only access directly into the std::string being written...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(ARG_ret_type, ARG_method_name,...)
Narrow-use macro that implements the error code/exception semantics expected of most public-facing Fl...
#define FLOW_ERROR_LOG_ERROR(ARG_val)
Logs a warning about the given error code using FLOW_LOG_WARNING().
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
#define FLOW_ERROR_EMIT_ERROR_LOG_INFO(ARG_val)
Identical to FLOW_ERROR_EMIT_ERROR(), but the message logged has flow::log::Sev::S_INFO severity inst...
#define FLOW_LOG_DATA(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_WITHOUT_CHECKING(ARG_sev, ARG_stream_fragment)
Identical to FLOW_LOG_WITH_CHECKING() but foregoes the filter (Logger::should_log()) check.
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_WITH_CHECKING(ARG_sev, ARG_stream_fragment)
Logs a message of the specified severity into flow::log::Logger *get_logger() with flow::log::Compone...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
void asio_exec_ctx_post(log::Logger *logger_ptr, Execution_context *exec_ctx, Synchronicity synchronicity, Task &&task)
An extension of boost.asio's post() and dispatch() free function templates, this free function templa...
bool exec_void_and_throw_on_error(const Func &func, Error_code *err_code, util::String_view context)
Equivalent of exec_and_throw_on_error() for operations with void return type.
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
@ S_INFO
Message indicates a not-"bad" condition that is not frequent enough to be of severity Sev::S_TRACE.
@ S_CONN_TIMEOUT
Other side did not complete connection handshake within the allowed time; perhaps no one is listening...
@ S_USER_CLOSED_ABRUPTLY
User code on this side abruptly closed connection; other side may be informed of this.
@ S_CONN_RESET_TOO_MANY_REXMITS
Connection reset because a packet has been retransmitted too many times.
@ S_SEQ_NUM_IMPLIES_CONNECTION_COLLISION
Other side has sent packet with sequence number that implies a port collision between two connections...
@ S_SEQ_NUM_ARITHMETIC_FAILURE
Other side has sent packets with inconsistent sequence numbers.
@ S_CONN_METADATA_TOO_LARGE
During connection user supplied metadata that is too large.
@ S_CANNOT_CONNECT_TO_IP_ANY
Cannot ask to connect to "any" IP address. Use specific IP address.
@ S_WAIT_USER_TIMEOUT
A blocking (sync_) or background-blocking (async_) operation timed out versus user-supplied time limi...
@ S_WAIT_INTERRUPTED
A blocking (sync_) or background-blocking (async_) operation was interrupted, such as by a signal.
@ S_INTERNAL_ERROR_SYSTEM_ERROR_ASIO_TIMER
Internal error: System error: Something went wrong with boost.asio timer subsystem.
@ S_EVENT_SET_CLOSED
Attempted operation on an event set, when that event set was closed.
@ S_INTERNAL_ERROR_PORT_COLLISION
Internal error: Ephemeral port double reservation allowed.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
uint16_t flow_port_t
Logical Flow port type (analogous to a UDP/TCP port in spirit but in no way relevant to UDP/TCP).
const flow_port_t S_PORT_ANY
Special Flow port value used to indicate "invalid port" or "please pick a random available ephemeral ...
std::ostream & operator<<(std::ostream &os, const Congestion_control_selector::Strategy_choice &strategy_choice)
Serializes a Peer_socket_options::Congestion_control_strategy_choice enum to a standard ostream – the...
bool key_exists(const Container &container, const typename Container::key_type &key)
Returns true if and only if the given key is present at least once in the given associative container...
Auto_cleanup setup_auto_cleanup(const Cleanup_func &func)
Provides a way to execute arbitrary (cleanup) code at the exit of the current block.
std::string buffers_dump_string(const Const_buffer_sequence &data, const std::string &indentation, size_t bytes_per_line)
Identical to buffers_to_ostream() but returns an std::string instead of writing to a given ostream.
bool subtract_with_floor(Minuend *minuend, const Subtrahend &subtrahend, const Minuend &floor)
Performs *minuend -= subtrahend, subject to a floor of floor.
Integer ceil_div(Integer dividend, Integer divisor)
Returns the result of the given non-negative integer divided by a positive integer,...
bool in_open_open_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, given as a (low,...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
bool scheduled_task_fired(log::Logger *logger_ptr, Scheduled_task_const_handle task)
Returns whether a previously scheduled (by schedule_task_from_now() or similar) task has already fire...
bool in_open_closed_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, given as a (low,...
void ostream_op_to_string(std::string *target_str, T const &... ostream_args)
Writes to the specified string, as if the given arguments were each passed, via << in sequence,...
Fine_duration scheduled_task_fires_from_now_or_canceled(log::Logger *logger_ptr, Scheduled_task_const_handle task)
Returns how long remains until a previously scheduled (by schedule_task_from_now() or similar) task f...
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
bool in_closed_range(T const &min_val, T const &val, T const &max_val)
Returns true if and only if the given value is within the given range, inclusive.
boost::shared_ptr< void > Auto_cleanup
Helper type for setup_auto_cleanup().
bool scheduled_task_cancel(log::Logger *logger_ptr, Scheduled_task_handle task)
Attempts to prevent the execution of a previously scheduled (by schedule_task_from_now() or similar) ...
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
unsigned char uint8_t
Byte. Best way to represent a byte of binary data. This is 8 bits on all modern systems.
Specifies the outgoing (pre-serialization) acknowledgment of a single received Data_packet,...
Equivalent of Individual_ack_rexmit_off but for sockets with retransmission enabled.
Specifies the incoming (post-deserialization) acknowledgment of a single received Data_packet.
boost::shared_ptr< const Individual_ack > Const_ptr
Short-hand for ref-counted pointer to immutable objects of this class.
uint64_t ack_delay_t
Type used to store the ACK delay for a given individual acknowledged packet.
Fine_duration Ack_delay_time_unit
Ack_delay_time_unit(1) is the duration corresponding to the ack_delay_t value 1; and proportionally f...
uint32_t rcv_wnd_t
Type used to store the size of m_rcv_wnd member in a couple of different packet types.
uint8_t rexmit_id_t
Type used to store the retransmission count in DATA and ACK packets.
The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to a port in th...
Metadata describing the data sent in the acknowledgment of an individual received packet.
boost::shared_ptr< const Individual_ack > Const_ptr
Short-hand for ref-counted pointer to immutable objects of this class.
boost::shared_ptr< Individual_ack > Ptr
Short-hand for ref-counted pointer to mutable objects of this class.
Metadata (and data, if retransmission is on) for a packet that has been received (and,...
const size_t m_size
Number of bytes in the Data_packet::m_data field of that packet.
Received_packet(log::Logger *logger_ptr, size_t size, util::Blob *src_data)
Constructs object by storing size of data and, if so instructed, the data themselves.
util::Blob m_data
Byte sequence equal to that of Data_packet::m_data of the packet.
Data store to keep timing related info when a packet is sent out.
const order_num_t m_order_num
Order number of the packet.
size_t m_sent_cwnd_bytes
The congestion window size (in bytes) that is used when the packet is sent out.
Fine_time_pt m_sent_time
The timestamp when the packet is sent out.
Metadata (and data, if retransmission is on) for a packet that has been sent one (if retransmission i...
Sent_packet(bool rexmit_on, boost::shared_ptr< Data_packet > packet, const Sent_when &sent_when)
Constructs object with the given values and m_acks_after_me at zero.
std::vector< Sent_when > m_sent_when
Time stamps, order numbers, and other info at the times when the different attempts (including origin...
const size_t m_size
Number of bytes in the Data_packet::m_data field of the sent packet.
const boost::shared_ptr< Data_packet > m_packet
If retransmission is on, this is the DATA packet itself that was sent; otherwise null.
uint16_t ack_count_t
Type used for m_acks_after_me.
ack_count_t m_acks_after_me
The number of times any packet with m_sent_when.back().m_order_num > this->m_sent_when....
A data store that keeps stats about a Peer_socket connection.
Peer_socket_send_stats m_snd
Stats for outgoing direction of traffic. As opposed to the other m_snd_* members, this typically accu...
Node_options m_node_opts
Per-node options currently set on the socket's Node.
size_t m_low_lvl_max_buf_size
The UDP receive buffer maximum size, as reported by an appropriate call to the appropriate getsockopt...
size_t m_rcv_buf_size
The number of bytes in the internal Receive buffer.
size_t m_rcv_wnd_last_advertised
The last rcv_wnd (receive window) size sent to sender (not necessarily received; packets can be lost)...
Fine_duration m_snd_pacing_slice_period
In pacing, the duration of the current pacing time slice.
size_t m_rcv_reassembly_q_data_size
If rexmit_on is false then 0; otherwise the total DATA payload in the reassembly queue of the socket.
size_t m_snd_pacing_bytes_allowed_this_slice
This many bytes worth of DATA packets may still be sent, at this time, within the time slice defined ...
Peer_socket_options m_sock_opts
Per-socket options currently set on the socket.
size_t m_snd_buf_size
The number of bytes in the internal Send buffer.
size_t m_rcv_syn_rcvd_data_cumulative_size
Total size of DATA payload queued while waiting for SYN_ACK_ACK in SYN_RCVD state.
size_t m_rcv_syn_rcvd_data_q_size
Number of DATA packets queued while waiting for SYN_ACK_ACK in SYN_RCVD state.
std::string m_int_state_str
The internal state of the socket, rendered into string (e.g., "SYN_RECEIVED" or "ESTABLISHED").
Fine_time_pt m_snd_pacing_slice_start
In pacing, the time point marking the beginning of the current pacing time slice.
size_t m_snd_cong_ctl_in_flight_count
In congestion control, the current sent data packets that have been neither acknowledged nor consider...
size_t m_snd_cong_ctl_in_flight_bytes
In congestion control, the current sent data bytes that have been neither acknowledged nor considered...
double m_snd_est_bandwidth_mbit_per_sec
Estimate of the currently available (to this connection) outgoing bandwidth, in megabits per second.
size_t m_rcv_wnd
Receive window size = max Receive buffer space minus space taken. Infinity if flow control disabled.
size_t m_rcv_packets_with_gaps
Number of DATA packets tracked in structure tracking all valid received packets such that at least one pac...
size_t m_snd_cong_ctl_wnd_bytes
In congestion control, the current congestion window (number of outgoing data bytes allowed In-flight...
Fine_duration m_snd_smoothed_round_trip_time
Estimated current round trip time of packets, computed as a smooth value over the past individual RTT...
Error_code m_disconnect_cause
If the socket is closing or closed, this is the reason for the closure; otherwise the default-constru...
size_t m_snd_cong_ctl_wnd_count_approx
In congestion control, the approximate equivalent of m_snd_cong_ctl_in_flight_bytes as a full packet ...
size_t m_snd_rcv_wnd
The receive window (rcv_wnd a/k/a free Receive buffer space) value of the peer socket on the other si...
bool m_is_active_connect
true if this is the "client" socket (connect()ed); false otherwise (accept()ed).
size_t m_snd_pacing_packet_q_size
In pacing, number of packets currently queued to be sent out by the pacing module.
Fine_duration m_snd_round_trip_time_variance
RTTVAR used for m_snd_smoothed_round_trip_time calculation; it is the current RTT variance.
Peer_socket_receive_stats m_rcv
Stats for incoming direction of traffic. As opposed to the other m_rcv_* members, this typically accu...
Fine_duration m_snd_drop_timeout
Drop Timeout: how long a given packet must remain unacknowledged to be considered dropped due to Drop...
A set of low-level options affecting a single Peer_socket.
Fine_duration m_st_init_drop_timeout
Once socket enters ESTABLISHED state, this is the value for Peer_socket::m_snd_drop_timeout until the...
unsigned int m_st_max_rexmissions_per_packet
If retransmission is enabled and a given packet is retransmitted this many times and has to be retran...
size_t m_st_rcv_buf_max_size
Maximum number of bytes that the Receive buffer can hold.
size_t m_st_cong_ctl_max_cong_wnd_blocks
The constant that determines the CWND limit in Congestion_control_classic_data::congestion_window_at_...
Fine_duration m_st_snd_bandwidth_est_sample_period_floor
When estimating the available send bandwidth, each sample must be compiled over at least this long of...
unsigned int m_st_cong_ctl_cong_avoidance_increment_blocks
The multiple of max-block-size by which to increment CWND in congestion avoidance mode after receivin...
size_t m_st_cong_ctl_cong_wnd_on_drop_timeout_blocks
On Drop Timeout, set congestion window to this value times max-block-size.
size_t m_st_cong_ctl_init_cong_wnd_blocks
The initial size of the congestion window, given in units of max-block-size-sized blocks.
bool m_st_rexmit_on
Whether to enable reliability via retransmission.
size_t m_st_snd_buf_max_size
Maximum number of bytes that the Send buffer can hold.
Fine_duration m_st_connect_retransmit_period
How often to resend SYN or SYN_ACK while SYN_ACK or SYN_ACK_ACK, respectively, has not been received.
Fine_duration m_dyn_rcv_wnd_recovery_timer_period
When the mode triggered by rcv-buf-max-size-to-advertise-percent being exceeded is in effect,...
Fine_duration m_st_connect_retransmit_timeout
How long from the first SYN or SYN_ACK to allow for connection handshake before aborting connection.
size_t m_st_max_full_blocks_before_ack_send
If there are at least this many TIMES max-block-size bytes' worth of individual acknowledgments to be...
Fine_duration m_st_delayed_ack_timer_period
The maximum amount of time to delay sending ACK with individual packet's acknowledgment since receivi...
unsigned int m_dyn_drop_timeout_backoff_factor
Whenever the Drop Timer fires, upon the requisite Dropping of packet(s), the DTO (Drop Timeout) is se...
size_t m_st_max_block_size
The size of block that we will strive to (and will, assuming at least that many bytes are available i...
unsigned int m_st_cong_ctl_classic_wnd_decay_percent
In classic congestion control, RFC 5681 specifies the window should be halved on loss; this option al...
unsigned int m_st_rcv_buf_max_size_to_advertise_percent
% of rcv-buf-max-size that has to be freed, since the last receive window advertisement,...
unsigned int m_st_rcv_max_packets_after_unrecvd_packet_ratio_percent
The limit on the size of Peer_socket::m_rcv_packets_with_gaps, expressed as what percentage the maxim...
Fine_duration m_dyn_drop_timeout_ceiling
Ceiling to impose on the Drop Timeout.
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...
util::Udp_endpoint m_udp_endpoint
UDP address (IP address/UDP port) where the Node identified by this endpoint bound its low-level UDP ...
#define FLOW_UTIL_WHERE_AM_I_STR()
Same as FLOW_UTIL_WHERE_AM_I() but evaluates to an std::string.