24#include <flow/log/log.hpp> 
   25#include <flow/async/single_thread_task_loop.hpp> 
   72template<
typename Core_t>
 
   74  public flow::log::Log_context,
 
   75  private boost::noncopyable
 
  101                       flow::async::Single_thread_task_loop* worker, 
Core* sync_io);
 
  198  mutable flow::util::Mutex_non_recursive 
m_mutex;
 
  218template<
typename Core_t>
 
  221                                                   flow::async::Single_thread_task_loop* worker,
 
  223  flow::log::Log_context(logger_ptr, 
Log_component::S_TRANSPORT),
 
  230  using flow::util::Lock_guard;
 
  246  m_sync_io.start_send_blob_ops([
this](Asio_waitable_native_handle* hndl_of_interest,
 
  247                                       bool ev_of_interest_snd_else_rcv,
 
  253    FLOW_LOG_TRACE(
m_log_pfx << 
": Sync-IO send-ops event-wait request: " 
  254                   "descriptor [" << hndl_of_interest->native_handle() << 
"], " 
  255                   "writable-else-readable [" << ev_of_interest_snd_else_rcv << 
"].");
 
  258    assert(hndl_of_interest);
 
  259    hndl_of_interest->async_wait(ev_of_interest_snd_else_rcv
 
  260                                   ? Asio_waitable_native_handle::Base::wait_write
 
  261                                   : Asio_waitable_native_handle::Base::wait_read,
 
  262                                 [
this, on_active_ev_func = std::move(on_active_ev_func)]
 
  267      if (err_code == boost::asio::error::operation_aborted)
 
  282      (*on_active_ev_func)();
 
  291template<
typename Core_t>
 
  294  using flow::async::Single_thread_task_loop;
 
  295  using flow::util::ostream_op_string;
 
  305  if (!m_end_sending_on_done_func_or_empty.empty())
 
  307    Single_thread_task_loop one_thread(get_logger(), ostream_op_string(m_log_pfx, 
"-snd-temp_deinit"));
 
  308    one_thread.start([&]()
 
  310      FLOW_LOG_INFO(m_log_pfx << 
": In transient snd-finisher thread: " 
  311                    "Shall run pending graceful-sends-close completion handler.");
 
  313      FLOW_LOG_INFO(
"Transient finisher exiting.");
 
  318template<
typename Core_t>
 
  321  using flow::util::Lock_guard;
 
  328  Lock_guard<
decltype(m_mutex)> lock(m_mutex);
 
  332  m_sync_io.send_blob(blob, err_code);
 
  333  assert(ok && 
"We are by definition in PEER state, and ctor starts send-ops; so that should never " 
  334                 "return false.  Bug somewhere?");
 
  337template<
typename Core_t>
 
  341  using flow::util::Lock_guard;
 
  357  Lock_guard<
decltype(m_mutex)> lock(m_mutex);
 
  361  m_sync_io.send_native_handle(hndl, meta_blob, err_code);
 
  362  assert(ok && 
"We are by definition in PEER state, and ctor starts send-ops; so that should never " 
  363                 "return false.  Bug somewhere?");
 
  366template<
typename Core_t>
 
  369  using flow::util::Lock_guard;
 
  376  Lock_guard<
decltype(m_mutex)> lock(m_mutex);
 
  378  if (on_done_func_or_empty.empty())
 
  380    return m_sync_io.end_sending();
 
  384  if (!m_end_sending_on_done_func_or_empty.empty())
 
  386    FLOW_LOG_WARNING(m_log_pfx << 
": async_end_sending(F) invoked; but we have a saved " 
  387                     "completion handler -- which has not yet fired -- for it already, so it must be a dupe-call.  " 
  396  m_end_sending_on_done_func_or_empty = std::move(on_done_func_or_empty);
 
  399  const bool ok = m_sync_io.async_end_sending(&sync_err_code, [
this](
const Error_code& err_code)
 
  403    on_sync_io_end_sending_done(err_code);
 
  409    m_end_sending_on_done_func_or_empty.clear();
 
  422  FLOW_LOG_INFO(m_log_pfx << 
": Sync-IO async-end-sending completed immediately.  " 
  423                "Posting handler onto async-worker thread.");
 
  425  on_sync_io_end_sending_done(sync_err_code);
 
  431template<
typename Core_t>
 
  434  using flow::util::Lock_guard;
 
  439  assert(err_code != boost::asio::error::operation_aborted);
 
  441  FLOW_LOG_TRACE(m_log_pfx << 
":: Earlier async-wait => event active => snd-mutex lock => " 
  442                 "on-active-event-func => sync_io module => here (on-end-sending-done handler).  Or else " 
  443                 "snd-mutex lock => no async-wait needed => here... (ditto).");
 
  450  m_worker.post([
this, err_code, on_done_func = std::move(m_end_sending_on_done_func_or_empty)]()
 
  453    FLOW_LOG_TRACE(m_log_pfx << 
": Invoking on-end-sending-done handler.");
 
  454    on_done_func(err_code);
 
  455    FLOW_LOG_TRACE(
"Handler completed.");
 
  458  m_end_sending_on_done_func_or_empty.clear(); 
 
  461template<
typename Core_t>
 
  464  using flow::util::Lock_guard;
 
  468  Lock_guard<
decltype(m_mutex)> lock(m_mutex);
 
  469  return m_sync_io.auto_ping(period);
 
Internal-use type that adapts a given PEER-state sync_io::Native_handle_sender or sync_io::Blob_sende...
~Async_adapter_sender()
To be invoked after ->stop()ping *worker (from ctor), as well as flushing any still-queued tasks in i...
void send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code)
See Native_handle_sender counterpart.
flow::async::Task_asio_err m_end_sending_on_done_func_or_empty
The on_done_func argument to async_end_sending(), possibly .empty() if originally user invoked end_se...
Core_t Core
The sync_io::X type being adapted into async-I/O-pattern X.
void on_sync_io_end_sending_done(const Error_code &err_code)
Handles the completion of m_sync_io.async_end_sending() operation whether synchronously or asynchrono...
bool async_end_sending(flow::async::Task_asio_err &&on_done_func_or_empty)
See Native_handle_sender counterpart; or leave on_done_func_or_empty.empty() for Native_handle_sender...
void send_blob(const util::Blob_const &blob, Error_code *err_code)
See Blob_sender counterpart.
flow::util::Mutex_non_recursive m_mutex
Protects m_end_sending_on_done_func_or_empty and, more importantly, send-ops data of m_sync_io.
flow::async::Single_thread_task_loop & m_worker
Single-thread worker pool for all internal async work. Referred to as thread W in comments.
Core & m_sync_io
The core Core engine, implementing the sync_io pattern (see util::sync_io doc header).
const std::string m_log_pfx
See log_pfx arg of ctor.
bool auto_ping(util::Fine_duration period)
See Native_handle_sender counterpart.
Async_adapter_sender(flow::log::Logger *logger_ptr, util::String_view log_pfx, flow::async::Single_thread_task_loop *worker, Core *sync_io)
Constructs the adapter around sync_io::X object *sync_io.
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
boost::shared_ptr< Task > Task_ptr
Short-hand for ref-counted pointer to a Function<> that takes no arguments and returns nothing; in pa...
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.