21#include <boost/move/make_unique.hpp> 
   32  if (!start_ops<Op::S_SND>(std::move(ev_wait_func)))
 
   42           && 
"Protocol_negotiator not properly marking the once-only sending-out of protocol version?");
 
   56    const Blob_const payload_blob(&fake_meta_length_raw, 
sizeof(fake_meta_length_raw));
 
   58    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Want to send protocol-negotiation info.  " 
   59                   "About to send payload 1 of 1; " 
   60                   "contains low-level blob of size [" << payload_blob.size() << 
"] " 
   61                   "located @ [" << payload_blob.data() << 
"].");
 
   73    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Wanted to send protocol-negotiation info; " 
   74                   "but we've marked it as already-sent, even though we are in start_*_ops() in PEER state.  " 
   75                   "Probably we come from a .release()d Native_socket_stream which has already done it; cool.");
 
   99  using flow::util::buffers_dump_string;
 
  100  using boost::chrono::round;
 
  101  using boost::chrono::milliseconds;
 
  104                                     hndl_or_null, flow::util::bind_ns::cref(meta_blob), _1);
 
  109  if ((!op_started<Op::S_SND>(
"send_native_handle()")) || (!state_peer(
"send_native_handle()")))
 
  116  const size_t meta_size = meta_blob.size();
 
  117  assert(((!hndl_or_null.
null()) || (meta_size != 0))
 
  118         && 
"Native_socket_stream::send_blob() blob must have length 1+; " 
  119              "Native_socket_stream::send_native_handle() must have same or non-null hndl_or_null or both.");
 
  121  FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Will send handle [" << hndl_or_null << 
"] with " 
  122                 "meta-blob of size [" << meta_blob.size() << 
"].");
 
  126    FLOW_LOG_DATA(
"Socket stream [" << *
this << 
"]: Meta-blob contents are " 
  127                  "[\n" << buffers_dump_string(meta_blob, 
"  ") << 
"].");
 
  147    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: Send: User argument length [" << meta_blob.size() << 
"] " 
  151  else if (m_snd_pending_err_code) 
 
  160    FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: An error was detected earlier and saved for any subsequent " 
  161                  "send attempts like this.  Will not proceed with send.  More info in WARNING below.");
 
  162    *err_code = m_snd_pending_err_code;
 
  175    const Blob_const meta_length_blob(&meta_length_raw, 
sizeof(meta_length_raw));
 
  180    const auto send_low_lvl_payload
 
  183      FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Wanted to send handle [" << hndl_or_null << 
"] with " 
  184                     "meta-blob of size [" << meta_blob.size() << 
"].  " 
  185                     "About to send payload index [" << idx << 
"] of " 
  186                     "[" << ((meta_length_raw == 0) ? 1 : 2) << 
"] new low-level payloads; " 
  187                     "includes handle [" << payload_hndl << 
"] and " 
  188                     "low-level blob of size [" << payload_blob.size() << 
"] " 
  189                     "located @ [" << payload_blob.data() << 
"].");
 
  193      snd_sync_write_or_q_payload(payload_hndl, payload_blob, 
false);
 
  204    send_low_lvl_payload(1, hndl_or_null, meta_length_blob); 
 
  205    if ((meta_length_raw != 0) && (!m_snd_pending_err_code))
 
  210    *err_code = m_snd_pending_err_code; 
 
  215      FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Wanted to send user message but detected error " 
  217                     "Error code details follow: [" << *err_code << 
"] [" << err_code->message() << 
"].  " 
  218                     "Saved error code to return in next user send attempt if any, after this attempt also " 
  219                     "returns that error code synchronously first.");
 
  221    else if (m_snd_auto_ping_period != Fine_duration::zero()) 
 
  227      const size_t n_canceled = m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
 
  229      FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Send request from user; hence rescheduled " 
  230                     "auto-ping to occur in " 
  231                     "[" << round<milliseconds>(m_snd_auto_ping_period) << 
"] (will re-reschedule " 
  232                     "again upon any other outgoing traffic that might be requested before then).  As a result " 
  233                     "[" << n_canceled << 
"] previously scheduled auto-pings have been canceled; 1 is most likely; " 
  234                     "0 means an auto-ping is *just* about to fire (we lost the race -- which is fine).");
 
  241        m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
 
  245        assert((n_canceled == 0) && 
"We only invoke one timer async_wait() at a time.");
 
  258    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: Wanted to send handle [" << hndl_or_null << 
"] with " 
  259                     "meta-blob of size [" << meta_blob.size() << 
"], but an error (not necessarily new error) " 
  260                     "encountered on pipe or in user API args.  Error code details follow: " 
  261                     "[" << *err_code << 
"] [" << err_code->message() << 
"];  " 
  262                     "pipe hosed (sys/protocol error)? = " 
  265                     "sending disabled by user? = " 
  274  return async_end_sending_impl(
nullptr, flow::async::Task_asio_err());
 
  278                                                   flow::async::Task_asio_err&& on_done_func)
 
  282  const bool ok = async_end_sending_impl(&sync_err_code, std::move(on_done_func));
 
  291  if ((!sync_err_code_ptr) && sync_err_code)
 
  293    throw flow::error::Runtime_error(sync_err_code, 
"Native_socket_stream::Impl::async_end_sending()");
 
  296  sync_err_code_ptr && (*sync_err_code_ptr = sync_err_code);
 
  303                                                        flow::async::Task_asio_err&& on_done_func_or_empty)
 
  306  using flow::async::Task_asio_err;
 
  308  assert(
bool(sync_err_code_ptr_or_null) == (!on_done_func_or_empty.empty()));
 
  312  if ((!op_started<Op::S_SND>(
"async_end_sending()")) || (!state_peer(
"async_end_sending()")))
 
  355    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User wants to end sending, but we're in sends-finished " 
  356                     "state already.  Ignoring.");
 
  360  m_snd_finished = 
true; 
 
  363  if (m_snd_pending_err_code)
 
  370    assert(m_snd_pending_on_last_send_done_func_or_empty.empty());
 
  371    m_snd_pending_on_last_send_done_func_or_empty = std::move(on_done_func_or_empty);
 
  378    const Blob_const blob_with_0(&ZERO_SIZE_RAW, 
sizeof(ZERO_SIZE_RAW));
 
  386    qd = !snd_sync_write_or_q_payload(
Native_handle(), blob_with_0, 
false);
 
  387    if (qd && sync_err_code_ptr_or_null)
 
  392      assert(m_snd_pending_on_last_send_done_func_or_empty.empty());
 
  393      m_snd_pending_on_last_send_done_func_or_empty = std::move(on_done_func_or_empty);
 
  405  if (m_snd_pending_err_code)
 
  407    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User wanted to end sending, but an error (not necessarily " 
  408                     "new error) encountered on pipe synchronously when trying to send graceful-close.  " 
  409                     "Nevertheless locally sends-finished state is now active.  Will report completion via " 
  410                     "sync-args (if any).  Error code details follow: " 
  411                     "[" << m_snd_pending_err_code << 
"] [" << m_snd_pending_err_code.message() << 
"].");
 
  416    FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: User wanted to end sending.  Success so far but out-queue " 
  417                  "has payloads -- at least the graceful-close payload -- still pending while waiting for " 
  418                  "writability.  Locally sends-finished state is now active, and the other side will be informed " 
  419                  "of this barring subsequent system errors.  " 
  420                  "We cannot report completion via sync-args (if any).");
 
  424    FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: User wanted to end sending.  Immediate success: out-queue " 
  425                  "flushed permanently.  " 
  426                  "Locally sends-finished state is now active, and the other side will be informed of this.  " 
  427                  "Locally will report completion via sync-args (if any).");
 
  430  if (sync_err_code_ptr_or_null)
 
  433                                    : m_snd_pending_err_code; 
 
  445  using boost::chrono::round;
 
  446  using boost::chrono::milliseconds;
 
  448  if ((!op_started<Op::S_SND>(
"auto_ping()")) || (!state_peer(
"auto_ping()")))
 
  454  assert(period.count() > 0);
 
  458    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User wants to start auto-pings, but we're in " 
  459                     "sends-finished state already.  Ignoring.");
 
  471  if (m_snd_auto_ping_period != Fine_duration::zero())
 
  473    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User wants to start auto-pings, but this " 
  474                     "has already been engaged earlier.  Ignoring.");
 
  478  m_snd_auto_ping_period = period; 
 
  480  FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: User wants to start auto-pings so that there are " 
  481                "outgoing messages at least as frequently as every " 
  482                "[" << round<milliseconds>(m_snd_auto_ping_period) << 
"].  Sending baseline auto-ping and scheduling " 
  483                "first subsequent auto-ping; it may be rescheduled if more user traffic occurs before then.");
 
  485  if (m_snd_pending_err_code)
 
  489    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User wanted to start auto-pings, but an error was " 
  490                     "previously encountered on pipe; so will not auto-ping.  " 
  491                     "Error code details follow: [" << m_snd_pending_err_code << 
"] " 
  492                     "[" << m_snd_pending_err_code.message() << 
"].");
 
  497  const Blob_const blob_with_ff(&S_META_BLOB_LENGTH_PING_SENTINEL, 
sizeof(S_META_BLOB_LENGTH_PING_SENTINEL));
 
  502  snd_sync_write_or_q_payload(
Native_handle(), blob_with_ff, 
true);
 
  503  if (m_snd_pending_err_code)
 
  505    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: Wanted to send initial auto-ping but detected error " 
  507                     "Error code details follow: [" << m_snd_pending_err_code << 
"] " 
  508                     "[" << m_snd_pending_err_code.message() << 
"].  " 
  509                     "Saved error code to return in next user send attempt if any; otherwise ignoring; " 
  510                     "will not schedule periodic auto-pings.");
 
  525  m_snd_ev_wait_func(&m_snd_ev_wait_hndl_auto_ping_timer_fired_peer,
 
  528                     boost::make_shared<Task>
 
  529                       ([
this]() { snd_on_ev_auto_ping_now_timer_fired(); }));
 
  538  m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
 
  539  m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
 
  551  m_timer_worker.consume_timer_firing_signal(m_snd_auto_ping_timer_fired_peer);
 
  555  if (m_snd_pending_err_code)
 
  559    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: Auto-ping timer fired, but an error was " 
  560                     "previously encountered in 2-way pipe; so will neither auto-ping nor schedule next auto-ping.  " 
  561                     "Error code details follow: [" << m_snd_pending_err_code << 
"] " 
  562                     "[" << m_snd_pending_err_code.message() << 
"].");
 
  570    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: " 
  571                   "Auto-ping timer fired; but graceful-close API earlier instructed us to no-op.  No-op.");
 
  577  FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: " 
  578                "Auto-ping timer fired; sending/queueing auto-ping; scheduling for next time; it may be " 
  579                "rescheduled if more user traffic occurs before then.");
 
  583  const Blob_const blob_with_ff{&S_META_BLOB_LENGTH_PING_SENTINEL, 
sizeof(S_META_BLOB_LENGTH_PING_SENTINEL)};
 
  585  snd_sync_write_or_q_payload(
Native_handle(), blob_with_ff, 
true);
 
  586  if (m_snd_pending_err_code)
 
  588    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: Wanted to send non-initial auto-ping but detected error " 
  590                     "Error code details follow: [" << m_snd_pending_err_code << 
"] " 
  591                     "[" << m_snd_pending_err_code.message() << 
"].  " 
  592                     "Saved error code to return in next user send attempt if any; otherwise ignoring; " 
  593                     "will not continue scheduling periodic auto-pings.");
 
  598  m_snd_ev_wait_func(&m_snd_ev_wait_hndl_auto_ping_timer_fired_peer,
 
  600                     boost::make_shared<Task>
 
  601                       ([
this]() { snd_on_ev_auto_ping_now_timer_fired(); }));
 
  602  m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
 
  603  m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
 
  609  using flow::util::Blob;
 
  614  assert((!m_snd_pending_err_code) && 
"Pipe must not be pre-hosed by contract.");
 
  616  size_t n_sent_or_zero;
 
  617  if (m_snd_pending_payloads_q.empty())
 
  619    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Want to send low-level payload: " 
  620                   "handle [" << hndl_or_null << 
"] with blob of size [" << orig_blob.size() << 
"] " 
  621                   "located @ [" << orig_blob.data() << 
"]; no write is pending so proceeding immediately.  " 
  622                   "Will drop if all of it would-block? = [" << avoid_qing << 
"].");
 
  624    n_sent_or_zero = snd_nb_write_low_lvl_payload(hndl_or_null, orig_blob, &m_snd_pending_err_code);
 
  625    if (m_snd_pending_err_code) 
 
  627      assert(n_sent_or_zero == 0);
 
  632    if (n_sent_or_zero == orig_blob.size())
 
  660    assert(hndl_or_null.
null()
 
  661           && 
"Internal bug?  Do not ask to drop a payload with a native handle inside under any circumstances.");
 
  662    if (n_sent_or_zero == 0)
 
  666      const auto q_size = m_snd_pending_payloads_q.size();
 
  667      FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: Wanted to send low-level payload: " 
  668                    "blob of size [" << orig_blob.size() << 
"] located @ [" << orig_blob.data() << 
"]; " 
  669                    "result was would-block for all of its bytes (either because blocked-queue was non-empty " 
  670                    "already, or it was empty, but all of payload's bytes would-block at this time).  " 
  671                    "Therefore dropping payload (done for auto-pings at least).  Out-queue size remains " 
  672                    "[" << q_size << 
"].");
 
  683    FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: Wanted to send low-level payload: " 
  684                  "blob of size [" << orig_blob.size() << 
"] located @ [" << orig_blob.data() << 
"]; " 
  685                  "result was would-block for all but [" << n_sent_or_zero << 
"] of its bytes (blocked-queue " 
  686                  "was empty, so nb-send was attmpted, and some -- but not all -- of payload's bytes " 
  687                  "would-block at this time).  We cannot \"get back\" the sent bytes and thus are forced " 
  688                  "to queue the remaining ones (would have dropped payload if all the bytes would-block).");
 
  693  auto new_low_lvl_payload = boost::movelib::make_unique<Snd_low_lvl_payload>();
 
  694  if (n_sent_or_zero == 0)
 
  696    new_low_lvl_payload->m_hndl_or_null = hndl_or_null;
 
  700  new_low_lvl_payload->m_blob = Blob(
get_logger());
 
  707  const auto& new_blob = orig_blob + n_sent_or_zero;
 
  708  new_low_lvl_payload->m_blob.assign_copy(new_blob);
 
  710  FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Want to send pending-from-would-block low-level payload: " 
  711                 "handle [" << new_low_lvl_payload->m_hndl_or_null << 
"] with " 
  712                 "blob of size [" << new_low_lvl_payload->m_blob.size() << 
"] " 
  713                 "located @ [" << new_blob.data() << 
"]; " 
  714                 "created blob copy @ [" << new_low_lvl_payload->m_blob.const_buffer().data() << 
"]; " 
  715                 "enqueued to out-queue which is now of size [" << (m_snd_pending_payloads_q.size() + 1) << 
"].");
 
  717  m_snd_pending_payloads_q.emplace(std::move(new_low_lvl_payload)); 
 
  719  if (m_snd_pending_payloads_q.size() == 1)
 
  727    snd_async_write_q_head_payload();
 
  733    return bool(m_snd_pending_err_code);
 
  736  assert(!m_snd_pending_err_code);
 
  743  using flow::util::Lock_guard;
 
  752  size_t n_sent_or_zero = 0;
 
  756    Lock_guard<
decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
 
  760      if (hndl_or_null.
null())
 
  770        if (!m_peer_socket->non_blocking()) 
 
  772          FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Setting boost.asio peer socket non-blocking mode.");
 
  773          m_peer_socket->non_blocking(
true, *err_code); 
 
  782          assert(m_peer_socket->non_blocking());
 
  784          FLOW_LOG_TRACE(
"Writing low-level blob directly via boost.asio (blob details logged above hopefully).");
 
  786          n_sent_or_zero = m_peer_socket->write_some(blob, *err_code);
 
  796                                                          hndl_or_null, blob, err_code);
 
  803      assert(((!*err_code) && (n_sent_or_zero != 0))
 
  804             || (*err_code && (n_sent_or_zero == 0)));
 
  805      if (*err_code == boost::asio::error::would_block)
 
  815        m_peer_socket.reset();
 
  828         || (n_sent_or_zero == 0)); 
 
  832    FLOW_LOG_TRACE(
"Sent nothing due to error [" << *err_code << 
"] [" << err_code->message() << 
"].");
 
  836    FLOW_LOG_TRACE(
"Send: no error.  Was able to send [" << n_sent_or_zero << 
"] of [" << blob.size() << 
"] bytes.");
 
  837    if (!hndl_or_null.
null())
 
  839      FLOW_LOG_TRACE(
"Able to send the native handle? = [" << (n_sent_or_zero != 0) << 
"].");
 
  843  return n_sent_or_zero;
 
  850  using flow::util::Lock_guard;
 
  851  using boost::asio::async_write;
 
  855  assert((!m_snd_pending_payloads_q.empty()) && 
"Contract is stuff is queued to be async-sent.  Bug?");
 
  856  assert((!m_snd_pending_err_code) && 
"Pipe must not be pre-hosed by contract.");
 
  865    Lock_guard<
decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
 
  881      m_snd_ev_wait_func(&m_ev_wait_hndl_peer_socket,
 
  884                         boost::make_shared<Task>
 
  885                           ([
this]() { snd_on_ev_peer_socket_writable_or_error(); }));
 
  902  FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: User-performed wait-for-writable finished (writable or error, " 
  903                 "we do not know which yet).  We endeavour to send->pop->send->... as much of the queue as we " 
  904                 "can until would-block or total success.");
 
  906  assert((!m_snd_pending_payloads_q.empty()) && 
"Send-queue should not be touched while async-write of head is going.");
 
  907  assert((!m_snd_pending_err_code) && 
"Send error would only be detected by us.  Bug?");
 
  910  bool would_block = 
false;
 
  913    auto& low_lvl_payload = *m_snd_pending_payloads_q.front();
 
  914    auto& hndl_or_null = low_lvl_payload.m_hndl_or_null;
 
  915    auto& low_lvl_blob = low_lvl_payload.m_blob;
 
  916    auto low_lvl_blob_view = low_lvl_blob.const_buffer();
 
  918    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: " 
  919                   "Out-queue size is [" << m_snd_pending_payloads_q.size() << 
"]; " 
  920                   "want to send handle [" << hndl_or_null << 
"] with " 
  921                   "low-level blob of size [" << low_lvl_blob_view.size() << 
"] " 
  922                   "located @ [" << low_lvl_blob_view.data() << 
"].");
 
  924    const auto n_sent_or_zero
 
  925      = snd_nb_write_low_lvl_payload(low_lvl_payload.m_hndl_or_null, low_lvl_blob_view, &m_snd_pending_err_code);
 
  926    if (m_snd_pending_err_code)
 
  932    if (n_sent_or_zero == low_lvl_blob_view.size())
 
  935      m_snd_pending_payloads_q.pop(); 
 
  942      if (n_sent_or_zero != 0)
 
  948        low_lvl_payload.m_blob.start_past_prefix_inc(n_sent_or_zero);
 
  954  while ((!m_snd_pending_payloads_q.empty()) && (!would_block) && (!m_snd_pending_err_code));
 
  957  if ((!m_snd_pending_err_code) && (!m_snd_pending_payloads_q.empty()))
 
  959    FLOW_LOG_TRACE(
"Out-queue has not been emptied.  Must keep async-send chain going.");
 
  962    snd_async_write_q_head_payload();
 
  976  bool invoke_on_done = 
false;
 
  977  if (m_snd_pending_err_code)
 
  979    invoke_on_done = !m_snd_pending_on_last_send_done_func_or_empty.empty();
 
  980    FLOW_LOG_WARNING(
"Socket stream [" << *
this << 
"]: User-performed wait-for-writable reported completion; " 
  981                     "wanted to nb-send any queued data and possibly initiated another wait-for-writable; " 
  982                     "got error during an nb-send or when initiating wait; TRACE details above.  " 
  983                     "Error code details follow: " 
  984                     "[" << m_snd_pending_err_code << 
"] [" << m_snd_pending_err_code.message() << 
"].  " 
  985                     "Saved error code to return in next user send attempt if any.  " 
  986                     "Will run graceful-sends-close completion handler? = ["  << invoke_on_done << 
"].");
 
  988    assert((!m_snd_pending_payloads_q.empty()) && 
"Opportunistic sanity check.");
 
  990  else if (m_snd_pending_payloads_q.empty()) 
 
  992    FLOW_LOG_TRACE(
"Out-queue has been emptied.");
 
  994    if (!m_snd_pending_on_last_send_done_func_or_empty.empty())
 
  997      FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: " 
  998                    "We sent graceful-close and any preceding user messages with success.  Will now inform user via " 
  999                    "graceful-sends-close completion handler.");
 
 1000      invoke_on_done = 
true;
 
 1007    FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Executing end-sending completion handler now.");
 
 1008    auto on_done_func = std::move(m_snd_pending_on_last_send_done_func_or_empty);
 
 1009    m_snd_pending_on_last_send_done_func_or_empty.clear(); 
 
 1011    on_done_func(m_snd_pending_err_code);
 
 1012    FLOW_LOG_TRACE(
"Handler completed.");
 
proto_ver_t local_max_proto_ver_for_sending()
To be called at most once, this returns local_max_proto_ver from ctor the first time and S_VER_UNKNOW...
static constexpr proto_ver_t S_VER_UNKNOWN
A proto_ver_t value, namely a negative one, which is a reserved value indicating "unknown version"; i...
Error_code m_snd_pending_err_code
The first and only connection-hosing error condition detected when attempting to low-level-write on m...
bool async_end_sending(Error_code *sync_err_code, flow::async::Task_asio_err &&on_done_func)
See Native_socket_stream counterpart.
void snd_on_ev_peer_socket_writable_or_error()
Completion handler, from outside event loop via sync_io pattern, for the async-wait initiated by snd_...
Protocol_negotiator m_protocol_negotiator
Handles the protocol negotiation at the start of the pipe.
bool snd_sync_write_or_q_payload(Native_handle hndl_or_null, const util::Blob_const &orig_blob, bool avoid_qing)
Either synchronously sends hndl_or_null handle (if any) and orig_blob low-level blob over m_peer_sock...
void snd_on_ev_auto_ping_now_timer_fired()
Handler for the async-wait, via util::sync_io::Timer_event_emitter, of the auto-ping timer firing; if...
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code)
See Native_socket_stream counterpart.
void snd_async_write_q_head_payload()
Initiates async-write over m_peer_socket of the low-level payload at the head of out-queue m_snd_pend...
size_t send_meta_blob_max_size() const
See Native_socket_stream counterpart.
bool async_end_sending_impl(Error_code *sync_err_code_ptr_or_null, flow::async::Task_asio_err &&on_done_func_or_empty)
*end_sending() body.
bool start_send_native_handle_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
bool start_send_blob_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
uint16_t low_lvl_payload_blob_length_t
The type used to encode the meta-blob length; this puts a cap on how long the meta-blobs can be.
bool send_blob(const util::Blob_const &blob, Error_code *err_code)
See Native_socket_stream counterpart.
bool auto_ping(util::Fine_duration period)
See Native_socket_stream counterpart.
bool end_sending()
See Native_socket_stream counterpart.
size_t snd_nb_write_low_lvl_payload(Native_handle hndl_or_null, const util::Blob_const &blob, Error_code *err_code)
Utility that sends non-empty blob, and (unless null) hndl_or_null associated with its 1st byte,...
size_t send_blob_max_size() const
See Native_socket_stream counterpart.
static const size_t & S_MAX_META_BLOB_LENGTH
The maximum length of a blob that can be sent by this protocol.
bool start_send_native_handle_ops(Event_wait_func_t &&ev_wait_func)
Implements Native_handle_sender API per contract.
size_t send_meta_blob_max_size() const
Implements Native_handle_sender API per contract.
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code=0)
Implements Native_handle_sender API per contract.
flow::log::Logger * get_logger() const
Returns logger (possibly null).
size_t nb_write_some_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Error_code *err_code)
boost.asio extension similar to peer_socket->non_blocking(true); auto n = peer_socket->write_some(pay...
void async_write_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Task_err &&on_sent_or_error)
boost.asio extension similar to boost::asio::async_write(Peer_socket&, Blob_const,...
@ S_SENDS_FINISHED_CANNOT_SEND
Will not send message: local user already ended sending via API marking this.
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_INVALID_ARGUMENT
User called an API with 1 or more arguments against the API spec.
@ S_LOW_LVL_TRANSPORT_HOSED_CANNOT_SEND
Unable to send outgoing traffic: an earlier-reported, or at least logged, system error had hosed the ...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
Function< void(Asio_waitable_native_handle *hndl_of_interest, bool ev_of_interest_snd_else_rcv, Task_ptr &&on_active_ev_func)> Event_wait_func
In sync_io pattern, concrete type storing user-supplied function invoked by pattern-implementing ipc:...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.