23#include <flow/log/log.hpp> 
   24#include <flow/async/single_thread_task_loop.hpp> 
   25#include <boost/move/make_unique.hpp> 
   83template<
typename Core_t>
 
   85  public flow::log::Log_context,
 
   86  private boost::noncopyable
 
  112                         flow::async::Single_thread_task_loop* worker, 
Core* sync_io);
 
  139                                   flow::async::Task_asio_err_sz&& on_done_func);
 
  151                          flow::async::Task_asio_err_sz&& on_done_func);
 
  178    using Ptr = boost::movelib::unique_ptr<User_request>;
 
  208                                        flow::async::Task_asio_err_sz&& on_done_func);
 
  282  mutable flow::util::Mutex_non_recursive 
m_mutex;
 
  302template<
typename Core_t>
 
  305                                                       flow::async::Single_thread_task_loop* worker,
 
  307  flow::log::Log_context(logger_ptr, 
Log_component::S_TRANSPORT),
 
  314  using flow::util::Lock_guard;
 
  330  m_sync_io.start_receive_blob_ops([
this](Asio_waitable_native_handle* hndl_of_interest,
 
  331                                          bool ev_of_interest_snd_else_rcv,
 
  337    FLOW_LOG_TRACE(
m_log_pfx << 
": Sync-IO receive-ops event-wait request: " 
  338                   "descriptor [" << hndl_of_interest->native_handle() << 
"], " 
  339                   "writable-else-readable [" << ev_of_interest_snd_else_rcv << 
"].");
 
  342    assert(hndl_of_interest);
 
  343    hndl_of_interest->async_wait(ev_of_interest_snd_else_rcv
 
  344                                   ? Asio_waitable_native_handle::Base::wait_write
 
  345                                   : Asio_waitable_native_handle::Base::wait_read,
 
  346                                 [
this, on_active_ev_func = std::move(on_active_ev_func)]
 
  351      if (err_code == boost::asio::error::operation_aborted)
 
  366      (*on_active_ev_func)();
 
  375template<
typename Core_t>
 
  378  using flow::async::Single_thread_task_loop;
 
  379  using flow::async::reset_thread_pinning;
 
  380  using flow::util::ostream_op_string;
 
  391    Single_thread_task_loop one_thread(get_logger(),
 
  392                                       ostream_op_string(
"ARcDeinit-", m_log_pfx));
 
  393    one_thread.start([&]()
 
  395      reset_thread_pinning(get_logger()); 
 
  397      FLOW_LOG_TRACE(
"Running head slot async-receive completion handler.");
 
  399      FLOW_LOG_TRACE(
"User receive handler finished.");
 
  401      while (!m_pending_user_requests_q.empty())
 
  403        FLOW_LOG_TRACE(
"Running a queued async-receive completion handler.");
 
  404        m_pending_user_requests_q.front()
 
  406        m_pending_user_requests_q.pop();
 
  407        FLOW_LOG_TRACE(
"User receive handler finished.  Popped from user request deficit queue.");
 
  413    assert(m_pending_user_requests_q.empty()); 
 
  417template<
typename Core_t>
 
  420                                                                 flow::async::Task_asio_err_sz&& on_done_func)
 
  422  assert(target_hndl && 
"Native_socket_stream::async_receive_native_handle() must take non-null Native_handle ptr.");
 
  423  async_receive_native_handle_impl(target_hndl, target_meta_blob, std::move(on_done_func));
 
  431template<
typename Core_t>
 
  433                                                        flow::async::Task_asio_err_sz&& on_done_func)
 
  435  async_receive_native_handle_impl(
nullptr, target_blob, std::move(on_done_func));
 
  438template<
typename Core_t>
 
  441                                                                      flow::async::Task_asio_err_sz&& on_done_func)
 
  443  using flow::util::Lock_guard;
 
  444  using boost::movelib::make_unique;
 
  451  Lock_guard<
decltype(m_mutex)> lock(m_mutex);
 
  482  FLOW_LOG_TRACE(m_log_pfx << 
": Incoming user async-receive request for " 
  483                 "possible native handle and meta-blob (located @ [" << target_meta_blob.data() << 
"] of " 
  484                 "max size [" << target_meta_blob.size() << 
"]).  In worker now? = [" << m_worker.in_thread() << 
"].");
 
  486  auto new_user_request = make_unique<User_request>();
 
  487  new_user_request->m_target_hndl_ptr = target_hndl_or_null;
 
  488  new_user_request->m_target_meta_blob = target_meta_blob;
 
  489  new_user_request->m_on_done_func = std::move(on_done_func);
 
  493    m_pending_user_requests_q.emplace(std::move(new_user_request));
 
  494    FLOW_LOG_TRACE(
"At least 1 async-receive request is already in progress.  " 
  495                   "After registering the new async-receive request: Head slot is non-empty; and " 
  496                   "subsequently-pending deficit queue has size [" << m_pending_user_requests_q.size() << 
"].  " 
  497                   "Will sync-IO-receive to handle this request once it reaches the front of that queue.");
 
  502  FLOW_LOG_TRACE(
"No async-receive request is currently in progress.  Starting sync-IO-receive chain to service the " 
  503                 "new request and any further-queued requests that might appear in the meantime.");
 
  504  m_user_request = std::move(new_user_request);
 
  515  if (m_user_request->m_target_hndl_ptr)
 
  517    if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES) 
 
  522      m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
 
  523                                            &sync_err_code, &sync_sz,
 
  527        on_sync_io_rcv_done(err_code, sz); 
 
  532      assert(
false && 
"This code should never be reached.");
 
  540    m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
 
  542      { on_sync_io_rcv_done(err_code, sz); });
 
  545  assert(ok && 
"We are by definition in PEER state, and ctor starts receive-ops, and we never start a " 
  546               "sync_io async-receive before ensuring previous one has executed; so that should never " 
  547               "return false.  Bug somewhere?");
 
  558  process_msg_or_error(sync_err_code, sync_sz);
 
  560  FLOW_LOG_TRACE(
"Message was immediately available; synchronously returned to user; handler posted onto " 
  561                 "async worker thread.  Done until next request.");
 
  564template<
typename Core_t>
 
  567  using flow::util::Lock_guard;
 
  572  assert(err_code != boost::asio::error::operation_aborted);
 
  575  FLOW_LOG_TRACE(m_log_pfx << 
": Earlier async-wait => event active => rcv-mutex lock => " 
  576                 "on-active-event-func => sync_io module => on-rcv-done handler => here.  " 
  577                 "Or else: rcv-mutex lock => sync_io module => no async-wait needed => here.");
 
  611  assert(m_user_request);
 
  613  assert(!m_user_request);
 
  615  queue<typename User_request::Ptr> ex_pending_user_requests_q_or_none;
 
  618    FLOW_LOG_TRACE(
"Error emitted by sync-IO => all [" << m_pending_user_requests_q.size() << 
"] pending " 
  619                   "handlers (may well be none) will be invoked with error in addition to the head handler.");
 
  621    ex_pending_user_requests_q_or_none = std::move(m_pending_user_requests_q);
 
  622    assert(m_pending_user_requests_q.empty());
 
  624  else if (!m_pending_user_requests_q.empty()) 
 
  626    FLOW_LOG_TRACE(
"Success emitted by sync-IO => lead handler will be invoked; next pending request will " 
  627                   "be serviced by sync-IO; pending request count has become (after popping from queue into lead " 
  628                   "slot): [" << (m_pending_user_requests_q.size() - 1) << 
"].\n");
 
  630    m_user_request = std::move(m_pending_user_requests_q.front());
 
  632    m_pending_user_requests_q.pop();
 
  636    FLOW_LOG_TRACE(
"Success emitted by sync-IO => lead handler will be invoked; no pending requests queued up.\n");
 
  640  m_worker.post([
this, err_code, sz,
 
  643                 ex_user_request = boost::shared_ptr<User_request>(std::move(ex_user_request)),
 
  644                 ex_pending_user_requests_q_or_none
 
  645                   = boost::make_shared<
decltype(ex_pending_user_requests_q_or_none)>
 
  646                       (std::move(ex_pending_user_requests_q_or_none))]()
 
  650    assert(ex_user_request);
 
  651    FLOW_LOG_TRACE(m_log_pfx << 
": Invoking head handler.");
 
  652    (ex_user_request->m_on_done_func)(err_code, sz);
 
  653    FLOW_LOG_TRACE(
"Handler completed.");
 
  654    if (!ex_pending_user_requests_q_or_none->empty())
 
  659      FLOW_LOG_TRACE(m_log_pfx << 
": Invoking [" << ex_pending_user_requests_q_or_none->size() << 
"] " 
  660                     "pending handlers in one shot (due to error).");
 
  661      while (!ex_pending_user_requests_q_or_none->empty())
 
  663        ex_pending_user_requests_q_or_none->front()->m_on_done_func(err_code, 0);
 
  664        ex_pending_user_requests_q_or_none->pop();
 
  665        FLOW_LOG_TRACE(
"In-queue handler finished.");
 
  671  assert(!ex_user_request); 
 
  672  assert(ex_pending_user_requests_q_or_none.empty());
 
  675template<
typename Core_t>
 
  678  using flow::util::Lock_guard;
 
  683  assert(err_code != boost::asio::error::operation_aborted);
 
  686  FLOW_LOG_TRACE(m_log_pfx << 
": Earlier async-wait => event active => rcv-mutex lock => " 
  687                 "on-active-event-func => sync_io module => here (on-rcv-done handler).");
 
  698  process_msg_or_error(err_code, sz);
 
  711    if (m_user_request->m_target_hndl_ptr)
 
  713      if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES)
 
  718        m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
 
  719                                              &sync_err_code, &sync_sz,
 
  722          on_sync_io_rcv_done(err_code, sz); 
 
  727        assert(
false && 
"This code should never be reached.");
 
  735      m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
 
  737        { on_sync_io_rcv_done(err_code, sz); });
 
  739    assert(ok && 
"We are by definition in PEER state, and ctor starts receive-ops, and we never start a " 
  740                 "sync_io async-receive before ensuring previous one has executed; so that should never " 
  741                 "return false.  Bug somewhere?");
 
  752    process_msg_or_error(sync_err_code, sync_sz);
 
  761template<
typename Core_t>
 
  764  using flow::util::Lock_guard;
 
  768  Lock_guard<
decltype(m_mutex)> lock(m_mutex);
 
  769  return m_sync_io.idle_timer_run(timeout);
 
Internal-use type that adapts a given PEER-state sync_io::Native_handle_receiver or sync_io::Blob_rec...
flow::async::Single_thread_task_loop & m_worker
Single-thread worker pool for all internal async work. Referred to as thread W in comments.
flow::util::Mutex_non_recursive m_mutex
Protects m_user_request, m_pending_user_requests_q, and receive-ops data of m_sync_io.
std::queue< typename User_request::Ptr > m_pending_user_requests_q
Queue storing deficit async-receive requests queued up due to m_user_request being not null while mor...
bool idle_timer_run(util::Fine_duration timeout)
See Native_handle_receiver counterpart.
Async_adapter_receiver(flow::log::Logger *logger_ptr, util::String_view log_pfx, flow::async::Single_thread_task_loop *worker, Core *sync_io)
Constructs the adapter around sync_io::X object *sync_io.
const std::string m_log_pfx
See log_pfx arg of ctor.
void async_receive_native_handle_impl(Native_handle *target_hndl_or_null, const util::Blob_mutable &target_meta_blob, flow::async::Task_asio_err_sz &&on_done_func)
Body of async_receive_native_handle() and async_receive_blob(); with target_hndl null if and only if ...
User_request::Ptr m_user_request
The head slot containing the currently-being-serviced "deficit" async-receive request,...
~Async_adapter_receiver()
To be invoked after ->stop()ping *worker (from ctor), as well as flushing any still-queued tasks in i...
void on_sync_io_rcv_done(const Error_code &err_code, size_t sz)
Invoked via active-event API, handles the async completion of m_sync_io.async_receive_*() operation.
void async_receive_native_handle(Native_handle *target_hndl, const util::Blob_mutable &target_meta_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Native_handle_receiver counterpart.
void async_receive_blob(const util::Blob_mutable &target_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Blob_receiver counterpart.
Core_t Core
The sync_io::X type being adapted into async-I/O-pattern X.
Core & m_sync_io
The core Core engine, implementing the sync_io pattern (see util::sync_io doc header).
void process_msg_or_error(const Error_code &err_code, size_t sz)
Invoked from thread U/W (async_receive_native_handle_impl()) or thread W (active-event API),...
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
boost::shared_ptr< Task > Task_ptr
Short-hand for ref-counted pointer to a Function<> that takes no arguments and returns nothing; in pa...
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Data store representing a deficit user async-receive request: either one being currently handled by m...
Native_handle * m_target_hndl_ptr
See async_receive_native_handle() target_hndl. Null for async_receive_blob().
util::Blob_mutable m_target_meta_blob
See async_receive_native_handle() target_meta_blob. Or see async_receive_blob() target_blob.
boost::movelib::unique_ptr< User_request > Ptr
Short-hand for unique_ptr to this.
flow::async::Task_asio_err_sz m_on_done_func
See async_receive_native_handle() or async_receive_blob() on_done_func.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.