21#include <boost/move/make_unique.hpp>
32 if (!start_ops<Op::S_SND>(std::move(ev_wait_func)))
42 &&
"Protocol_negotiator not properly marking the once-only sending-out of protocol version?");
56 const Blob_const payload_blob(&fake_meta_length_raw,
sizeof(fake_meta_length_raw));
58 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Want to send protocol-negotiation info. "
59 "About to send payload 1 of 1; "
60 "contains low-level blob of size [" << payload_blob.size() <<
"] "
61 "located @ [" << payload_blob.data() <<
"].");
73 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Wanted to send protocol-negotiation info; "
74 "but we've marked it as already-sent, even though we are in start_*_ops() in PEER state. "
75 "Probably we come from a .release()d Native_socket_stream which has already done it; cool.");
99 using flow::util::buffers_dump_string;
100 using boost::chrono::round;
101 using boost::chrono::milliseconds;
104 hndl_or_null, flow::util::bind_ns::cref(meta_blob), _1);
109 if ((!op_started<Op::S_SND>(
"send_native_handle()")) || (!state_peer(
"send_native_handle()")))
116 const size_t meta_size = meta_blob.size();
117 assert(((!hndl_or_null.
null()) || (meta_size != 0))
118 &&
"Native_socket_stream::send_blob() blob must have length 1+; "
119 "Native_socket_stream::send_native_handle() must have same or non-null hndl_or_null or both.");
121 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Will send handle [" << hndl_or_null <<
"] with "
122 "meta-blob of size [" << meta_blob.size() <<
"].");
126 FLOW_LOG_DATA(
"Socket stream [" << *
this <<
"]: Meta-blob contents are "
127 "[\n" << buffers_dump_string(meta_blob,
" ") <<
"].");
147 FLOW_LOG_WARNING(
"Socket stream [" << *
this <<
"]: Send: User argument length [" << meta_blob.size() <<
"] "
151 else if (m_snd_pending_err_code)
160 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: An error was detected earlier and saved for any subsequent "
161 "send attempts like this. Will not proceed with send. More info in WARNING below.");
162 *err_code = m_snd_pending_err_code;
175 const Blob_const meta_length_blob(&meta_length_raw,
sizeof(meta_length_raw));
180 const auto send_low_lvl_payload
183 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Wanted to send handle [" << hndl_or_null <<
"] with "
184 "meta-blob of size [" << meta_blob.size() <<
"]. "
185 "About to send payload index [" << idx <<
"] of "
186 "[" << ((meta_length_raw == 0) ? 1 : 2) <<
"] new low-level payloads; "
187 "includes handle [" << payload_hndl <<
"] and "
188 "low-level blob of size [" << payload_blob.size() <<
"] "
189 "located @ [" << payload_blob.data() <<
"].");
193 snd_sync_write_or_q_payload(payload_hndl, payload_blob,
false);
204 send_low_lvl_payload(1, hndl_or_null, meta_length_blob);
205 if ((meta_length_raw != 0) && (!m_snd_pending_err_code))
210 *err_code = m_snd_pending_err_code;
215 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Wanted to send user message but detected error "
217 "Error code details follow: [" << *err_code <<
"] [" << err_code->message() <<
"]. "
218 "Saved error code to return in next user send attempt if any, after this attempt also "
219 "returns that error code synchronously first.");
221 else if (m_snd_auto_ping_period != Fine_duration::zero())
227 const size_t n_canceled = m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
229 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Send request from user; hence rescheduled "
230 "auto-ping to occur in "
231 "[" << round<milliseconds>(m_snd_auto_ping_period) <<
"] (will re-reschedule "
232 "again upon any other outgoing traffic that might be requested before then). As a result "
233 "[" << n_canceled <<
"] previously scheduled auto-pings have been canceled; 1 is most likely; "
234 "0 means an auto-ping is *just* about to fire (we lost the race -- which is fine).");
241 m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
245 assert((n_canceled == 0) &&
"We only invoke one timer async_wait() at a time.");
258 FLOW_LOG_WARNING(
"Socket stream [" << *
this <<
"]: Wanted to send handle [" << hndl_or_null <<
"] with "
259 "meta-blob of size [" << meta_blob.size() <<
"], but an error (not necessarily new error) "
260 "encountered on pipe or in user API args. Error code details follow: "
261 "[" << *err_code <<
"] [" << err_code->message() <<
"]; "
262 "pipe hosed (sys/protocol error)? = "
265 "sending disabled by user? = "
274 return async_end_sending_impl(
nullptr, flow::async::Task_asio_err());
278 flow::async::Task_asio_err&& on_done_func)
282 const bool ok = async_end_sending_impl(&sync_err_code, std::move(on_done_func));
291 if ((!sync_err_code_ptr) && sync_err_code)
293 throw flow::error::Runtime_error(sync_err_code,
"Native_socket_stream::Impl::async_end_sending()");
296 sync_err_code_ptr && (*sync_err_code_ptr = sync_err_code);
303 flow::async::Task_asio_err&& on_done_func_or_empty)
306 using flow::async::Task_asio_err;
308 assert(
bool(sync_err_code_ptr_or_null) == (!on_done_func_or_empty.empty()));
312 if ((!op_started<Op::S_SND>(
"async_end_sending()")) || (!state_peer(
"async_end_sending()")))
355 FLOW_LOG_WARNING(
"Socket stream [" << *
this <<
"]: User wants to end sending, but we're in sends-finished "
356 "state already. Ignoring.");
360 m_snd_finished =
true;
363 if (m_snd_pending_err_code)
370 assert(m_snd_pending_on_last_send_done_func_or_empty.empty());
371 m_snd_pending_on_last_send_done_func_or_empty = std::move(on_done_func_or_empty);
378 const Blob_const blob_with_0(&ZERO_SIZE_RAW,
sizeof(ZERO_SIZE_RAW));
386 qd = !snd_sync_write_or_q_payload(
Native_handle(), blob_with_0,
false);
387 if (qd && sync_err_code_ptr_or_null)
392 assert(m_snd_pending_on_last_send_done_func_or_empty.empty());
393 m_snd_pending_on_last_send_done_func_or_empty = std::move(on_done_func_or_empty);
405 if (m_snd_pending_err_code)
407 FLOW_LOG_WARNING(
"Socket stream [" << *
this <<
"]: User wanted to end sending, but an error (not necessarily "
408 "new error) encountered on pipe synchronously when trying to send graceful-close. "
409 "Nevertheless locally sends-finished state is now active. Will report completion via "
410 "sync-args (if any). Error code details follow: "
411 "[" << m_snd_pending_err_code <<
"] [" << m_snd_pending_err_code.message() <<
"].");
416 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: User wanted to end sending. Success so far but out-queue "
417 "has payloads -- at least the graceful-close payload -- still pending while waiting for "
418 "writability. Locally sends-finished state is now active, and the other side will be informed "
419 "of this barring subsequent system errors. "
420 "We cannot report completion via sync-args (if any).");
424 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: User wanted to end sending. Immediate success: out-queue "
425 "flushed permanently. "
426 "Locally sends-finished state is now active, and the other side will be informed of this. "
427 "Locally will report completion via sync-args (if any).");
430 if (sync_err_code_ptr_or_null)
433 : m_snd_pending_err_code;
445 using boost::chrono::round;
446 using boost::chrono::milliseconds;
448 if ((!op_started<Op::S_SND>(
"auto_ping()")) || (!state_peer(
"auto_ping()")))
454 assert(period.count() > 0);
458 FLOW_LOG_WARNING(
"Socket stream [" << *
this <<
"]: User wants to start auto-pings, but we're in "
459 "sends-finished state already. Ignoring.");
471 if (m_snd_auto_ping_period != Fine_duration::zero())
473 FLOW_LOG_WARNING(
"Socket stream [" << *
this <<
"]: User wants to start auto-pings, but this "
474 "has already been engaged earlier. Ignoring.");
478 m_snd_auto_ping_period = period;
480 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: User wants to start auto-pings so that there are "
481 "outgoing messages at least as frequently as every "
482 "[" << round<milliseconds>(m_snd_auto_ping_period) <<
"]. Sending baseline auto-ping and scheduling "
483 "first subsequent auto-ping; it may be rescheduled if more user traffic occurs before then.");
485 if (m_snd_pending_err_code)
489 FLOW_LOG_WARNING(
"Socket stream [" << *
this <<
"]: User wanted to start auto-pings, but an error was "
490 "previously encountered on pipe; so will not auto-ping. "
491 "Error code details follow: [" << m_snd_pending_err_code <<
"] "
492 "[" << m_snd_pending_err_code.message() <<
"].");
497 const Blob_const blob_with_ff(&S_META_BLOB_LENGTH_PING_SENTINEL,
sizeof(S_META_BLOB_LENGTH_PING_SENTINEL));
502 snd_sync_write_or_q_payload(
Native_handle(), blob_with_ff,
true);
503 if (m_snd_pending_err_code)
505 FLOW_LOG_WARNING(
"Socket stream [" << *
this <<
"]: Wanted to send initial auto-ping but detected error "
507 "Error code details follow: [" << m_snd_pending_err_code <<
"] "
508 "[" << m_snd_pending_err_code.message() <<
"]. "
509 "Saved error code to return in next user send attempt if any; otherwise ignoring; "
510 "will not schedule periodic auto-pings.");
525 m_snd_ev_wait_func(&m_snd_ev_wait_hndl_auto_ping_timer_fired_peer,
528 boost::make_shared<Task>
529 ([
this]() { snd_on_ev_auto_ping_now_timer_fired(); }));
538 m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
539 m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
551 m_timer_worker.consume_timer_firing_signal(m_snd_auto_ping_timer_fired_peer);
555 if (m_snd_pending_err_code)
559 FLOW_LOG_WARNING(
"Socket stream [" << *
this <<
"]: Auto-ping timer fired, but an error was "
560 "previously encountered in 2-way pipe; so will neither auto-ping nor schedule next auto-ping. "
561 "Error code details follow: [" << m_snd_pending_err_code <<
"] "
562 "[" << m_snd_pending_err_code.message() <<
"].");
570 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: "
571 "Auto-ping timer fired; but graceful-close API earlier instructed us to no-op. No-op.");
577 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: "
578 "Auto-ping timer fired; sending/queueing auto-ping; scheduling for next time; it may be "
579 "rescheduled if more user traffic occurs before then.");
583 const Blob_const blob_with_ff{&S_META_BLOB_LENGTH_PING_SENTINEL,
sizeof(S_META_BLOB_LENGTH_PING_SENTINEL)};
585 snd_sync_write_or_q_payload(
Native_handle(), blob_with_ff,
true);
586 if (m_snd_pending_err_code)
588 FLOW_LOG_WARNING(
"Socket stream [" << *
this <<
"]: Wanted to send non-initial auto-ping but detected error "
590 "Error code details follow: [" << m_snd_pending_err_code <<
"] "
591 "[" << m_snd_pending_err_code.message() <<
"]. "
592 "Saved error code to return in next user send attempt if any; otherwise ignoring; "
593 "will not continue scheduling periodic auto-pings.");
598 m_snd_ev_wait_func(&m_snd_ev_wait_hndl_auto_ping_timer_fired_peer,
600 boost::make_shared<Task>
601 ([
this]() { snd_on_ev_auto_ping_now_timer_fired(); }));
602 m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
603 m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
609 using flow::util::Blob;
614 assert((!m_snd_pending_err_code) &&
"Pipe must not be pre-hosed by contract.");
616 size_t n_sent_or_zero;
617 if (m_snd_pending_payloads_q.empty())
619 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Want to send low-level payload: "
620 "handle [" << hndl_or_null <<
"] with blob of size [" << orig_blob.size() <<
"] "
621 "located @ [" << orig_blob.data() <<
"]; no write is pending so proceeding immediately. "
622 "Will drop if all of it would-block? = [" << avoid_qing <<
"].");
624 n_sent_or_zero = snd_nb_write_low_lvl_payload(hndl_or_null, orig_blob, &m_snd_pending_err_code);
625 if (m_snd_pending_err_code)
627 assert(n_sent_or_zero == 0);
632 if (n_sent_or_zero == orig_blob.size())
660 assert(hndl_or_null.
null()
661 &&
"Internal bug? Do not ask to drop a payload with a native handle inside under any circumstances.");
662 if (n_sent_or_zero == 0)
666 const auto q_size = m_snd_pending_payloads_q.size();
667 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: Wanted to send low-level payload: "
668 "blob of size [" << orig_blob.size() <<
"] located @ [" << orig_blob.data() <<
"]; "
669 "result was would-block for all of its bytes (either because blocked-queue was non-empty "
670 "already, or it was empty, but all of payload's bytes would-block at this time). "
671 "Therefore dropping payload (done for auto-pings at least). Out-queue size remains "
672 "[" << q_size <<
"].");
683 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: Wanted to send low-level payload: "
684 "blob of size [" << orig_blob.size() <<
"] located @ [" << orig_blob.data() <<
"]; "
685 "result was would-block for all but [" << n_sent_or_zero <<
"] of its bytes (blocked-queue "
686 "was empty, so nb-send was attmpted, and some -- but not all -- of payload's bytes "
687 "would-block at this time). We cannot \"get back\" the sent bytes and thus are forced "
688 "to queue the remaining ones (would have dropped payload if all the bytes would-block).");
693 auto new_low_lvl_payload = boost::movelib::make_unique<Snd_low_lvl_payload>();
694 if (n_sent_or_zero == 0)
696 new_low_lvl_payload->m_hndl_or_null = hndl_or_null;
700 new_low_lvl_payload->m_blob = Blob(
get_logger());
707 const auto& new_blob = orig_blob + n_sent_or_zero;
708 new_low_lvl_payload->m_blob.assign_copy(new_blob);
710 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Want to send pending-from-would-block low-level payload: "
711 "handle [" << new_low_lvl_payload->m_hndl_or_null <<
"] with "
712 "blob of size [" << new_low_lvl_payload->m_blob.size() <<
"] "
713 "located @ [" << new_blob.data() <<
"]; "
714 "created blob copy @ [" << new_low_lvl_payload->m_blob.const_buffer().data() <<
"]; "
715 "enqueued to out-queue which is now of size [" << (m_snd_pending_payloads_q.size() + 1) <<
"].");
717 m_snd_pending_payloads_q.emplace(std::move(new_low_lvl_payload));
719 if (m_snd_pending_payloads_q.size() == 1)
727 snd_async_write_q_head_payload();
733 return bool(m_snd_pending_err_code);
736 assert(!m_snd_pending_err_code);
743 using flow::util::Lock_guard;
752 size_t n_sent_or_zero = 0;
756 Lock_guard<
decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
760 if (hndl_or_null.
null())
770 if (!m_peer_socket->non_blocking())
772 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Setting boost.asio peer socket non-blocking mode.");
773 m_peer_socket->non_blocking(
true, *err_code);
782 assert(m_peer_socket->non_blocking());
784 FLOW_LOG_TRACE(
"Writing low-level blob directly via boost.asio (blob details logged above hopefully).");
786 n_sent_or_zero = m_peer_socket->write_some(blob, *err_code);
796 hndl_or_null, blob, err_code);
803 assert(((!*err_code) && (n_sent_or_zero != 0))
804 || (*err_code && (n_sent_or_zero == 0)));
805 if (*err_code == boost::asio::error::would_block)
817 assert((!m_peer_socket_hosed) &&
"m_peer_socket_hosed must start as null and only become non-null once. Bug?");
818 m_peer_socket_hosed = std::move(m_peer_socket);
819 assert((!m_peer_socket) &&
"Shocking unique_ptr misbehavior!");
832 || (n_sent_or_zero == 0));
836 FLOW_LOG_TRACE(
"Sent nothing due to error [" << *err_code <<
"] [" << err_code->message() <<
"].");
840 FLOW_LOG_TRACE(
"Send: no error. Was able to send [" << n_sent_or_zero <<
"] of [" << blob.size() <<
"] bytes.");
841 if (!hndl_or_null.
null())
843 FLOW_LOG_TRACE(
"Able to send the native handle? = [" << (n_sent_or_zero != 0) <<
"].");
847 return n_sent_or_zero;
854 using flow::util::Lock_guard;
855 using boost::asio::async_write;
859 assert((!m_snd_pending_payloads_q.empty()) &&
"Contract is stuff is queued to be async-sent. Bug?");
860 assert((!m_snd_pending_err_code) &&
"Pipe must not be pre-hosed by contract.");
869 Lock_guard<
decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
885 m_snd_ev_wait_func(&m_ev_wait_hndl_peer_socket,
888 boost::make_shared<Task>
889 ([
this]() { snd_on_ev_peer_socket_writable_or_error(); }));
906 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: User-performed wait-for-writable finished (writable or error, "
907 "we do not know which yet). We endeavour to send->pop->send->... as much of the queue as we "
908 "can until would-block or total success.");
910 assert((!m_snd_pending_payloads_q.empty()) &&
"Send-queue should not be touched while async-write of head is going.");
911 assert((!m_snd_pending_err_code) &&
"Send error would only be detected by us. Bug?");
914 bool would_block =
false;
917 auto& low_lvl_payload = *m_snd_pending_payloads_q.front();
918 auto& hndl_or_null = low_lvl_payload.m_hndl_or_null;
919 auto& low_lvl_blob = low_lvl_payload.m_blob;
920 auto low_lvl_blob_view = low_lvl_blob.const_buffer();
922 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: "
923 "Out-queue size is [" << m_snd_pending_payloads_q.size() <<
"]; "
924 "want to send handle [" << hndl_or_null <<
"] with "
925 "low-level blob of size [" << low_lvl_blob_view.size() <<
"] "
926 "located @ [" << low_lvl_blob_view.data() <<
"].");
928 const auto n_sent_or_zero
929 = snd_nb_write_low_lvl_payload(low_lvl_payload.m_hndl_or_null, low_lvl_blob_view, &m_snd_pending_err_code);
930 if (m_snd_pending_err_code)
936 if (n_sent_or_zero == low_lvl_blob_view.size())
939 m_snd_pending_payloads_q.pop();
946 if (n_sent_or_zero != 0)
952 low_lvl_payload.m_blob.start_past_prefix_inc(n_sent_or_zero);
958 while ((!m_snd_pending_payloads_q.empty()) && (!would_block) && (!m_snd_pending_err_code));
961 if ((!m_snd_pending_err_code) && (!m_snd_pending_payloads_q.empty()))
963 FLOW_LOG_TRACE(
"Out-queue has not been emptied. Must keep async-send chain going.");
966 snd_async_write_q_head_payload();
980 bool invoke_on_done =
false;
981 if (m_snd_pending_err_code)
983 invoke_on_done = !m_snd_pending_on_last_send_done_func_or_empty.empty();
984 FLOW_LOG_WARNING(
"Socket stream [" << *
this <<
"]: User-performed wait-for-writable reported completion; "
985 "wanted to nb-send any queued data and possibly initiated another wait-for-writable; "
986 "got error during an nb-send or when initiating wait; TRACE details above. "
987 "Error code details follow: "
988 "[" << m_snd_pending_err_code <<
"] [" << m_snd_pending_err_code.message() <<
"]. "
989 "Saved error code to return in next user send attempt if any. "
990 "Will run graceful-sends-close completion handler? = [" << invoke_on_done <<
"].");
992 assert((!m_snd_pending_payloads_q.empty()) &&
"Opportunistic sanity check.");
994 else if (m_snd_pending_payloads_q.empty())
996 FLOW_LOG_TRACE(
"Out-queue has been emptied.");
998 if (!m_snd_pending_on_last_send_done_func_or_empty.empty())
1001 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: "
1002 "We sent graceful-close and any preceding user messages with success. Will now inform user via "
1003 "graceful-sends-close completion handler.");
1004 invoke_on_done =
true;
1011 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Executing end-sending completion handler now.");
1012 auto on_done_func = std::move(m_snd_pending_on_last_send_done_func_or_empty);
1013 m_snd_pending_on_last_send_done_func_or_empty.clear();
1015 on_done_func(m_snd_pending_err_code);
1016 FLOW_LOG_TRACE(
"Handler completed.");
proto_ver_t local_max_proto_ver_for_sending()
To be called at most once, this returns local_max_proto_ver from ctor the first time and S_VER_UNKNOWN on any subsequent call.
static constexpr proto_ver_t S_VER_UNKNOWN
A proto_ver_t value, namely a negative one, which is a reserved value indicating "unknown version"; i...
Error_code m_snd_pending_err_code
The first and only connection-hosing error condition detected when attempting to low-level-write on m...
bool async_end_sending(Error_code *sync_err_code, flow::async::Task_asio_err &&on_done_func)
See Native_socket_stream counterpart.
void snd_on_ev_peer_socket_writable_or_error()
Completion handler, from outside event loop via sync_io pattern, for the async-wait initiated by snd_...
Protocol_negotiator m_protocol_negotiator
Handles the protocol negotiation at the start of the pipe.
bool snd_sync_write_or_q_payload(Native_handle hndl_or_null, const util::Blob_const &orig_blob, bool avoid_qing)
Either synchronously sends hndl_or_null handle (if any) and orig_blob low-level blob over m_peer_sock...
void snd_on_ev_auto_ping_now_timer_fired()
Handler for the async-wait, via util::sync_io::Timer_event_emitter, of the auto-ping timer firing; if...
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code)
See Native_socket_stream counterpart.
void snd_async_write_q_head_payload()
Initiates async-write over m_peer_socket of the low-level payload at the head of out-queue m_snd_pend...
size_t send_meta_blob_max_size() const
See Native_socket_stream counterpart.
bool async_end_sending_impl(Error_code *sync_err_code_ptr_or_null, flow::async::Task_asio_err &&on_done_func_or_empty)
*end_sending() body.
bool start_send_native_handle_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
bool start_send_blob_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
uint16_t low_lvl_payload_blob_length_t
The type used to encode the meta-blob length; this puts a cap on how long the meta-blobs can be.
bool send_blob(const util::Blob_const &blob, Error_code *err_code)
See Native_socket_stream counterpart.
bool auto_ping(util::Fine_duration period)
See Native_socket_stream counterpart.
bool end_sending()
See Native_socket_stream counterpart.
size_t snd_nb_write_low_lvl_payload(Native_handle hndl_or_null, const util::Blob_const &blob, Error_code *err_code)
Utility that sends non-empty blob, and (unless null) hndl_or_null associated with its 1st byte,...
size_t send_blob_max_size() const
See Native_socket_stream counterpart.
static const size_t & S_MAX_META_BLOB_LENGTH
The maximum length of a blob that can be sent by this protocol.
bool start_send_native_handle_ops(Event_wait_func_t &&ev_wait_func)
Implements Native_handle_sender API per contract.
size_t send_meta_blob_max_size() const
Implements Native_handle_sender API per contract.
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code=0)
Implements Native_handle_sender API per contract.
flow::log::Logger * get_logger() const
Returns logger (possibly null).
size_t nb_write_some_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Error_code *err_code)
boost.asio extension similar to peer_socket->non_blocking(true); auto n = peer_socket->write_some(pay...
void async_write_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Task_err &&on_sent_or_error)
boost.asio extension similar to boost::asio::async_write(Peer_socket&, Blob_const,...
@ S_SENDS_FINISHED_CANNOT_SEND
Will not send message: local user already ended sending via API marking this.
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_INVALID_ARGUMENT
User called an API with 1 or more arguments against the API spec.
@ S_LOW_LVL_TRANSPORT_HOSED_CANNOT_SEND
Unable to send outgoing traffic: an earlier-reported, or at least logged, system error had hosed the ...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
Function< void(Asio_waitable_native_handle *hndl_of_interest, bool ev_of_interest_snd_else_rcv, Task_ptr &&on_active_ev_func)> Event_wait_func
In sync_io pattern, concrete type storing user-supplied function invoked by pattern-implementing ipc:...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothing.
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.