23#include "ipc/session/schema/detail/session_master_channel.capnp.h"
29#include <boost/thread/future.hpp>
66template<schema::MqType S_MQ_TYPE_OR_NONE,
bool S_TRANSMIT_NATIVE_HANDLES,
typename Mdt_payload>
68 private boost::noncopyable
74 static constexpr bool S_MQS_ENABLED = S_MQ_TYPE_OR_NONE != schema::MqType::NONE;
81 static_assert(S_MQ_TYPE_OR_NONE != schema::MqType::END_SENTINEL,
82 "Do not use the value END_SENTINEL for S_MQ_TYPE_OR_NONE; it is only a sentinel. Did you mean NONE?");
85 static_assert(std::is_enum_v<schema::MqType>,
86 "Sanity-checking capnp-generated MqType enum (must be an enum).");
87 static_assert(std::is_unsigned_v<std::underlying_type_t<schema::MqType>>,
88 "Sanity-checking capnp-generated MqType enum (backing type must be unsigned).");
89 static_assert((int(schema::MqType::END_SENTINEL) - 1) == 2,
90 "Code apparently must be updated -- expected exactly 2 MqType enum values plus NONE + sentinel.");
91 static_assert((int(schema::MqType::POSIX) != int(schema::MqType::BIPC))
92 && (
int(schema::MqType::POSIX) > int(schema::MqType::NONE))
93 && (
int(schema::MqType::BIPC) > int(schema::MqType::NONE))
94 && (
int(schema::MqType::POSIX) < int(schema::MqType::END_SENTINEL))
95 && (
int(schema::MqType::BIPC) < int(schema::MqType::END_SENTINEL)),
96 "Code apparently must be updated -- "
97 "expected exactly 2 particular MqType enum values plus NONE + sentinel.");
106 std::conditional_t<S_MQ_TYPE_OR_NONE == schema::MqType::POSIX,
113 <S_TRANSMIT_NATIVE_HANDLES,
117 <S_TRANSMIT_NATIVE_HANDLES,
354 schema::detail::SessionMasterChannelMessageBody
576 public flow::log::Log_context,
577 private boost::noncopyable
607 flow::async::Single_thread_task_loop* async_worker,
667 flow::async::Task_asio_err&& on_err_func,
// Internal helper macro: the template header matching Session_base's parameter list
// (MQ type or NONE, native-handle transmission flag, metadata payload type). It is #undef'd at the end of this file.
810#define TEMPLATE_SESSION_BASE \
811 template<schema::MqType S_MQ_TYPE_OR_NONE, bool S_TRANSMIT_NATIVE_HANDLES, typename Mdt_payload>
// Internal helper macro: the Session_base type spelled with those same template parameters, used to qualify the
// out-of-class member definitions below (e.g., CLASS_SESSION_BASE::hosed()). Also #undef'd at the end of this file.
813#define CLASS_SESSION_BASE \
814 Session_base<S_MQ_TYPE_OR_NONE, S_TRANSMIT_NATIVE_HANDLES, Mdt_payload>
820 flow::async::Task_asio_err&& on_err_func,
828 m_srv_app_ref(srv_app_ref),
829 m_cli_app_ptr(&cli_app_ref),
830 m_on_err_func(std::move(on_err_func)),
831 m_on_passive_open_channel_func_or_empty(std::move(on_passive_open_channel_func_or_empty_arg))
838CLASS_SESSION_BASE::Session_base(
const Server_app& srv_app_ref) :
847 m_srv_app_ref(srv_app_ref),
850 m_srv_namespace(
Shared_name::ct(std::to_string(util::Process_credentials::own_process_id())))
// Stores the Client_app pointer held in m_cli_app_ptr.
// Contract (checked by the asserts): cli_app_ptr_new is non-null, and the pointer stored before the call was null --
// i.e., it is set at most once.
857void CLASS_SESSION_BASE::set_cli_app_ptr(
const Client_app* cli_app_ptr_new)
859 assert(cli_app_ptr_new &&
"As of this writing cli_app_ptr should be set at most once, from null to non-null.");
// exchange() stores the new pointer and hands back the old one, so the set-once rule can be checked.
862 const auto prev = m_cli_app_ptr.exchange(cli_app_ptr_new);
863 assert((!prev) &&
"As of this writing cli_app_ptr should be set at most once, from null to non-null.");
// NOTE(review): as listed here, this plain store repeats what exchange() already did. Presumably the exchange+assert
// and this store are alternative debug/release paths separated by preprocessor lines not visible here -- confirm.
865 m_cli_app_ptr = cli_app_ptr_new;
// Sets m_srv_namespace by moving from srv_namespace_new.
// Contract (checked by the asserts): the new value is non-empty, and m_srv_namespace is still empty when called.
870void CLASS_SESSION_BASE::set_srv_namespace(
Shared_name&& srv_namespace_new)
872 assert(!srv_namespace_new.empty());
874 assert(m_srv_namespace.empty() &&
"As of this writing srv_namespace should be set at most once, from empty.");
876 m_srv_namespace = std::move(srv_namespace_new);
// Sets m_cli_namespace by moving from cli_namespace_new.
// Contract (checked by the asserts): the new value is non-empty, and m_cli_namespace is still empty when called.
880void CLASS_SESSION_BASE::set_cli_namespace(
Shared_name&& cli_namespace_new)
882 assert(!cli_namespace_new.empty());
884 assert(m_cli_namespace.empty() &&
"As of this writing cli_namespace should be set at most once, from empty.");
886 m_cli_namespace = std::move(cli_namespace_new);
892 assert(m_on_passive_open_channel_func_or_empty.empty());
893 m_on_passive_open_channel_func_or_empty = std::move(on_passive_open_channel_func);
// Installs the on-error handler by moving on_err_func_arg into m_on_err_func.
// Contract (checked by the asserts): the handler passed in is non-empty, and no handler has been stored yet
// (neither by an earlier call nor through the constructor).
897void CLASS_SESSION_BASE::set_on_err_func(flow::async::Task_asio_err&& on_err_func_arg)
899 assert(!on_err_func_arg.empty());
900 assert(m_on_err_func.empty() &&
"Call set_on_err_func() once at most and only if not already set through ctor.");
902 m_on_err_func = std::move(on_err_func_arg);
908 return m_cli_app_ptr.load();
914 return m_srv_namespace;
920 return m_cli_namespace;
924const typename CLASS_SESSION_BASE::On_passive_open_channel_func&
925 CLASS_SESSION_BASE::on_passive_open_channel_func_or_empty()
const
927 return m_on_passive_open_channel_func_or_empty;
// Returns true if and only if an on-error handler is currently stored (m_on_err_func is non-empty).
931bool CLASS_SESSION_BASE::on_err_func_set()
const
933 return !m_on_err_func.empty();
939 assert((!hosed()) &&
"By contract do not call unless hosed() is false.");
941 m_peer_state_err_code_or_ok = err_code;
942 m_on_err_func(err_code);
// Returns true if and only if an error has been recorded in m_peer_state_err_code_or_ok (i.e., it is truthy).
// Contract (checked by the assert): the on-error handler must already be set when this is called.
946bool CLASS_SESSION_BASE::hosed()
const
948 assert(on_err_func_set() &&
"By contract do not call until PEER state -- when on-error handler must be known.");
949 return bool(m_peer_state_err_code_or_ok);
953Shared_name CLASS_SESSION_BASE::cur_ns_store_mutex_absolute_name()
const
955 using std::to_string;
965 mutex_name /=
"cur_ns_store";
976 mutex_name += to_string(m_srv_app_ref.m_user_id);
978 mutex_name += to_string(m_srv_app_ref.m_group_id);
980 mutex_name += to_string(
int(m_srv_app_ref.m_permissions_level_for_client_apps));
986fs::path CLASS_SESSION_BASE::cur_ns_store_absolute_path()
const
988 using Path = fs::path;
990 Path path(m_srv_app_ref.m_kernel_persistent_run_dir_override.empty()
992 : m_srv_app_ref.m_kernel_persistent_run_dir_override);
993 path /= m_srv_app_ref.m_name;
994 path +=
".libipc-cns.pid";
999Shared_name CLASS_SESSION_BASE::session_master_socket_stream_acceptor_absolute_name()
const
1001 assert((!m_srv_namespace.empty()) &&
"Serv-namespace must have been set by set_srv_namespace() by now.");
// Builds a Heap_fixed_builder::Config for the given logger, with a size value `sz` chosen at compile time from the
// transports this Session_base specialization enables.
1019typename CLASS_SESSION_BASE::Structured_msg_builder_config
1020 CLASS_SESSION_BASE::heap_fixed_builder_config(flow::log::Logger* logger_ptr)
// MQs only: use the MQ message-size limit.
1051 if constexpr(S_MQS_ENABLED && (!S_SOCKET_STREAM_ENABLED))
1053 sz = S_MQS_MAX_MSG_SZ;
// Socket stream only: use the socket stream's meta-blob length limit.
1055 else if constexpr((!S_MQS_ENABLED) && S_SOCKET_STREAM_ENABLED)
1057 sz = Native_socket_stream::S_MAX_META_BLOB_LENGTH;
// Remaining case must be "both enabled"; a specialization with neither transport fails to compile here.
1061 static_assert(S_MQS_ENABLED && S_SOCKET_STREAM_ENABLED,
"There must be *some* transport mechanism.");
// Both enabled: take the smaller of the two limits.
1063 sz = std::min(S_MQS_MAX_MSG_SZ, Native_socket_stream::S_MAX_META_BLOB_LENGTH);
// NOTE(review): the meaning of the two trailing 0 arguments is not visible here -- see Heap_fixed_builder::Config.
1066 return Heap_fixed_builder::Config{ logger_ptr, sz, 0, 0 };
1070CLASS_SESSION_BASE::Graceful_finisher::Graceful_finisher(flow::log::Logger* logger_ptr,
Session_base* this_session,
1071 flow::async::Single_thread_task_loop* async_worker,
1073 flow::log::Log_context(logger_ptr,
Log_component::S_SESSION),
1074 m_this_session(this_session),
1075 m_async_worker(async_worker),
1076 m_master_channel(master_channel)
1089 FLOW_LOG_INFO(
"Received GracefulSessionEnd from opposing Session object along session master channel "
1091 "error; and mark it down. If our Session dtor is running, it shall return soon. If it has "
1092 "not yet run, it shall return immediately upon starting to execute.");
// Fulfills the m_opposing_session_done promise -- the one whose future on_dtor_start() waits on.
1110void CLASS_SESSION_BASE::Graceful_finisher::on_master_channel_hosed()
1112 using boost::promise_already_satisfied;
1117 m_opposing_session_done.set_value();
// The promise may have been fulfilled earlier; that case surfaces as promise_already_satisfied and is caught here.
// NOTE(review): the handler body is not visible in this listing -- presumably it just ignores the exception; confirm.
1119 catch (
const promise_already_satisfied&)
// Runs the graceful-session-end handshake from the destructor side:
//   1. On the async worker, builds a GracefulSessionEnd message and sends it over the session master channel.
//   2. Blocks until m_opposing_session_done is fulfilled, then returns so destruction can continue.
1126void CLASS_SESSION_BASE::Graceful_finisher::on_dtor_start()
1128 using flow::async::Synchronicity;
// The task is posted with S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION; presumably post() therefore returns only after
// the task has run, which is what makes the by-reference capture safe -- confirm against flow::async docs.
1134 m_async_worker->post([&]()
1136 FLOW_LOG_INFO(
"In Session object dtor sending GracefulSessionEnd to opposing Session object along "
1137 "session master channel [" << *m_master_channel <<
"] -- if at all possible.");
1139 auto msg = m_master_channel->create_msg();
1140 msg.body_root()->initGracefulSessionEnd();
// Best-effort send: the resulting error code is captured in a local and not acted upon.
1143 m_master_channel->send(msg,
nullptr, &err_code_ignored);
1147 }, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION);
1151 FLOW_LOG_INFO(
"In Session object dtor we now await sign that the opposing Session object dtor has also started; "
1152 "only then will we proceed with destroying our Session object: e.g., maybe they hold handles "
1153 "to objects in a SHM-arena we would deinitialize. If their dtor has already been called, we will "
1154 "proceed immediately. If not, we will now wait for that. This would only block if the opposing "
1155 "side's user code neglects to destroy Session object on error; or has some kind of blocking "
1156 "operation in progress before destroying Session object. "
1157 "Session master channel: [" << *m_master_channel <<
"].");
// Blocks the calling thread until the promise is fulfilled; returns at once if it already was.
1158 m_opposing_session_done.get_future().wait();
1159 FLOW_LOG_INFO(
"In Session object dtor: Done awaiting sign that the opposing Session object dtor has also started. "
1160 "Will now proceed with our Session object destruction.");
1164typename CLASS_SESSION_BASE::Structured_msg_reader_config
1165 CLASS_SESSION_BASE::heap_reader_config(flow::log::Logger* logger_ptr)
1170#undef CLASS_SESSION_BASE
1171#undef TEMPLATE_SESSION_BASE
Optional to use by subclasses, this operates a simple state machine that carries out a graceful-sessi...
Graceful_finisher(flow::log::Logger *logger_ptr, Session_base *this_session, flow::async::Single_thread_task_loop *async_worker, Master_structured_channel *master_channel)
You must invoke this ctor (instantiate us) if and only if synchronized dtor execution is indeed requi...
flow::async::Single_thread_task_loop * m_async_worker
The containing Session thread W loop. It shall exist until *this is gone.
Master_structured_channel * m_master_channel
The containing Session master channel. It shall exist until *this is gone.
boost::promise< void > m_opposing_session_done
A promise whose fulfillment is a necessary and sufficient condition for on_dtor_start() returning (le...
Session_base *const m_this_session
The containing Session_base. It shall exist until *this is gone.
void on_dtor_start()
The reason Graceful_finisher exists, this method must be called at the start of *_session_impl dtor; ...
void on_master_channel_hosed()
Must be invoked if the *_session_impl detects that the master channel has emitted channel-hosing erro...
Internal type containing data and types common to internal types Server_session_impl and Client_sessi...
static Structured_msg_builder_config heap_fixed_builder_config(flow::log::Logger *logger_ptr)
See Session_mv::heap_fixed_builder_config() (1-arg).
Shared_name cur_ns_store_mutex_absolute_name() const
Computes the name of the interprocess named-mutex used to control reading/writing to the file storing...
Error_code m_peer_state_err_code_or_ok
Starts falsy; becomes forever truthy (with a specific Error_code that will not change thereafter) onc...
static constexpr bool S_SOCKET_STREAM_ENABLED
See Session_mv.
const On_passive_open_channel_func & on_passive_open_channel_func_or_empty() const
The on-passive-open handler (may be empty even in final state, meaning user wants passive-opens disab...
static constexpr bool S_MQS_ENABLED
See Session_mv.
void set_srv_namespace(Shared_name &&srv_namespace_new)
Sets srv_namespace() (do not call if already set).
Shared_name m_cli_namespace
See cli_namespace().
Mdt_payload Mdt_payload_obj
See Session_mv (or Session concept).
bool hosed() const
Returns true if and only if hose() has been called.
void set_on_passive_open_channel_func(On_passive_open_channel_func &&on_passive_open_channel_func)
Sets on_passive_open_channel_func_or_empty() (do not call if already set; do not call if user intends...
Function< void(Channel_obj &&new_channel, Mdt_reader_ptr &&new_channel_mdt)> On_passive_open_channel_func
Concrete function type for the on-passive-open handler (if any), used for storage.
Shared_name session_master_socket_stream_acceptor_absolute_name() const
Computes the absolute name at which the server shall set up a transport::Native_socket_stream_accepto...
const Client_app * cli_app_ptr() const
See Server_session_impl, Client_session_impl.
static Structured_msg_reader_config heap_reader_config(flow::log::Logger *logger_ptr)
See Session_mv::heap_reader_config() (1-arg).
boost::shared_ptr< Mdt_builder > Mdt_builder_ptr
See Session_mv (or Session concept).
const Shared_name & srv_namespace() const
See Server_session_impl, Client_session_impl.
void set_cli_app_ptr(const Client_app *cli_app_ptr_new)
Sets cli_app_ptr() (do not call if already set).
Shared_name m_srv_namespace
See srv_namespace().
std::vector< Channel_obj > Channels
See Session_mv. Note: If changed from vector please update those doc headers too.
std::conditional_t< S_MQS_ENABLED, std::conditional_t< S_TRANSMIT_NATIVE_HANDLES, transport::Mqs_socket_stream_channel< true, Persistent_mq_handle_from_cfg >, transport::Mqs_channel< true, Persistent_mq_handle_from_cfg > >, std::conditional_t< S_TRANSMIT_NATIVE_HANDLES, transport::Socket_stream_channel< true >, transport::Socket_stream_channel_of_blobs< true > > > Channel_obj
See Session_mv (or Session concept).
flow::async::Task_asio_err m_on_err_func
See set_on_err_func().
typename transport::struc::schema::Metadata< Mdt_payload_obj >::Builder Mdt_builder
See Session_mv (or Session concept).
void hose(const Error_code &err_code)
Marks this session as hosed for (truthy) reason err_code; and synchronously invokes on-error handler;...
std::conditional_t<!S_MQS_ENABLED, transport::Null_peer, std::conditional_t< S_MQ_TYPE_OR_NONE==schema::MqType::POSIX, transport::Posix_mq_handle, transport::Bipc_mq_handle > > Persistent_mq_handle_from_cfg
Relevant only if S_MQS_ENABLED, this is the Persistent_mq_handle-concept impl type specified by the u...
void set_cli_namespace(Shared_name &&cli_namespace_new)
Sets cli_namespace() (do not call if already set).
std::atomic< const Client_app * > m_cli_app_ptr
See cli_app_ptr().
Session_base(const Client_app &cli_app_ref, const Server_app &srv_app_ref, flow::async::Task_asio_err &&on_err_func, On_passive_open_channel_func &&on_passive_open_channel_func_or_empty_arg)
Constructs: Client_session_impl form (the user is the one constructing the object,...
const Shared_name & cli_namespace() const
See Server_session_impl, Client_session_impl.
bool on_err_func_set() const
Returns true if and only if set_on_err_func() has been called.
boost::shared_ptr< typename transport::struc::schema::Metadata< Mdt_payload_obj >::Reader > Mdt_reader_ptr
See Session_mv (or Session concept).
On_passive_open_channel_func m_on_passive_open_channel_func_or_empty
See on_passive_open_channel_func_or_empty().
boost::weak_ptr< Master_structured_channel > Master_structured_channel_observer
Observer of Master_structured_channel_ptr. See its doc header.
static constexpr util::Fine_duration S_OPEN_CHANNEL_TIMEOUT
Internal timeout for open_channel().
void set_on_err_func(flow::async::Task_asio_err &&on_err_func_arg)
Sets on_err_func() (do not call if already set).
fs::path cur_ns_store_absolute_path() const
Computes the absolute path to file storing (written by server, read by client) the value for srv_name...
static constexpr size_t S_MQS_MAX_MSG_SZ
The max sendable MQ message size as decided by Server_session_impl::make_channel_mqs() (and imposed o...
boost::shared_ptr< Master_structured_channel > Master_structured_channel_ptr
Handle to Master_structured_channel.
const Server_app & m_srv_app_ref
Reference to Server_app (referring to local process in Server_session_impl, opposing process in Clien...
Implements the Persistent_mq_handle concept by thinly wrapping bipc::message_queue,...
A Channel with at least a blobs pipe consisting of two MQs of type Persistent_mq_handle (template arg...
A Channel with a blobs pipe consisting of 2 MQs of type Persistent_mq_handle (template arg); and a ha...
static const Shared_name & S_RESOURCE_TYPE_ID
Shared_name relative-folder fragment (no separators) identifying this resource type.
Dummy type for use as a template param to Channel when either the blobs pipe or handles pipe is disab...
Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see man mq_overv...
A Channel with a blobs pipe only (no handles pipe) that uses a Unix domain socket connection as the u...
A Channel with a handles pipe only (no blobs pipe) that uses a Unix domain socket connection as the u...
Owning and wrapping a pre-connected transport::Channel peer (an endpoint of an established channel ov...
bool expect_msg(Msg_which_in which, On_msg_handler &&on_msg_func)
Registers the expectation of up to 1 notification in-message whose Msg_which equals which.
Implements Struct_builder concept by straightforwardly allocating fixed-size segments on-demand in th...
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static const Shared_name S_SENTINEL
A Shared_name fragment, with no S_SEPARATOR characters inside, that represents a path component that ...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
static const Shared_name S_RESOURCE_TYPE_ID_MUTEX
Relative-folder fragment (no separators) identifying the resource type for: boost....
@ S_SESSION_FINISHED
The opposing end of the session in question has been closed gracefully by the user invoking the end-s...
Flow-IPC module providing the broad lifecycle and shared-resource organization – via the session conc...
Shared_name build_conventional_shared_name(const Shared_name &resource_type, const Shared_name &srv_app_name, const Shared_name &srv_namespace, const Shared_name &cli_app_name, const Shared_name &cli_namespace_or_sentinel)
Builds an absolute name according to the path convention explained in Shared_name class doc header; t...
Builder< ipc::shm::classic::Pool_arena > Builder
Convenience alias: transport::struc::shm::Builder that works with boost.ipc.shm pools from ipc::shm::...
Reader< ipc::shm::classic::Pool_arena > Reader
Convenience alias: transport::struc::shm::Reader that works with boost.ipc.shm pools from ipc::shm::c...
const fs::path IPC_KERNEL_PERSISTENT_RUN_DIR
Absolute path to the directory (without trailing separator) in the file system where kernel-persisten...
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.
#define TEMPLATE_SESSION_BASE
Internally used macro; public API users should disregard (same deal as in struc/channel....
An App that is used as a client in at least one client-server IPC split.
An App that is used as a server in at least one client-server IPC split.
Implements Struct_builder::Config sub-concept.
Implements Struct_reader::Config sub-concept.