21#include <boost/move/make_unique.hpp> 
   32  if (!start_ops<Op::S_SND>(std::move(ev_wait_func)))
 
   42           && 
"Protocol_negotiator not properly marking the once-only sending-out of protocol version?");
 
   56    const Blob_const payload_blob(&fake_meta_length_raw, 
sizeof(fake_meta_length_raw));
 
   58    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Want to send protocol-negotiation info.  " 
   59                   "About to send payload 1 of 1; " 
   60                   "contains low-level blob of size [" << payload_blob.size() << 
"] " 
   61                   "located @ [" << payload_blob.data() << 
"].");
 
   73    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Wanted to send protocol-negotiation info; " 
   74                   "but we've marked it as already-sent, even though we are in start_*_ops() in PEER state.  " 
   75                   "Probably we come from a .release()d Native_socket_stream which has already done it; cool.");
 
   99  using flow::util::buffers_dump_string;
 
  100  using boost::chrono::round;
 
  101  using boost::chrono::milliseconds;
 
  103  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, 
send_native_handle, hndl_or_null, meta_blob, _1);
 
  108  if ((!op_started<Op::S_SND>(
"send_native_handle()")) || (!state_peer(
"send_native_handle()")))
 
  115  const size_t meta_size = meta_blob.size();
 
  116  assert(((!hndl_or_null.
null()) || (meta_size != 0))
 
  117         && 
"Native_socket_stream::send_blob() blob must have length 1+; " 
  118              "Native_socket_stream::send_native_handle() must have same or non-null hndl_or_null or both.");
 
  120  FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Will send handle [" << hndl_or_null << 
"] with " 
  121                 "meta-blob of size [" << meta_blob.size() << 
"].");
 
  125    FLOW_LOG_DATA(
"Socket stream [" << *
this << 
"]: Meta-blob contents are " 
  126                  "[\n" << buffers_dump_string(meta_blob, 
"  ") << 
"].");
 
  146    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: Send: User argument length [" << meta_blob.size() << 
"] " 
  150  else if (m_snd_pending_err_code) 
 
  159    FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: An error was detected earlier and saved for any subsequent " 
  160                  "send attempts like this.  Will not proceed with send.  More info in WARNING below.");
 
  161    *err_code = m_snd_pending_err_code;
 
  174    const Blob_const meta_length_blob(&meta_length_raw, 
sizeof(meta_length_raw));
 
  179    const auto send_low_lvl_payload
 
  182      FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Wanted to send handle [" << hndl_or_null << 
"] with " 
  183                     "meta-blob of size [" << meta_blob.size() << 
"].  " 
  184                     "About to send payload index [" << idx << 
"] of " 
  185                     "[" << ((meta_length_raw == 0) ? 1 : 2) << 
"] new low-level payloads; " 
  186                     "includes handle [" << payload_hndl << 
"] and " 
  187                     "low-level blob of size [" << payload_blob.size() << 
"] " 
  188                     "located @ [" << payload_blob.data() << 
"].");
 
  192      snd_sync_write_or_q_payload(payload_hndl, payload_blob, 
false);
 
  203    send_low_lvl_payload(1, hndl_or_null, meta_length_blob); 
 
  204    if ((meta_length_raw != 0) && (!m_snd_pending_err_code))
 
  209    *err_code = m_snd_pending_err_code; 
 
  214      FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Wanted to send user message but detected error " 
  216                     "Error code details follow: [" << *err_code << 
"] [" << err_code->message() << 
"].  " 
  217                     "Saved error code to return in next user send attempt if any, after this attempt also " 
  218                     "returns that error code synchronously first.");
 
  220    else if (m_snd_auto_ping_period != Fine_duration::zero()) 
 
  226      const size_t n_canceled = m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
 
  228      FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Send request from user; hence rescheduled " 
  229                     "auto-ping to occur in " 
  230                     "[" << round<milliseconds>(m_snd_auto_ping_period) << 
"] (will re-reschedule " 
  231                     "again upon any other outgoing traffic that might be requested before then).  As a result " 
  232                     "[" << n_canceled << 
"] previously scheduled auto-pings have been canceled; 1 is most likely; " 
  233                     "0 means an auto-ping is *just* about to fire (we lost the race -- which is fine).");
 
  240        m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
 
  244        assert((n_canceled == 0) && 
"We only invoke one timer async_wait() at a time.");
 
  257    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: Wanted to send handle [" << hndl_or_null << 
"] with " 
  258                     "meta-blob of size [" << meta_blob.size() << 
"], but an error (not necessarily new error) " 
  259                     "encountered on pipe or in user API args.  Error code details follow: " 
  260                     "[" << *err_code << 
"] [" << err_code->message() << 
"];  " 
  261                     "pipe hosed (sys/protocol error)? = " 
  264                     "sending disabled by user? = " 
  273  return async_end_sending_impl(
nullptr, flow::async::Task_asio_err());
 
  277                                                   flow::async::Task_asio_err&& on_done_func)
 
  281  const bool ok = async_end_sending_impl(&sync_err_code, std::move(on_done_func));
 
  290  if ((!sync_err_code_ptr) && sync_err_code)
 
  292    throw flow::error::Runtime_error(sync_err_code, 
"Native_socket_stream::Impl::async_end_sending()");
 
  295  sync_err_code_ptr && (*sync_err_code_ptr = sync_err_code);
 
  302                                                        flow::async::Task_asio_err&& on_done_func_or_empty)
 
  305  using flow::async::Task_asio_err;
 
  307  assert(
bool(sync_err_code_ptr_or_null) == (!on_done_func_or_empty.empty()));
 
  311  if ((!op_started<Op::S_SND>(
"async_end_sending()")) || (!state_peer(
"async_end_sending()")))
 
  354    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User wants to end sending, but we're in sends-finished " 
  355                     "state already.  Ignoring.");
 
  359  m_snd_finished = 
true; 
 
  362  if (m_snd_pending_err_code)
 
  369    assert(m_snd_pending_on_last_send_done_func_or_empty.empty());
 
  370    m_snd_pending_on_last_send_done_func_or_empty = std::move(on_done_func_or_empty);
 
  377    const Blob_const blob_with_0(&ZERO_SIZE_RAW, 
sizeof(ZERO_SIZE_RAW));
 
  385    qd = !snd_sync_write_or_q_payload(
Native_handle(), blob_with_0, 
false);
 
  386    if (qd && sync_err_code_ptr_or_null)
 
  391      assert(m_snd_pending_on_last_send_done_func_or_empty.empty());
 
  392      m_snd_pending_on_last_send_done_func_or_empty = std::move(on_done_func_or_empty);
 
  404  if (m_snd_pending_err_code)
 
  406    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User wanted to end sending, but an error (not necessarily " 
  407                     "new error) encountered on pipe synchronously when trying to send graceful-close.  " 
  408                     "Nevertheless locally sends-finished state is now active.  Will report completion via " 
  409                     "sync-args (if any).  Error code details follow: " 
  410                     "[" << m_snd_pending_err_code << 
"] [" << m_snd_pending_err_code.message() << 
"].");
 
  415    FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: User wanted to end sending.  Success so far but out-queue " 
  416                  "has payloads -- at least the graceful-close payload -- still pending while waiting for " 
  417                  "writability.  Locally sends-finished state is now active, and the other side will be informed " 
  418                  "of this barring subsequent system errors.  " 
  419                  "We cannot report completion via sync-args (if any).");
 
  423    FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: User wanted to end sending.  Immediate success: out-queue " 
  424                  "flushed permanently.  " 
  425                  "Locally sends-finished state is now active, and the other side will be informed of this.  " 
  426                  "Locally will report completion via sync-args (if any).");
 
  429  if (sync_err_code_ptr_or_null)
 
  432                                    : m_snd_pending_err_code; 
 
  444  using boost::chrono::round;
 
  445  using boost::chrono::milliseconds;
 
  447  if ((!op_started<Op::S_SND>(
"auto_ping()")) || (!state_peer(
"auto_ping()")))
 
  453  assert(period.count() > 0);
 
  457    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User wants to start auto-pings, but we're in " 
  458                     "sends-finished state already.  Ignoring.");
 
  470  if (m_snd_auto_ping_period != Fine_duration::zero())
 
  472    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User wants to start auto-pings, but this " 
  473                     "has already been engaged earlier.  Ignoring.");
 
  477  m_snd_auto_ping_period = period; 
 
  479  FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: User wants to start auto-pings so that there are " 
  480                "outgoing messages at least as frequently as every " 
  481                "[" << round<milliseconds>(m_snd_auto_ping_period) << 
"].  Sending baseline auto-ping and scheduling " 
  482                "first subsequent auto-ping; it may be rescheduled if more user traffic occurs before then.");
 
  484  if (m_snd_pending_err_code)
 
  488    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User wanted to start auto-pings, but an error was " 
  489                     "previously encountered on pipe; so will not auto-ping.  " 
  490                     "Error code details follow: [" << m_snd_pending_err_code << 
"] " 
  491                     "[" << m_snd_pending_err_code.message() << 
"].");
 
  496  const Blob_const blob_with_ff(&S_META_BLOB_LENGTH_PING_SENTINEL, 
sizeof(S_META_BLOB_LENGTH_PING_SENTINEL));
 
  501  snd_sync_write_or_q_payload(
Native_handle(), blob_with_ff, 
true);
 
  502  if (m_snd_pending_err_code)
 
  504    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: Wanted to send initial auto-ping but detected error " 
  506                     "Error code details follow: [" << m_snd_pending_err_code << 
"] " 
  507                     "[" << m_snd_pending_err_code.message() << 
"].  " 
  508                     "Saved error code to return in next user send attempt if any; otherwise ignoring; " 
  509                     "will not schedule periodic auto-pings.");
 
  524  m_snd_ev_wait_func(&m_snd_ev_wait_hndl_auto_ping_timer_fired_peer,
 
  527                     boost::make_shared<Task>
 
  528                       ([
this]() { snd_on_ev_auto_ping_now_timer_fired(); }));
 
  537  m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
 
  538  m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
 
  550  m_timer_worker.consume_timer_firing_signal(m_snd_auto_ping_timer_fired_peer);
 
  554  if (m_snd_pending_err_code)
 
  558    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: Auto-ping timer fired, but an error was " 
  559                     "previously encountered in 2-way pipe; so will neither auto-ping nor schedule next auto-ping.  " 
  560                     "Error code details follow: [" << m_snd_pending_err_code << 
"] " 
  561                     "[" << m_snd_pending_err_code.message() << 
"].");
 
  569    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: " 
  570                   "Auto-ping timer fired; but graceful-close API earlier instructed us to no-op.  No-op.");
 
  576  FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: " 
  577                "Auto-ping timer fired; sending/queueing auto-ping; scheduling for next time; it may be " 
  578                "rescheduled if more user traffic occurs before then.");
 
  582  const Blob_const blob_with_ff{&S_META_BLOB_LENGTH_PING_SENTINEL, 
sizeof(S_META_BLOB_LENGTH_PING_SENTINEL)};
 
  584  snd_sync_write_or_q_payload(
Native_handle(), blob_with_ff, 
true);
 
  585  if (m_snd_pending_err_code)
 
  587    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: Wanted to send non-initial auto-ping but detected error " 
  589                     "Error code details follow: [" << m_snd_pending_err_code << 
"] " 
  590                     "[" << m_snd_pending_err_code.message() << 
"].  " 
  591                     "Saved error code to return in next user send attempt if any; otherwise ignoring; " 
  592                     "will not continue scheduling periodic auto-pings.");
 
  597  m_snd_ev_wait_func(&m_snd_ev_wait_hndl_auto_ping_timer_fired_peer,
 
  599                     boost::make_shared<Task>
 
  600                       ([
this]() { snd_on_ev_auto_ping_now_timer_fired(); }));
 
  601  m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
 
  602  m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
 
  608  using flow::util::Blob;
 
  613  assert((!m_snd_pending_err_code) && 
"Pipe must not be pre-hosed by contract.");
 
  615  size_t n_sent_or_zero;
 
  616  if (m_snd_pending_payloads_q.empty())
 
  618    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Want to send low-level payload: " 
  619                   "handle [" << hndl_or_null << 
"] with blob of size [" << orig_blob.size() << 
"] " 
  620                   "located @ [" << orig_blob.data() << 
"]; no write is pending so proceeding immediately.  " 
  621                   "Will drop if all of it would-block? = [" << avoid_qing << 
"].");
 
  623    n_sent_or_zero = snd_nb_write_low_lvl_payload(hndl_or_null, orig_blob, &m_snd_pending_err_code);
 
  624    if (m_snd_pending_err_code) 
 
  626      assert(n_sent_or_zero == 0);
 
  631    if (n_sent_or_zero == orig_blob.size())
 
  659    assert(hndl_or_null.
null()
 
  660           && 
"Internal bug?  Do not ask to drop a payload with a native handle inside under any circumstances.");
 
  661    if (n_sent_or_zero == 0)
 
  665      const auto q_size = m_snd_pending_payloads_q.size();
 
  666      FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: Wanted to send low-level payload: " 
  667                    "blob of size [" << orig_blob.size() << 
"] located @ [" << orig_blob.data() << 
"]; " 
  668                    "result was would-block for all of its bytes (either because blocked-queue was non-empty " 
  669                    "already, or it was empty, but all of payload's bytes would-block at this time).  " 
  670                    "Therefore dropping payload (done for auto-pings at least).  Out-queue size remains " 
  671                    "[" << q_size << 
"].");
 
  682    FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: Wanted to send low-level payload: " 
  683                  "blob of size [" << orig_blob.size() << 
"] located @ [" << orig_blob.data() << 
"]; " 
  684                  "result was would-block for all but [" << n_sent_or_zero << 
"] of its bytes (blocked-queue " 
  685                  "was empty, so nb-send was attmpted, and some -- but not all -- of payload's bytes " 
  686                  "would-block at this time).  We cannot \"get back\" the sent bytes and thus are forced " 
  687                  "to queue the remaining ones (would have dropped payload if all the bytes would-block).");
 
  692  auto new_low_lvl_payload = boost::movelib::make_unique<Snd_low_lvl_payload>();
 
  693  if (n_sent_or_zero == 0)
 
  695    new_low_lvl_payload->m_hndl_or_null = hndl_or_null;
 
  699  new_low_lvl_payload->m_blob = Blob(
get_logger());
 
  706  const auto& new_blob = orig_blob + n_sent_or_zero;
 
  707  new_low_lvl_payload->m_blob.assign_copy(new_blob);
 
  709  FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Want to send pending-from-would-block low-level payload: " 
  710                 "handle [" << new_low_lvl_payload->m_hndl_or_null << 
"] with " 
  711                 "blob of size [" << new_low_lvl_payload->m_blob.size() << 
"] " 
  712                 "located @ [" << new_blob.data() << 
"]; " 
  713                 "created blob copy @ [" << new_low_lvl_payload->m_blob.const_buffer().data() << 
"]; " 
  714                 "enqueued to out-queue which is now of size [" << (m_snd_pending_payloads_q.size() + 1) << 
"].");
 
  716  m_snd_pending_payloads_q.emplace(std::move(new_low_lvl_payload)); 
 
  718  if (m_snd_pending_payloads_q.size() == 1)
 
  726    snd_async_write_q_head_payload();
 
  732    return bool(m_snd_pending_err_code);
 
  735  assert(!m_snd_pending_err_code);
 
  742  using flow::util::Lock_guard;
 
  751  size_t n_sent_or_zero = 0;
 
  755    Lock_guard<
decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
 
  759      if (hndl_or_null.
null())
 
  769        if (!m_peer_socket->non_blocking()) 
 
  771          FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Setting boost.asio peer socket non-blocking mode.");
 
  772          m_peer_socket->non_blocking(
true, *err_code); 
 
  781          assert(m_peer_socket->non_blocking());
 
  783          FLOW_LOG_TRACE(
"Writing low-level blob directly via boost.asio (blob details logged above hopefully).");
 
  785          n_sent_or_zero = m_peer_socket->write_some(blob, *err_code);
 
  795                                                          hndl_or_null, blob, err_code);
 
  802      assert(((!*err_code) && (n_sent_or_zero != 0))
 
  803             || (*err_code && (n_sent_or_zero == 0)));
 
  804      if (*err_code == boost::asio::error::would_block)
 
  816        assert((!m_peer_socket_hosed) && 
"m_peer_socket_hosed must start as null and only become non-null once.  Bug?");
 
  817        m_peer_socket_hosed = std::move(m_peer_socket);
 
  818        assert((!m_peer_socket) && 
"Shocking unique_ptr misbehavior!");
 
  831         || (n_sent_or_zero == 0)); 
 
  835    FLOW_LOG_TRACE(
"Sent nothing due to error [" << *err_code << 
"] [" << err_code->message() << 
"].");
 
  839    FLOW_LOG_TRACE(
"Send: no error.  Was able to send [" << n_sent_or_zero << 
"] of [" << blob.size() << 
"] bytes.");
 
  840    if (!hndl_or_null.
null())
 
  842      FLOW_LOG_TRACE(
"Able to send the native handle? = [" << (n_sent_or_zero != 0) << 
"].");
 
  846  return n_sent_or_zero;
 
  853  using flow::util::Lock_guard;
 
  854  using boost::asio::async_write;
 
  858  assert((!m_snd_pending_payloads_q.empty()) && 
"Contract is stuff is queued to be async-sent.  Bug?");
 
  859  assert((!m_snd_pending_err_code) && 
"Pipe must not be pre-hosed by contract.");
 
  868    Lock_guard<
decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
 
  884      m_snd_ev_wait_func(&m_ev_wait_hndl_peer_socket,
 
  887                         boost::make_shared<Task>
 
  888                           ([
this]() { snd_on_ev_peer_socket_writable_or_error(); }));
 
  905  FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: User-performed wait-for-writable finished (writable or error, " 
  906                 "we do not know which yet).  We endeavour to send->pop->send->... as much of the queue as we " 
  907                 "can until would-block or total success.");
 
  909  assert((!m_snd_pending_payloads_q.empty()) && 
"Send-queue should not be touched while async-write of head is going.");
 
  910  assert((!m_snd_pending_err_code) && 
"Send error would only be detected by us.  Bug?");
 
  913  bool would_block = 
false;
 
  916    auto& low_lvl_payload = *m_snd_pending_payloads_q.front();
 
  917    auto& hndl_or_null = low_lvl_payload.m_hndl_or_null;
 
  918    auto& low_lvl_blob = low_lvl_payload.m_blob;
 
  919    auto low_lvl_blob_view = low_lvl_blob.const_buffer();
 
  921    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: " 
  922                   "Out-queue size is [" << m_snd_pending_payloads_q.size() << 
"]; " 
  923                   "want to send handle [" << hndl_or_null << 
"] with " 
  924                   "low-level blob of size [" << low_lvl_blob_view.size() << 
"] " 
  925                   "located @ [" << low_lvl_blob_view.data() << 
"].");
 
  927    const auto n_sent_or_zero
 
  928      = snd_nb_write_low_lvl_payload(low_lvl_payload.m_hndl_or_null, low_lvl_blob_view, &m_snd_pending_err_code);
 
  929    if (m_snd_pending_err_code)
 
  935    if (n_sent_or_zero == low_lvl_blob_view.size())
 
  938      m_snd_pending_payloads_q.pop(); 
 
  945      if (n_sent_or_zero != 0)
 
  951        low_lvl_payload.m_blob.start_past_prefix_inc(n_sent_or_zero);
 
  957  while ((!m_snd_pending_payloads_q.empty()) && (!would_block) && (!m_snd_pending_err_code));
 
  960  if ((!m_snd_pending_err_code) && (!m_snd_pending_payloads_q.empty()))
 
  962    FLOW_LOG_TRACE(
"Out-queue has not been emptied.  Must keep async-send chain going.");
 
  965    snd_async_write_q_head_payload();
 
  979  bool invoke_on_done = 
false;
 
  980  if (m_snd_pending_err_code)
 
  982    invoke_on_done = !m_snd_pending_on_last_send_done_func_or_empty.empty();
 
  983    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User-performed wait-for-writable reported completion; " 
  984                     "wanted to nb-send any queued data and possibly initiated another wait-for-writable; " 
  985                     "got error during an nb-send or when initiating wait; TRACE details above.  " 
  986                     "Error code details follow: " 
  987                     "[" << m_snd_pending_err_code << 
"] [" << m_snd_pending_err_code.message() << 
"].  " 
  988                     "Saved error code to return in next user send attempt if any.  " 
  989                     "Will run graceful-sends-close completion handler? = ["  << invoke_on_done << 
"].");
 
  991    assert((!m_snd_pending_payloads_q.empty()) && 
"Opportunistic sanity check.");
 
  993  else if (m_snd_pending_payloads_q.empty()) 
 
  995    FLOW_LOG_TRACE(
"Out-queue has been emptied.");
 
  997    if (!m_snd_pending_on_last_send_done_func_or_empty.empty())
 
 1000      FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: " 
 1001                    "We sent graceful-close and any preceding user messages with success.  Will now inform user via " 
 1002                    "graceful-sends-close completion handler.");
 
 1003      invoke_on_done = 
true;
 
 1010    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Executing end-sending completion handler now.");
 
 1011    auto on_done_func = std::move(m_snd_pending_on_last_send_done_func_or_empty);
 
 1012    m_snd_pending_on_last_send_done_func_or_empty.clear(); 
 
 1014    on_done_func(m_snd_pending_err_code);
 
 1015    FLOW_LOG_TRACE(
"Handler completed.");
 
proto_ver_t local_max_proto_ver_for_sending()
To be called at most once, this returns local_max_proto_ver from ctor the first time and S_VER_UNKNOW...
static constexpr proto_ver_t S_VER_UNKNOWN
A proto_ver_t value, namely a negative one, which is a reserved value indicating "unknown version"; i...
Error_code m_snd_pending_err_code
The first and only connection-hosing error condition detected when attempting to low-level-write on m...
bool async_end_sending(Error_code *sync_err_code, flow::async::Task_asio_err &&on_done_func)
See Native_socket_stream counterpart.
void snd_on_ev_peer_socket_writable_or_error()
Completion handler, from outside event loop via sync_io pattern, for the async-wait initiated by snd_...
Protocol_negotiator m_protocol_negotiator
Handles the protocol negotiation at the start of the pipe.
bool snd_sync_write_or_q_payload(Native_handle hndl_or_null, const util::Blob_const &orig_blob, bool avoid_qing)
Either synchronously sends hndl_or_null handle (if any) and orig_blob low-level blob over m_peer_sock...
void snd_on_ev_auto_ping_now_timer_fired()
Handler for the async-wait, via util::sync_io::Timer_event_emitter, of the auto-ping timer firing; if...
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code)
See Native_socket_stream counterpart.
void snd_async_write_q_head_payload()
Initiates async-write over m_peer_socket of the low-level payload at the head of out-queue m_snd_pend...
size_t send_meta_blob_max_size() const
See Native_socket_stream counterpart.
bool async_end_sending_impl(Error_code *sync_err_code_ptr_or_null, flow::async::Task_asio_err &&on_done_func_or_empty)
*end_sending() body.
bool start_send_native_handle_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
bool start_send_blob_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
uint16_t low_lvl_payload_blob_length_t
The type used to encode the meta-blob length; this puts a cap on how long the meta-blobs can be.
bool send_blob(const util::Blob_const &blob, Error_code *err_code)
See Native_socket_stream counterpart.
bool auto_ping(util::Fine_duration period)
See Native_socket_stream counterpart.
bool end_sending()
See Native_socket_stream counterpart.
size_t snd_nb_write_low_lvl_payload(Native_handle hndl_or_null, const util::Blob_const &blob, Error_code *err_code)
Utility that sends non-empty blob, and (unless null) hndl_or_null associated with its 1st byte,...
size_t send_blob_max_size() const
See Native_socket_stream counterpart.
static const size_t & S_MAX_META_BLOB_LENGTH
The maximum length of a blob that can be sent by this protocol.
bool start_send_native_handle_ops(Event_wait_func_t &&ev_wait_func)
Implements Native_handle_sender API per contract.
size_t send_meta_blob_max_size() const
Implements Native_handle_sender API per contract.
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code=0)
Implements Native_handle_sender API per contract.
flow::log::Logger * get_logger() const
Returns logger (possibly null).
size_t nb_write_some_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Error_code *err_code)
boost.asio extension similar to peer_socket->non_blocking(true); auto n = peer_socket->write_some(pay...
void async_write_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Task_err &&on_sent_or_error)
boost.asio extension similar to boost::asio::async_write(Peer_socket&, Blob_const,...
@ S_SENDS_FINISHED_CANNOT_SEND
Will not send message: local user already ended sending via API marking this.
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_INVALID_ARGUMENT
User called an API with 1 or more arguments against the API spec.
@ S_LOW_LVL_TRANSPORT_HOSED_CANNOT_SEND
Unable to send outgoing traffic: an earlier-reported, or at least logged, system error had hosed the ...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
Function< void(Asio_waitable_native_handle *hndl_of_interest, bool ev_of_interest_snd_else_rcv, Task_ptr &&on_active_ev_func)> Event_wait_func
In sync_io pattern, concrete type storing user-supplied function invoked by pattern-implementing ipc:...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.