24#include <boost/move/make_unique.hpp>
50template<
typename Session>
99 template<
typename On_passive_open_channel_handler,
typename Task_err>
102 Task_err&& on_err_func,
103 On_passive_open_channel_handler&& on_passive_open_channel_func);
123 template<
typename Task_err>
126 Task_err&& on_err_func);
140 template<
typename Event_wait_func_t>
141 bool start_ops(Event_wait_func_t&& ev_wait_func);
152 template<
typename Create_ev_wait_hndl_func>
170 template<
typename Task_err,
typename On_passive_open_channel_handler>
171 bool init_handlers(Task_err&& on_err_func_arg, On_passive_open_channel_handler&& on_passive_open_channel_func_arg);
184 template<
typename Task_err>
210 template<
typename... Args>
251 using Ptr = boost::movelib::unique_ptr<Channel_open_result>;
421template<
typename Session>
423 m_ready_reader_err(m_nb_task_engine),
424 m_ready_writer_err(m_nb_task_engine),
425 m_ev_wait_hndl_err(m_ev_hndl_task_engine_unused),
426 m_ready_reader_chan(m_nb_task_engine),
427 m_ready_writer_chan(m_nb_task_engine),
428 m_ev_wait_hndl_chan(m_ev_hndl_task_engine_unused)
434template<
typename Session>
435template<
typename On_passive_open_channel_handler,
typename Task_err>
438 Task_err&& on_err_func,
439 On_passive_open_channel_handler&& on_passive_open_channel_func) :
440 m_ready_reader_err(m_nb_task_engine),
441 m_ready_writer_err(m_nb_task_engine),
442 m_ev_wait_hndl_err(m_ev_hndl_task_engine_unused),
443 m_ready_reader_chan(m_nb_task_engine),
444 m_ready_writer_chan(m_nb_task_engine),
445 m_ev_wait_hndl_chan(m_ev_hndl_task_engine_unused),
446 m_on_err_func(std::move(on_err_func)),
447 m_on_channel_func_or_empty(std::move(on_passive_open_channel_func)),
448 m_async_io(logger_ptr, cli_app_ref, srv_app_ref, on_err_func_sio(), on_channel_func_sio())
454template<
typename Session>
455template<
typename Task_err>
458 Task_err&& on_err_func) :
459 m_ready_reader_err(m_nb_task_engine),
460 m_ready_writer_err(m_nb_task_engine),
461 m_ev_wait_hndl_err(m_ev_hndl_task_engine_unused),
462 m_ready_reader_chan(m_nb_task_engine),
463 m_ready_writer_chan(m_nb_task_engine),
464 m_ev_wait_hndl_chan(m_ev_hndl_task_engine_unused),
465 m_on_err_func(std::move(on_err_func)),
466 m_async_io(logger_ptr, cli_app_ref, srv_app_ref, on_err_func_sio())
472template<
typename Session>
473template<
typename Task_err,
typename On_passive_open_channel_handler>
475 On_passive_open_channel_handler&& on_passive_open_channel_func_arg)
477 if (!m_on_err_func.empty())
479 FLOW_LOG_WARNING(
"Session_adapter [" << m_async_io <<
"]: init_handlers() called duplicately. Ignoring.");
483 assert(m_on_channel_func_or_empty.empty());
485 m_on_err_func = std::move(on_err_func_arg);
486 m_on_channel_func_or_empty = std::move(on_passive_open_channel_func_arg);
491 core()->init_handlers(on_err_func_sio(), on_channel_func_sio());
493 assert(ok &&
"We should have caught this with the above guard.");
497template<
typename Session>
498template<
typename Task_err>
501 if (!m_on_err_func.empty())
503 FLOW_LOG_WARNING(
"Session_adapter [" << m_async_io <<
"]: init_handlers() called duplicately. Ignoring.");
508 m_on_err_func = std::move(on_err_func_arg);
513 core()->init_handlers(on_err_func_sio());
515 assert(ok &&
"We should have caught this with the above guard.");
519template<
typename Session>
524 using boost::asio::connect_pipe;
528 connect_pipe(*reader, *writer, sys_err_code);
531 FLOW_LOG_FATAL(
"Session_adapter [" << m_async_io <<
"]: Constructing: connect-pipe failed. Details follow.");
532 FLOW_ERROR_SYS_ERROR_LOG_FATAL();
533 assert(
false &&
"We chose not to complicate the code given how unlikely this is, and how hosed you'd have to be.");
540template<
typename Session>
541template<
typename Event_wait_func_t>
546 if (!m_ev_wait_func.empty())
548 FLOW_LOG_WARNING(
"Session_adapter [" << m_async_io <<
"]: Start-ops requested, "
549 "but we are already started. Probably a user bug, but it is not for us to judge.");
554 m_ev_wait_func = std::move(ev_wait_func);
559 async_wait(&m_ev_wait_hndl_err,
561 boost::make_shared<Task>([
this]()
563 FLOW_LOG_INFO(
"Session_adapter [" << m_async_io <<
"]: Async-IO core on-error event: informed via IPC-pipe; "
564 "invoking handler.");
567 auto on_done_func = std::move(m_on_err_func);
568 m_on_err_func.clear();
570 on_done_func(m_target_err_code_err);
571 FLOW_LOG_TRACE(
"Handler completed.");
579 async_wait(&m_ev_wait_hndl_chan,
581 boost::make_shared<Task>([
this]() { on_ev_channel_open(); }));
583 FLOW_LOG_INFO(
"Session_adapter [" << m_async_io <<
"]: Start-ops requested. Done.");
587template<
typename Session>
591 using flow::util::Lock_guard;
597 Lock_guard<
decltype(m_target_channel_open_q_mutex)> lock(m_target_channel_open_q_mutex);
599 FLOW_LOG_INFO(
"Session_adapter [" << m_async_io <<
"]: Async-IO core passively-opened channel event: "
600 "informed via IPC-pipe; invoking handler. Including this one "
601 "[" << m_target_channel_open_q.size() <<
"] are pending.");
603 assert((!m_target_channel_open_q.empty())
604 &&
"Algorithm bug? Result-queue elements and pipe signal bytes must be 1-1, so either something "
605 "failed to correctly push, or something overzealously popped.");
607 result = std::move(m_target_channel_open_q.front());
608 m_target_channel_open_q.pop();
611 m_on_channel_func_or_empty(std::move(result->m_channel), std::move(result->m_mdt_reader_ptr));
612 FLOW_LOG_TRACE(
"Handler completed. Beginning next async-wait immediately. If more is/are pending "
613 "it/they'll be popped quickly due to immediately-completing async-wait(s).");
616 async_wait(&m_ev_wait_hndl_chan,
618 boost::make_shared<Task>([
this]() { on_ev_channel_open(); }));
621template<
typename Session>
622template<
typename Create_ev_wait_hndl_func>
627 if (!m_ev_wait_func.empty())
629 FLOW_LOG_WARNING(
"Session_adapter [" << m_async_io <<
"]: Cannot replace event-wait handles after "
630 "a start-*-ops procedure has been executed. Ignoring.");
635 FLOW_LOG_INFO(
"Session_adapter [" << m_async_io <<
"]: Replacing event-wait handles (probably to replace underlying "
636 "execution context without outside event loop's boost.asio Task_engine or similar).");
638 assert(m_ev_wait_hndl_err.is_open());
639 assert(m_ev_wait_hndl_chan.is_open());
642 m_ev_wait_hndl_err = create_ev_wait_hndl_func();
643 m_ev_wait_hndl_err.assign(saved);
645 saved = m_ev_wait_hndl_chan.release();
646 m_ev_wait_hndl_chan = create_ev_wait_hndl_func();
647 m_ev_wait_hndl_chan.assign(saved);
652template<
typename Session>
655 using flow::async::Task_asio_err;
661 FLOW_LOG_INFO(
"Session_adapter [" << m_async_io <<
"]: Async-IO core reports on-error event: tickling IPC-pipe to "
664 assert((!m_target_err_code_err)
665 &&
"Error handler must never fire more than once per Session! Bug in the particular Session_obj type?");
666 m_target_err_code_err = err_code;
674template<
typename Session>
678 using flow::util::Lock_guard;
686 Lock_guard<
decltype(m_target_channel_open_q_mutex)> lock(m_target_channel_open_q_mutex);
688 FLOW_LOG_INFO(
"Session_adapter [" << m_async_io <<
"]: Async-IO core reports passively-opened channel event: "
689 "tickling IPC-pipe to inform user. This will make the # of pending such events "
690 "[" << (m_target_channel_open_q.size() + 1) <<
"].");
691 m_target_channel_open_q.emplace(boost::movelib::make_unique<Channel_open_result>());
692 auto& result = *(m_target_channel_open_q.back());
693 result.m_channel = std::move(new_channel);
694 result.m_mdt_reader_ptr = std::move(new_channel_mdt);
707template<
typename Session>
708template<
typename... Args>
711 m_ev_wait_func(std::forward<Args>(args)...);
714template<
typename Session>
720template<
typename Session>
726template<
typename Session>
729 return core()->get_logger();
732template<
typename Session>
735 return core()->get_log_component();
A documentation-only concept defining the local side of an IPC conversation (session) with another en...
unspecified Channel_obj
Each successful open_channel() and on-passive-open handler firing shall yield a concrete transport::C...
shared_ptr< typename transport::struc::schema::Metadata< Mdt_payload_obj >::Reader > Mdt_reader_ptr
Ref-counted handle to a capnp-generated Reader (and the payload it accesses) through which the user s...
typename Base::Session_obj Session_obj
Short-hand, for generic programming et al, for template parameter Session.
Internal-use workhorse containing common elements of Client_session_adapter and Server_session_adapte...
flow::async::Task_asio_err m_on_err_func
on_err_func from init_handlers() or from a handler-taking constructor; .empty() until then.
const flow::log::Component & get_log_component() const
See flow::log::Log_context.
util::Pipe_writer m_ready_writer_err
Write-end of IPC-pipe together with m_ready_reader_err.
util::sync_io::Asio_waitable_native_handle m_ev_wait_hndl_err
Descriptor waitable by outside event loop async-waits – storing the same Native_handle as (and thus b...
typename Session_obj::Channel_obj Channel_obj
Short-hand for session-openable Channel type.
typename Session_obj::Mdt_reader_ptr Mdt_reader_ptr
Short-hand for session-open metadata reader.
void init_pipe(util::Pipe_reader *reader, util::Pipe_writer *writer, util::sync_io::Asio_waitable_native_handle *ev_wait_hndl)
Utility that sets up an IPC-pipe in the given peer objects as well as loading a watcher-descriptor ob...
void on_ev_channel_open()
Signaled by the function returned by on_channel_func_sio(), it returns the IPC-pipe to steady-state (...
On_channel_func on_channel_func_sio()
Returns the proper on-passive-channel-open handler to set up on the underlying Session_obj (Client_se...
flow::async::Task_asio_err on_err_func_sio()
Returns the proper on-error handler to set up on the underlying Session_obj (Client_session: via ctor...
bool replace_event_wait_handles(const Create_ev_wait_hndl_func &create_ev_wait_hndl_func)
See, e.g., Client_session_adapter.
Error_code m_target_err_code_err
Result given to (or about to be given to) m_on_err_func.
void async_wait(Args &&... args)
Forwards to the util::sync_io::Event_wait_func saved in start_ops().
util::Pipe_reader m_ready_reader_err
Read-end of the IPC-pipe that *this uses to detect that the error-wait has completed.
flow::util::Mutex_non_recursive m_target_channel_open_q_mutex
Protects m_target_channel_open_q, accessed from user async-wait-reporter thread; and Session_obj work...
Session_adapter()
Forwards to the Session_obj default ctor.
util::sync_io::Asio_waitable_native_handle m_ev_wait_hndl_chan
Descriptor waitable by outside event loop async-waits – storing the same Native_handle as (and thus b...
Session_obj * core()
The adapted mutable Session_obj.
util::Pipe_reader m_ready_reader_chan
Read-end of the IPC-pipe that *this uses to detect that a channel-open-wait has completed.
Function< void(Channel_obj &&, Mdt_reader_ptr &&)> On_channel_func
Short-hand for passive-channel-open handler.
flow::log::Logger * get_logger() const
See flow::log::Log_context.
Session Session_obj
See, e.g., Client_session_adapter.
bool start_ops(Event_wait_func_t &&ev_wait_func)
See, e.g., Client_session_adapter.
Async_io_obj m_async_io
This guy does all the work. In our dtor this will be destroyed (hence thread stopped) first-thing.
util::sync_io::Event_wait_func m_ev_wait_func
Function (set forever in start_ops()) through which we invoke the outside event loop's async-wait fac...
Channel_open_result_q m_target_channel_open_q
Queue of On_channel_func handler arg sets received from async-I/O Session_obj – meaning the Session,...
std::queue< typename Channel_open_result::Ptr > Channel_open_result_q
Queue of Channel_open_result.
bool init_handlers(Task_err &&on_err_func_arg, On_passive_open_channel_handler &&on_passive_open_channel_func_arg)
Compilable only when Session_obj is a Server_session variant, forwards to its method of identical for...
util::Pipe_writer m_ready_writer_chan
Write-end of IPC-pipe together with m_ready_reader_chan.
flow::util::Task_engine m_nb_task_engine
The Task_engine for m_ready_*.
On_channel_func m_on_channel_func_or_empty
on_passive_open_channel_func_or_empty from init_handlers() (possibly .empty() if not supplied); until...
flow::util::Task_engine m_ev_hndl_task_engine_unused
The Task_engine for m_ev_wait_hndl_*, unless it is replaced via replace_event_wait_handles().
Dummy type for use as a template param to Channel when either the blobs pipe or handles pipe is disab...
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
void assign(Native_handle hndl)
Loads value to be returned by native_handle().
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::session.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
Function< void(Asio_waitable_native_handle *hndl_of_interest, bool ev_of_interest_snd_else_rcv, Task_ptr &&on_active_ev_func)> Event_wait_func
In sync_io pattern, concrete type storing user-supplied function invoked by pattern-implementing ipc:...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
void pipe_produce(flow::log::Logger *logger_ptr, Pipe_writer *pipe)
Writes a byte to the given pipe writer.
void pipe_consume(flow::log::Logger *logger_ptr, Pipe_reader *pipe)
Reads a byte via the given pipe reader.
boost::asio::writable_pipe Pipe_writer
Short-hand for anonymous pipe write end.
boost::asio::readable_pipe Pipe_reader
Short-hand for anonymous pipe read end.
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.
An App that is used as a client in at least one client-server IPC split.
An App that is used as a server in at least one client-server IPC split.
Set of result arg values from a successful passive-channel-open from a Session_obj invoking On_channe...
Channel_obj m_channel
Result 1/2 given to (or about to be given to) m_on_channel_func_or_empty.
Mdt_reader_ptr m_mdt_reader_ptr
Result 2/2 given to (or about to be given to) m_on_channel_func_or_empty.
boost::movelib::unique_ptr< Channel_open_result > Ptr
Short-hand for pointer wrapper around a *this.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.