23#include <flow/log/log.hpp>
24#include <flow/async/single_thread_task_loop.hpp>
25#include <boost/move/make_unique.hpp>
83template<
typename Core_t>
85 public flow::log::Log_context,
86 private boost::noncopyable
112 flow::async::Single_thread_task_loop* worker,
Core* sync_io);
139 flow::async::Task_asio_err_sz&& on_done_func);
151 flow::async::Task_asio_err_sz&& on_done_func);
178 using Ptr = boost::movelib::unique_ptr<User_request>;
208 flow::async::Task_asio_err_sz&& on_done_func);
282 mutable flow::util::Mutex_non_recursive
m_mutex;
302template<
typename Core_t>
305 flow::async::Single_thread_task_loop* worker,
307 flow::log::Log_context(logger_ptr,
Log_component::S_TRANSPORT),
314 using flow::util::Lock_guard;
330 m_sync_io.start_receive_blob_ops([
this](Asio_waitable_native_handle* hndl_of_interest,
331 bool ev_of_interest_snd_else_rcv,
337 FLOW_LOG_TRACE(
m_log_pfx <<
": Sync-IO receive-ops event-wait request: "
338 "descriptor [" << hndl_of_interest->native_handle() <<
"], "
339 "writable-else-readable [" << ev_of_interest_snd_else_rcv <<
"].");
342 assert(hndl_of_interest);
343 hndl_of_interest->async_wait(ev_of_interest_snd_else_rcv
344 ? Asio_waitable_native_handle::Base::wait_write
345 : Asio_waitable_native_handle::Base::wait_read,
346 [
this, on_active_ev_func = std::move(on_active_ev_func)]
351 if (err_code == boost::asio::error::operation_aborted)
366 (*on_active_ev_func)();
375template<
typename Core_t>
378 using flow::async::Single_thread_task_loop;
379 using flow::async::reset_thread_pinning;
380 using flow::util::ostream_op_string;
391 Single_thread_task_loop one_thread(get_logger(),
392 ostream_op_string(
"ARcDeinit-", m_log_pfx));
393 one_thread.start([&]()
395 reset_thread_pinning(get_logger());
397 FLOW_LOG_TRACE(
"Running head slot async-receive completion handler.");
399 FLOW_LOG_TRACE(
"User receive handler finished.");
401 while (!m_pending_user_requests_q.empty())
403 FLOW_LOG_TRACE(
"Running a queued async-receive completion handler.");
404 m_pending_user_requests_q.front()
406 m_pending_user_requests_q.pop();
407 FLOW_LOG_TRACE(
"User receive handler finished. Popped from user request deficit queue.");
413 assert(m_pending_user_requests_q.empty());
417template<
typename Core_t>
420 flow::async::Task_asio_err_sz&& on_done_func)
422 assert(target_hndl &&
"Native_socket_stream::async_receive_native_handle() must take non-null Native_handle ptr.");
423 async_receive_native_handle_impl(target_hndl, target_meta_blob, std::move(on_done_func));
431template<
typename Core_t>
433 flow::async::Task_asio_err_sz&& on_done_func)
435 async_receive_native_handle_impl(
nullptr, target_blob, std::move(on_done_func));
438template<
typename Core_t>
441 flow::async::Task_asio_err_sz&& on_done_func)
443 using flow::util::Lock_guard;
444 using boost::movelib::make_unique;
451 Lock_guard<
decltype(m_mutex)> lock(m_mutex);
482 FLOW_LOG_TRACE(m_log_pfx <<
": Incoming user async-receive request for "
483 "possible native handle and meta-blob (located @ [" << target_meta_blob.data() <<
"] of "
484 "max size [" << target_meta_blob.size() <<
"]). In worker now? = [" << m_worker.in_thread() <<
"].");
486 auto new_user_request = make_unique<User_request>();
487 new_user_request->m_target_hndl_ptr = target_hndl_or_null;
488 new_user_request->m_target_meta_blob = target_meta_blob;
489 new_user_request->m_on_done_func = std::move(on_done_func);
493 m_pending_user_requests_q.emplace(std::move(new_user_request));
494 FLOW_LOG_TRACE(
"At least 1 async-receive request is already in progress. "
495 "After registering the new async-receive request: Head slot is non-empty; and "
496 "subsequently-pending deficit queue has size [" << m_pending_user_requests_q.size() <<
"]. "
497 "Will sync-IO-receive to handle this request once it reaches the front of that queue.");
502 FLOW_LOG_TRACE(
"No async-receive request is currently in progress. Starting sync-IO-receive chain to service the "
503 "new request and any further-queued requests that might appear in the meantime.");
504 m_user_request = std::move(new_user_request);
515 if (m_user_request->m_target_hndl_ptr)
517 if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES)
522 m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
523 &sync_err_code, &sync_sz,
527 on_sync_io_rcv_done(err_code, sz);
532 assert(
false &&
"This code should never be reached.");
540 m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
542 { on_sync_io_rcv_done(err_code, sz); });
545 assert(ok &&
"We are by definition in PEER state, and ctor starts receive-ops, and we never start a "
546 "sync_io async-receive before ensuring previous one has executed; so that should never "
547 "return false. Bug somewhere?");
558 process_msg_or_error(sync_err_code, sync_sz);
560 FLOW_LOG_TRACE(
"Message was immediately available; synchronously returned to user; handler posted onto "
561 "async worker thread. Done until next request.");
564template<
typename Core_t>
567 using flow::util::Lock_guard;
572 assert(err_code != boost::asio::error::operation_aborted);
575 FLOW_LOG_TRACE(m_log_pfx <<
": Earlier async-wait => event active => rcv-mutex lock => "
576 "on-active-event-func => sync_io module => on-rcv-done handler => here. "
577 "Or else: rcv-mutex lock => sync_io module => no async-wait needed => here.");
611 assert(m_user_request);
613 assert(!m_user_request);
615 queue<typename User_request::Ptr> ex_pending_user_requests_q_or_none;
618 FLOW_LOG_TRACE(
"Error emitted by sync-IO => all [" << m_pending_user_requests_q.size() <<
"] pending "
619 "handlers (may well be none) will be invoked with error in addition to the head handler.");
621 ex_pending_user_requests_q_or_none = std::move(m_pending_user_requests_q);
622 assert(m_pending_user_requests_q.empty());
624 else if (!m_pending_user_requests_q.empty())
626 FLOW_LOG_TRACE(
"Success emitted by sync-IO => lead handler will be invoked; next pending request will "
627 "be serviced by sync-IO; pending request count has become (after popping from queue into lead "
628 "slot): [" << (m_pending_user_requests_q.size() - 1) <<
"].\n");
630 m_user_request = std::move(m_pending_user_requests_q.front());
632 m_pending_user_requests_q.pop();
636 FLOW_LOG_TRACE(
"Success emitted by sync-IO => lead handler will be invoked; no pending requests queued up.\n");
640 m_worker.post([
this, err_code, sz,
643 ex_user_request = boost::shared_ptr<User_request>(std::move(ex_user_request)),
644 ex_pending_user_requests_q_or_none
645 = boost::make_shared<
decltype(ex_pending_user_requests_q_or_none)>
646 (std::move(ex_pending_user_requests_q_or_none))]()
650 assert(ex_user_request);
651 FLOW_LOG_TRACE(m_log_pfx <<
": Invoking head handler.");
652 (ex_user_request->m_on_done_func)(err_code, sz);
653 FLOW_LOG_TRACE(
"Handler completed.");
654 if (!ex_pending_user_requests_q_or_none->empty())
659 FLOW_LOG_TRACE(m_log_pfx <<
": Invoking [" << ex_pending_user_requests_q_or_none->size() <<
"] "
660 "pending handlers in one shot (due to error).");
661 while (!ex_pending_user_requests_q_or_none->empty())
663 ex_pending_user_requests_q_or_none->front()->m_on_done_func(err_code, 0);
664 ex_pending_user_requests_q_or_none->pop();
665 FLOW_LOG_TRACE(
"In-queue handler finished.");
671 assert(!ex_user_request);
672 assert(ex_pending_user_requests_q_or_none.empty());
675template<
typename Core_t>
678 using flow::util::Lock_guard;
683 assert(err_code != boost::asio::error::operation_aborted);
686 FLOW_LOG_TRACE(m_log_pfx <<
": Earlier async-wait => event active => rcv-mutex lock => "
687 "on-active-event-func => sync_io module => here (on-rcv-done handler).");
698 process_msg_or_error(err_code, sz);
711 if (m_user_request->m_target_hndl_ptr)
713 if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES)
718 m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
719 &sync_err_code, &sync_sz,
722 on_sync_io_rcv_done(err_code, sz);
727 assert(
false &&
"This code should never be reached.");
735 m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
737 { on_sync_io_rcv_done(err_code, sz); });
739 assert(ok &&
"We are by definition in PEER state, and ctor starts receive-ops, and we never start a "
740 "sync_io async-receive before ensuring previous one has executed; so that should never "
741 "return false. Bug somewhere?");
752 process_msg_or_error(sync_err_code, sync_sz);
761template<
typename Core_t>
764 using flow::util::Lock_guard;
768 Lock_guard<
decltype(m_mutex)> lock(m_mutex);
769 return m_sync_io.idle_timer_run(timeout);
Internal-use type that adapts a given PEER-state sync_io::Native_handle_receiver or sync_io::Blob_rec...
flow::async::Single_thread_task_loop & m_worker
Single-thread worker pool for all internal async work. Referred to as thread W in comments.
flow::util::Mutex_non_recursive m_mutex
Protects m_user_request, m_pending_user_requests_q, and receive-ops data of m_sync_io.
std::queue< typename User_request::Ptr > m_pending_user_requests_q
Queue storing deficit async-receive requests queued up due to m_user_request being not null while mor...
bool idle_timer_run(util::Fine_duration timeout)
See Native_handle_receiver counterpart.
Async_adapter_receiver(flow::log::Logger *logger_ptr, util::String_view log_pfx, flow::async::Single_thread_task_loop *worker, Core *sync_io)
Constructs the adapter around sync_io::X object *sync_io.
const std::string m_log_pfx
See log_pfx arg of ctor.
void async_receive_native_handle_impl(Native_handle *target_hndl_or_null, const util::Blob_mutable &target_meta_blob, flow::async::Task_asio_err_sz &&on_done_func)
Body of async_receive_native_handle() and async_receive_blob(); with target_hndl null if and only if ...
User_request::Ptr m_user_request
The head slot containing the currently-being-serviced "deficit" async-receive request,...
~Async_adapter_receiver()
To be invoked after ->stop()ping *worker (from ctor), as well as flushing any still-queued tasks in i...
void on_sync_io_rcv_done(const Error_code &err_code, size_t sz)
Invoked via active-event API, handles the async completion of m_sync_io.async_receive_*() operation.
void async_receive_native_handle(Native_handle *target_hndl, const util::Blob_mutable &target_meta_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Native_handle_sender counterpart.
void async_receive_blob(const util::Blob_mutable &target_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Blob_receiver counterpart.
Core_t Core
The sync_io::X type being adapted into async-I/O-pattern X.
Core & m_sync_io
The core Core engine, implementing the sync_io pattern (see util::sync_io doc header).
void process_msg_or_error(const Error_code &err_code, size_t sz)
Invoked from thread U/W (async_receive_native_handle_impl()) or thread W (active-event API),...
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
boost::shared_ptr< Task > Task_ptr
Short-hand for ref-counted pointer to a Function<> that takes no arguments and returns nothing; in pa...
boost::asio::mutable_buffer Blob_mutable
Short-hand for an mutable blob somewhere in memory, stored as exactly a void* and a size_t.
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Data store representing a deficit user async-receive request: either one being currently handled by m...
Native_handle * m_target_hndl_ptr
See async_receive_native_handle() target_hndl. Null for async_receive_blob().
util::Blob_mutable m_target_meta_blob
See async_receive_native_handle() target_meta_blob. Or see async_receive_blob() target_blob.
boost::movelib::unique_ptr< User_request > Ptr
Short-hand for unique_ptr to this.
flow::async::Task_asio_err_sz m_on_done_func
See async_receive_native_handle() or async_receive_blob() on_done_func.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.