23#include <flow/log/log.hpp> 
   24#include <flow/async/single_thread_task_loop.hpp> 
   25#include <boost/move/make_unique.hpp> 
   83template<
typename Core_t>
 
   85  public flow::log::Log_context,
 
   86  private boost::noncopyable
 
  112                         flow::async::Single_thread_task_loop* worker, 
Core* sync_io);
 
  139                                   flow::async::Task_asio_err_sz&& on_done_func);
 
  151                          flow::async::Task_asio_err_sz&& on_done_func);
 
  178    using Ptr = boost::movelib::unique_ptr<User_request>;
 
  208                                        flow::async::Task_asio_err_sz&& on_done_func);
 
  282  mutable flow::util::Mutex_non_recursive 
m_mutex;
 
  302template<
typename Core_t>
 
  305                                                       flow::async::Single_thread_task_loop* worker,
 
  307  flow::log::Log_context(logger_ptr, 
Log_component::S_TRANSPORT),
 
  314  using flow::util::Lock_guard;
 
  330  m_sync_io.start_receive_blob_ops([
this](Asio_waitable_native_handle* hndl_of_interest,
 
  331                                          bool ev_of_interest_snd_else_rcv,
 
  337    FLOW_LOG_TRACE(
m_log_pfx << 
": Sync-IO receive-ops event-wait request: " 
  338                   "descriptor [" << hndl_of_interest->native_handle() << 
"], " 
  339                   "writable-else-readable [" << ev_of_interest_snd_else_rcv << 
"].");
 
  342    assert(hndl_of_interest);
 
  343    hndl_of_interest->async_wait(ev_of_interest_snd_else_rcv
 
  344                                   ? Asio_waitable_native_handle::Base::wait_write
 
  345                                   : Asio_waitable_native_handle::Base::wait_read,
 
  346                                 [
this, on_active_ev_func = std::move(on_active_ev_func)]
 
  351      if (err_code == boost::asio::error::operation_aborted)
 
  366      (*on_active_ev_func)();
 
  375template<
typename Core_t>
 
  378  using flow::async::Single_thread_task_loop;
 
  379  using flow::util::ostream_op_string;
 
  390    Single_thread_task_loop one_thread(get_logger(), ostream_op_string(m_log_pfx, 
"-rcv-temp_deinit"));
 
  391    one_thread.start([&]()
 
  393      FLOW_LOG_TRACE(
"Running head slot async-receive completion handler.");
 
  395      FLOW_LOG_TRACE(
"User receive handler finished.");
 
  397      while (!m_pending_user_requests_q.empty())
 
  399        FLOW_LOG_TRACE(
"Running a queued async-receive completion handler.");
 
  400        m_pending_user_requests_q.front()
 
  402        m_pending_user_requests_q.pop();
 
  403        FLOW_LOG_TRACE(
"User receive handler finished.  Popped from user request deficit queue.");
 
  409    assert(m_pending_user_requests_q.empty()); 
 
  413template<
typename Core_t>
 
  416                                                                 flow::async::Task_asio_err_sz&& on_done_func)
 
  418  assert(target_hndl && 
"Native_socket_stream::async_receive_native_handle() must take non-null Native_handle ptr.");
 
  419  async_receive_native_handle_impl(target_hndl, target_meta_blob, std::move(on_done_func));
 
  427template<
typename Core_t>
 
  429                                                        flow::async::Task_asio_err_sz&& on_done_func)
 
  431  async_receive_native_handle_impl(
nullptr, target_blob, std::move(on_done_func));
 
  434template<
typename Core_t>
 
  437                                                                      flow::async::Task_asio_err_sz&& on_done_func)
 
  439  using flow::util::Lock_guard;
 
  440  using boost::movelib::make_unique;
 
  447  Lock_guard<
decltype(m_mutex)> lock(m_mutex);
 
  478  FLOW_LOG_TRACE(m_log_pfx << 
": Incoming user async-receive request for " 
  479                 "possible native handle and meta-blob (located @ [" << target_meta_blob.data() << 
"] of " 
  480                 "max size [" << target_meta_blob.size() << 
"]).  In worker now? = [" << m_worker.in_thread() << 
"].");
 
  482  auto new_user_request = make_unique<User_request>();
 
  483  new_user_request->m_target_hndl_ptr = target_hndl_or_null;
 
  484  new_user_request->m_target_meta_blob = target_meta_blob;
 
  485  new_user_request->m_on_done_func = std::move(on_done_func);
 
  489    m_pending_user_requests_q.emplace(std::move(new_user_request));
 
  490    FLOW_LOG_TRACE(
"At least 1 async-receive request is already in progress.  " 
  491                   "After registering the new async-receive request: Head slot is non-empty; and " 
  492                   "subsequently-pending deficit queue has size [" << m_pending_user_requests_q.size() << 
"].  " 
  493                   "Will sync-IO-receive to handle this request once it reaches the front of that queue.");
 
  498  FLOW_LOG_TRACE(
"No async-receive request is currently in progress.  Starting sync-IO-receive chain to service the " 
  499                 "new request and any further-queued requests that might appear in the meantime.");
 
  500  m_user_request = std::move(new_user_request);
 
  511  if (m_user_request->m_target_hndl_ptr)
 
  513    if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES) 
 
  518      m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
 
  519                                            &sync_err_code, &sync_sz,
 
  523        on_sync_io_rcv_done(err_code, sz); 
 
  528      assert(
false && 
"This code should never be reached.");
 
  536    m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
 
  538      { on_sync_io_rcv_done(err_code, sz); });
 
  541  assert(ok && 
"We are by definition in PEER state, and ctor starts receive-ops, and we never start a " 
  542               "sync_io async-receive before ensuring previous one has executed; so that should never " 
  543               "return false.  Bug somewhere?");
 
  554  process_msg_or_error(sync_err_code, sync_sz);
 
  556  FLOW_LOG_TRACE(
"Message was immediately available; synchronously returned to user; handler posted onto " 
  557                 "async worker thread.  Done until next request.");
 
  560template<
typename Core_t>
 
  563  using flow::util::Lock_guard;
 
  568  assert(err_code != boost::asio::error::operation_aborted);
 
  571  FLOW_LOG_TRACE(m_log_pfx << 
": Earlier async-wait => event active => rcv-mutex lock => " 
  572                 "on-active-event-func => sync_io module => on-rcv-done handler => here.  " 
  573                 "Or else: rcv-mutex lock => sync_io module => no async-wait needed => here.");
 
  607  assert(m_user_request);
 
  609  assert(!m_user_request);
 
  611  queue<typename User_request::Ptr> ex_pending_user_requests_q_or_none;
 
  614    FLOW_LOG_TRACE(
"Error emitted by sync-IO => all [" << m_pending_user_requests_q.size() << 
"] pending " 
  615                   "handlers (may well be none) will be invoked with error in addition to the head handler.");
 
  617    ex_pending_user_requests_q_or_none = std::move(m_pending_user_requests_q);
 
  618    assert(m_pending_user_requests_q.empty());
 
  620  else if (!m_pending_user_requests_q.empty()) 
 
  622    FLOW_LOG_TRACE(
"Success emitted by sync-IO => lead handler will be invoked; next pending request will " 
  623                   "be serviced by sync-IO; pending request count has become (after popping from queue into lead " 
  624                   "slot): [" << (m_pending_user_requests_q.size() - 1) << 
"].\n");
 
  626    m_user_request = std::move(m_pending_user_requests_q.front());
 
  628    m_pending_user_requests_q.pop();
 
  632    FLOW_LOG_TRACE(
"Success emitted by sync-IO => lead handler will be invoked; no pending requests queued up.\n");
 
  636  m_worker.post([
this, err_code, sz,
 
  639                 ex_user_request = boost::shared_ptr<User_request>(std::move(ex_user_request)),
 
  640                 ex_pending_user_requests_q_or_none
 
  641                   = boost::make_shared<
decltype(ex_pending_user_requests_q_or_none)>
 
  642                       (std::move(ex_pending_user_requests_q_or_none))]()
 
  646    assert(ex_user_request);
 
  647    FLOW_LOG_TRACE(m_log_pfx << 
": Invoking head handler.");
 
  648    (ex_user_request->m_on_done_func)(err_code, sz);
 
  649    FLOW_LOG_TRACE(
"Handler completed.");
 
  650    if (!ex_pending_user_requests_q_or_none->empty())
 
  655      FLOW_LOG_TRACE(m_log_pfx << 
": Invoking [" << ex_pending_user_requests_q_or_none->size() << 
"] " 
  656                     "pending handlers in one shot (due to error).");
 
  657      while (!ex_pending_user_requests_q_or_none->empty())
 
  659        ex_pending_user_requests_q_or_none->front()->m_on_done_func(err_code, 0);
 
  660        ex_pending_user_requests_q_or_none->pop();
 
  661        FLOW_LOG_TRACE(
"In-queue handler finished.");
 
  667  assert(!ex_user_request); 
 
  668  assert(ex_pending_user_requests_q_or_none.empty());
 
  671template<
typename Core_t>
 
  674  using flow::util::Lock_guard;
 
  679  assert(err_code != boost::asio::error::operation_aborted);
 
  682  FLOW_LOG_TRACE(m_log_pfx << 
": Earlier async-wait => event active => rcv-mutex lock => " 
  683                 "on-active-event-func => sync_io module => here (on-rcv-done handler).");
 
  694  process_msg_or_error(err_code, sz);
 
  707    if (m_user_request->m_target_hndl_ptr)
 
  709      if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES)
 
  714        m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
 
  715                                              &sync_err_code, &sync_sz,
 
  718          on_sync_io_rcv_done(err_code, sz); 
 
  723        assert(
false && 
"This code should never be reached.");
 
  731      m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
 
  733        { on_sync_io_rcv_done(err_code, sz); });
 
  735    assert(ok && 
"We are by definition in PEER state, and ctor starts receive-ops, and we never start a " 
  736                 "sync_io async-receive before ensuring previous one has executed; so that should never " 
  737                 "return false.  Bug somewhere?");
 
  748    process_msg_or_error(sync_err_code, sync_sz);
 
  757template<
typename Core_t>
 
  760  using flow::util::Lock_guard;
 
  764  Lock_guard<
decltype(m_mutex)> lock(m_mutex);
 
  765  return m_sync_io.idle_timer_run(timeout);
 
Internal-use type that adapts a given PEER-state sync_io::Native_handle_receiver or sync_io::Blob_rec...
flow::async::Single_thread_task_loop & m_worker
Single-thread worker pool for all internal async work. Referred to as thread W in comments.
flow::util::Mutex_non_recursive m_mutex
Protects m_user_request, m_pending_user_requests_q, and receive-ops data of m_sync_io.
std::queue< typename User_request::Ptr > m_pending_user_requests_q
Queue storing deficit async-receive requests queued up due to m_user_request being not null while mor...
bool idle_timer_run(util::Fine_duration timeout)
See Native_handle_receiver counterpart.
Async_adapter_receiver(flow::log::Logger *logger_ptr, util::String_view log_pfx, flow::async::Single_thread_task_loop *worker, Core *sync_io)
Constructs the adapter around sync_io::X object *sync_io.
const std::string m_log_pfx
See log_pfx arg of ctor.
void async_receive_native_handle_impl(Native_handle *target_hndl_or_null, const util::Blob_mutable &target_meta_blob, flow::async::Task_asio_err_sz &&on_done_func)
Body of async_receive_native_handle() and async_receive_blob(); with target_hndl null if and only if ...
User_request::Ptr m_user_request
The head slot containing the currently-being-serviced "deficit" async-receive request,...
~Async_adapter_receiver()
To be invoked after ->stop()ping *worker (from ctor), as well as flushing any still-queued tasks in i...
void on_sync_io_rcv_done(const Error_code &err_code, size_t sz)
Invoked via active-event API, handles the async completion of m_sync_io.async_receive_*() operation.
void async_receive_native_handle(Native_handle *target_hndl, const util::Blob_mutable &target_meta_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Native_handle_sender counterpart.
void async_receive_blob(const util::Blob_mutable &target_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Blob_receiver counterpart.
Core_t Core
The sync_io::X type being adapted into async-I/O-pattern X.
Core & m_sync_io
The core Core engine, implementing the sync_io pattern (see util::sync_io doc header).
void process_msg_or_error(const Error_code &err_code, size_t sz)
Invoked from thread U/W (async_receive_native_handle_impl()) or thread W (active-event API),...
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
boost::shared_ptr< Task > Task_ptr
Short-hand for ref-counted pointer to a Function<> that takes no arguments and returns nothing; in pa...
boost::asio::mutable_buffer Blob_mutable
Short-hand for an mutable blob somewhere in memory, stored as exactly a void* and a size_t.
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Data store representing a deficit user async-receive request: either one being currently handled by m...
Native_handle * m_target_hndl_ptr
See async_receive_native_handle() target_hndl. Null for async_receive_blob().
util::Blob_mutable m_target_meta_blob
See async_receive_native_handle() target_meta_blob. Or see async_receive_blob() target_blob.
boost::movelib::unique_ptr< User_request > Ptr
Short-hand for unique_ptr to this.
flow::async::Task_asio_err_sz m_on_done_func
See async_receive_native_handle() or async_receive_blob() on_done_func.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.