24#include <boost/move/make_unique.hpp> 
   50template<
typename Session>
 
   99  template<
typename On_passive_open_channel_handler, 
typename Task_err>
 
  102                           Task_err&& on_err_func,
 
  103                           On_passive_open_channel_handler&& on_passive_open_channel_func);
 
  123  template<
typename Task_err>
 
  126                           Task_err&& on_err_func);
 
  140  template<
typename Event_wait_func_t>
 
  141  bool start_ops(Event_wait_func_t&& ev_wait_func);
 
  152  template<
typename Create_ev_wait_hndl_func>
 
  170  template<
typename Task_err, 
typename On_passive_open_channel_handler>
 
  171  bool init_handlers(Task_err&& on_err_func_arg, On_passive_open_channel_handler&& on_passive_open_channel_func_arg);
 
  184  template<
typename Task_err>
 
  210  template<
typename... Args>
 
  251    using Ptr = boost::movelib::unique_ptr<Channel_open_result>;
 
  421template<
typename Session>
 
  423  m_ready_reader_err(m_nb_task_engine), 
 
  424  m_ready_writer_err(m_nb_task_engine), 
 
  425  m_ev_wait_hndl_err(m_ev_hndl_task_engine_unused), 
 
  426  m_ready_reader_chan(m_nb_task_engine),
 
  427  m_ready_writer_chan(m_nb_task_engine),
 
  428  m_ev_wait_hndl_chan(m_ev_hndl_task_engine_unused)
 
  434template<
typename Session>
 
  435template<
typename On_passive_open_channel_handler, 
typename Task_err>
 
  438                                          Task_err&& on_err_func,
 
  439                                          On_passive_open_channel_handler&& on_passive_open_channel_func) :
 
  440  m_ready_reader_err(m_nb_task_engine), 
 
  441  m_ready_writer_err(m_nb_task_engine), 
 
  442  m_ev_wait_hndl_err(m_ev_hndl_task_engine_unused), 
 
  443  m_ready_reader_chan(m_nb_task_engine),
 
  444  m_ready_writer_chan(m_nb_task_engine),
 
  445  m_ev_wait_hndl_chan(m_ev_hndl_task_engine_unused),
 
  446  m_on_err_func(std::move(on_err_func)),
 
  447  m_on_channel_func_or_empty(std::move(on_passive_open_channel_func)),
 
  448  m_async_io(logger_ptr, cli_app_ref, srv_app_ref, on_err_func_sio(), on_channel_func_sio())
 
  454template<
typename Session>
 
  455template<
typename Task_err>
 
  458                                          Task_err&& on_err_func) :
 
  459  m_ready_reader_err(m_nb_task_engine), 
 
  460  m_ready_writer_err(m_nb_task_engine), 
 
  461  m_ev_wait_hndl_err(m_ev_hndl_task_engine_unused), 
 
  462  m_ready_reader_chan(m_nb_task_engine),
 
  463  m_ready_writer_chan(m_nb_task_engine),
 
  464  m_ev_wait_hndl_chan(m_ev_hndl_task_engine_unused),
 
  465  m_on_err_func(std::move(on_err_func)),
 
  466  m_async_io(logger_ptr, cli_app_ref, srv_app_ref, on_err_func_sio())
 
  472template<
typename Session>
 
  473template<
typename Task_err, 
typename On_passive_open_channel_handler>
 
  475                                             On_passive_open_channel_handler&& on_passive_open_channel_func_arg)
 
  477  if (!m_on_err_func.empty())
 
  479    FLOW_LOG_WARNING(
"Session_adapter [" << m_async_io << 
"]: init_handlers() called duplicately.  Ignoring.");
 
  483  assert(m_on_channel_func_or_empty.empty());
 
  485  m_on_err_func = std::move(on_err_func_arg);
 
  486  m_on_channel_func_or_empty = std::move(on_passive_open_channel_func_arg);
 
  491  core()->init_handlers(on_err_func_sio(), on_channel_func_sio());
 
  493  assert(ok && 
"We should have caught this with the above guard.");
 
  497template<
typename Session>
 
  498template<
typename Task_err>
 
  501  if (!m_on_err_func.empty())
 
  503    FLOW_LOG_WARNING(
"Session_adapter [" << m_async_io << 
"]: init_handlers() called duplicately.  Ignoring.");
 
  508  m_on_err_func = std::move(on_err_func_arg);
 
  513  core()->init_handlers(on_err_func_sio());
 
  515  assert(ok && 
"We should have caught this with the above guard.");
 
  519template<
typename Session>
 
  524  using boost::asio::connect_pipe;
 
  528  connect_pipe(*reader, *writer, sys_err_code);
 
  531    FLOW_LOG_FATAL(
"Session_adapter [" << m_async_io << 
"]: Constructing: connect-pipe failed.  Details follow.");
 
  532    FLOW_ERROR_SYS_ERROR_LOG_FATAL();
 
  533    assert(
false && 
"We chose not to complicate the code given how unlikely this is, and how hosed you'd have to be.");
 
  540template<
typename Session>
 
  541template<
typename Event_wait_func_t>
 
  546  if (!m_ev_wait_func.empty())
 
  548    FLOW_LOG_WARNING(
"Session_adapter [" << m_async_io << 
"]: Start-ops requested, " 
  549                     "but we are already started.  Probably a user bug, but it is not for us to judge.");
 
  554  m_ev_wait_func = std::move(ev_wait_func);
 
  559  async_wait(&m_ev_wait_hndl_err,
 
  561             boost::make_shared<Task>([
this]()
 
  563    FLOW_LOG_INFO(
"Session_adapter [" << m_async_io << 
"]: Async-IO core on-error event: informed via IPC-pipe; " 
  564                  "invoking handler.");
 
  567    auto on_done_func = std::move(m_on_err_func);
 
  568    m_on_err_func.clear(); 
 
  570    on_done_func(m_target_err_code_err);
 
  571    FLOW_LOG_TRACE(
"Handler completed.");
 
  579  async_wait(&m_ev_wait_hndl_chan,
 
  581             boost::make_shared<Task>([
this]() { on_ev_channel_open(); }));
 
  583  FLOW_LOG_INFO(
"Session_adapter [" << m_async_io << 
"]: Start-ops requested.  Done.");
 
  587template<
typename Session>
 
  591  using flow::util::Lock_guard;
 
  597    Lock_guard<
decltype(m_target_channel_open_q_mutex)> lock(m_target_channel_open_q_mutex);
 
  599    FLOW_LOG_INFO(
"Session_adapter [" << m_async_io << 
"]: Async-IO core passively-opened channel event: " 
  600                  "informed via IPC-pipe; invoking handler.  Including this one " 
  601                  "[" << m_target_channel_open_q.size() << 
"] are pending.");
 
  603    assert((!m_target_channel_open_q.empty())
 
  604           && 
"Algorithm bug?  Result-queue elements and pipe signal bytes must be 1-1, so either something " 
  605              "failed to correctly push, or something overzealously popped.");
 
  607    result = std::move(m_target_channel_open_q.front());
 
  608    m_target_channel_open_q.pop();
 
  611  m_on_channel_func_or_empty(std::move(result->m_channel), std::move(result->m_mdt_reader_ptr));
 
  612  FLOW_LOG_TRACE(
"Handler completed.  Beginning next async-wait immediately.  If more is/are pending " 
  613                 "it/they'll be popped quickly due to immediately-completing async-wait(s).");
 
  616  async_wait(&m_ev_wait_hndl_chan,
 
  618             boost::make_shared<Task>([
this]() { on_ev_channel_open(); }));
 
  621template<
typename Session>
 
  622template<
typename Create_ev_wait_hndl_func>
 
  627  if (!m_ev_wait_func.empty())
 
  629    FLOW_LOG_WARNING(
"Session_adapter [" << m_async_io << 
"]: Cannot replace event-wait handles after " 
  630                     "a start-*-ops procedure has been executed.  Ignoring.");
 
  635  FLOW_LOG_INFO(
"Session_adapter [" << m_async_io << 
"]: Replacing event-wait handles (probably to replace underlying " 
  636                "execution context without outside event loop's boost.asio Task_engine or similar).");
 
  638  assert(m_ev_wait_hndl_err.is_open());
 
  639  assert(m_ev_wait_hndl_chan.is_open());
 
  642  m_ev_wait_hndl_err = create_ev_wait_hndl_func();
 
  643  m_ev_wait_hndl_err.assign(saved);
 
  645  saved = m_ev_wait_hndl_chan.release();
 
  646  m_ev_wait_hndl_chan = create_ev_wait_hndl_func();
 
  647  m_ev_wait_hndl_chan.assign(saved);
 
  652template<
typename Session>
 
  655  using flow::async::Task_asio_err;
 
  661    FLOW_LOG_INFO(
"Session_adapter [" << m_async_io << 
"]: Async-IO core reports on-error event: tickling IPC-pipe to " 
  664    assert((!m_target_err_code_err)
 
  665           && 
"Error handler must never fire more than once per Session!  Bug in the particular Session_obj type?");
 
  666    m_target_err_code_err = err_code;
 
  674template<
typename Session>
 
  678  using flow::util::Lock_guard;
 
  686      Lock_guard<
decltype(m_target_channel_open_q_mutex)> lock(m_target_channel_open_q_mutex);
 
  688      FLOW_LOG_INFO(
"Session_adapter [" << m_async_io << 
"]: Async-IO core reports passively-opened channel event: " 
  689                    "tickling IPC-pipe to inform user.  This will make the # of pending such events " 
  690                    "[" << (m_target_channel_open_q.size() + 1) << 
"].");
 
  691      m_target_channel_open_q.emplace(boost::movelib::make_unique<Channel_open_result>());
 
  692      auto& result = *(m_target_channel_open_q.back());
 
  693      result.m_channel = std::move(new_channel);
 
  694      result.m_mdt_reader_ptr = std::move(new_channel_mdt);
 
  707template<
typename Session>
 
  708template<
typename... Args>
 
  711  m_ev_wait_func(std::forward<Args>(args)...);
 
  714template<
typename Session>
 
  720template<
typename Session>
 
  726template<
typename Session>
 
  729  return core()->get_logger();
 
  732template<
typename Session>
 
  735  return core()->get_log_component();
 
A documentation-only concept defining the local side of an IPC conversation (session) with another en...
unspecified Channel_obj
Each successful open_channel() and on-passive-open handler firing shall yield a concrete transport::C...
shared_ptr< typename transport::struc::schema::Metadata< Mdt_payload_obj >::Reader > Mdt_reader_ptr
Ref-counted handle to a capnp-generated Reader (and the payload it accesses) through which the user s...
typename Base::Session_obj Session_obj
Short-hand, for generic programming et al, for template parameter Session.
Internal-use workhorse containing common elements of Client_session_adapter and Server_session_adapte...
flow::async::Task_asio_err m_on_err_func
on_err_func from init_handlers(); .empty() until then.
const flow::log::Component & get_log_component() const
See flow::log::Log_context.
util::Pipe_writer m_ready_writer_err
Write-end of IPC-pipe together with m_ready_reader_err.
util::sync_io::Asio_waitable_native_handle m_ev_wait_hndl_err
Descriptor waitable by outside event loop async-waits – storing the same Native_handle as (and thus b...
typename Session_obj::Channel_obj Channel_obj
Short-hand for session-openable Channel type.
typename Session_obj::Mdt_reader_ptr Mdt_reader_ptr
Short-hand for session-open metadata reader.
void init_pipe(util::Pipe_reader *reader, util::Pipe_writer *writer, util::sync_io::Asio_waitable_native_handle *ev_wait_hndl)
Utility that sets up an IPC-pipe in the given peer objects as well as loading a watcher-descriptor ob...
void on_ev_channel_open()
Signaled by the function returned by on_channel_func_sio(), it returns the IPC-pipe to steady-state (...
On_channel_func on_channel_func_sio()
Returns the proper on-passive-channel-open handler to set up on the underlying Session_obj (Client_se...
flow::async::Task_asio_err on_err_func_sio()
Returns the proper on-error handler to set up on the underlying Session_obj (Client_session: via ctor...
bool replace_event_wait_handles(const Create_ev_wait_hndl_func &create_ev_wait_hndl_func)
See, e.g., Client_session_adapter.
Error_code m_target_err_code_err
Result given to (or about to be given to) m_on_err_func.
void async_wait(Args &&... args)
Forwards to the util::sync_io::Event_wait_func saved in start_ops().
util::Pipe_reader m_ready_reader_err
Read-end of the IPC-pipe used by *this to detect that the error-wait has completed.
flow::util::Mutex_non_recursive m_target_channel_open_q_mutex
Protects m_target_channel_open_q, accessed from user async-wait-reporter thread; and Session_obj work...
Session_adapter()
Forwards to the Session_obj default ctor.
util::sync_io::Asio_waitable_native_handle m_ev_wait_hndl_chan
Descriptor waitable by outside event loop async-waits – storing the same Native_handle as (and thus b...
Session_obj * core()
The adapted mutable Session_obj.
util::Pipe_reader m_ready_reader_chan
Read-end of the IPC-pipe used by *this to detect that a channel-open-wait has completed.
Function< void(Channel_obj &&, Mdt_reader_ptr &&)> On_channel_func
Short-hand for passive-channel-open handler.
flow::log::Logger * get_logger() const
See flow::log::Log_context.
Session Session_obj
See, e.g., Client_session_adapter.
bool start_ops(Event_wait_func_t &&ev_wait_func)
See, e.g., Client_session_adapter.
Async_io_obj m_async_io
This guy does all the work. In our dtor this will be destroyed (hence thread stopped) first-thing.
util::sync_io::Event_wait_func m_ev_wait_func
Function (set forever in start_ops()) through which we invoke the outside event loop's async-wait fac...
Channel_open_result_q m_target_channel_open_q
Queue of On_channel_func handler arg sets received from async-I/O Session_obj – meaning the Session,...
std::queue< typename Channel_open_result::Ptr > Channel_open_result_q
Queue of Channel_open_result.
bool init_handlers(Task_err &&on_err_func_arg, On_passive_open_channel_handler &&on_passive_open_channel_func_arg)
Compilable only when Session_obj is a Server_session variant, forwards to its method of identical for...
util::Pipe_writer m_ready_writer_chan
Write-end of IPC-pipe together with m_ready_reader_chan.
flow::util::Task_engine m_nb_task_engine
The Task_engine for m_ready_*.
On_channel_func m_on_channel_func_or_empty
on_passive_open_channel_func_or_empty from init_handlers() (possibly .empty() if not supplied); until...
flow::util::Task_engine m_ev_hndl_task_engine_unused
The Task_engine for m_ev_wait_hndl_*, unless it is replaced via replace_event_wait_handles().
Dummy type for use as a template param to Channel when either the blobs pipe or handles pipe is disab...
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
void assign(Native_handle hndl)
Loads value to be returned by native_handle().
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::session.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
Function< void(Asio_waitable_native_handle *hndl_of_interest, bool ev_of_interest_snd_else_rcv, Task_ptr &&on_active_ev_func)> Event_wait_func
In sync_io pattern, concrete type storing user-supplied function invoked by pattern-implementing ipc:...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
void pipe_produce(flow::log::Logger *logger_ptr, Pipe_writer *pipe)
Writes a byte to the given pipe writer.
void pipe_consume(flow::log::Logger *logger_ptr, Pipe_reader *pipe)
Reads a byte via the given pipe reader.
boost::asio::writable_pipe Pipe_writer
Short-hand for anonymous pipe write end.
boost::asio::readable_pipe Pipe_reader
Short-hand for anonymous pipe read end.
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.
An App that is used as a client in at least one client-server IPC split.
An App that is used as a server in at least one client-server IPC split.
Set of result arg values from a successful passive-channel-open from a Session_obj invoking On_channe...
Channel_obj m_channel
Result 1/2 given to (or about to be given to) m_on_channel_func_or_empty.
Mdt_reader_ptr m_mdt_reader_ptr
Result 2/2 given to (or about to be given to) m_on_channel_func_or_empty.
boost::movelib::unique_ptr< Channel_open_result > Ptr
Short-hand for pointer wrapper around a *this.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.