23#include "ipc/session/schema/detail/session_master_channel.capnp.h" 
   29#include <boost/thread/future.hpp> 
   66template<schema::MqType S_MQ_TYPE_OR_NONE, 
bool S_TRANSMIT_NATIVE_HANDLES, 
typename Mdt_payload>
 
   68  private boost::noncopyable
 
   74  static constexpr bool S_MQS_ENABLED = S_MQ_TYPE_OR_NONE != schema::MqType::NONE;
 
   81  static_assert(S_MQ_TYPE_OR_NONE != schema::MqType::END_SENTINEL,
 
   82                "Do not use the value END_SENTINEL for S_MQ_TYPE_OR_NONE; it is only a sentinel.  Did you mean NONE?");
 
   85  static_assert(std::is_enum_v<schema::MqType>,
 
   86                "Sanity-checking capnp-generated MqType enum (must be an enum).");
 
   87  static_assert(std::is_unsigned_v<std::underlying_type_t<schema::MqType>>,
 
   88                "Sanity-checking capnp-generated MqType enum (backing type must be unsigned).");
 
   89  static_assert((int(schema::MqType::END_SENTINEL) - 1) == 2,
 
   90                "Code apparently must be updated -- expected exactly 2 MqType enum values plus NONE + sentinel.");
 
   91  static_assert((int(schema::MqType::POSIX) != int(schema::MqType::BIPC))
 
   92                 && (
int(schema::MqType::POSIX) > int(schema::MqType::NONE))
 
   93                 && (
int(schema::MqType::BIPC) > int(schema::MqType::NONE))
 
   94                 && (
int(schema::MqType::POSIX) < int(schema::MqType::END_SENTINEL))
 
   95                 && (
int(schema::MqType::BIPC) < int(schema::MqType::END_SENTINEL)),
 
   96                "Code apparently must be updated -- " 
   97                  "expected exactly 2 particular MqType enum values plus NONE + sentinel.");
 
  106                         std::conditional_t<S_MQ_TYPE_OR_NONE == schema::MqType::POSIX,
 
  113                                           <S_TRANSMIT_NATIVE_HANDLES,
 
  117                                           <S_TRANSMIT_NATIVE_HANDLES,
 
  354                                         schema::detail::SessionMasterChannelMessageBody
 
  576    public flow::log::Log_context,
 
  577    private boost::noncopyable
 
  607                               flow::async::Single_thread_task_loop* async_worker,
 
  667                        flow::async::Task_asio_err&& on_err_func,
 
  810#define TEMPLATE_SESSION_BASE \ 
  811  template<schema::MqType S_MQ_TYPE_OR_NONE, bool S_TRANSMIT_NATIVE_HANDLES, typename Mdt_payload> 
  813#define CLASS_SESSION_BASE \ 
  814  Session_base<S_MQ_TYPE_OR_NONE, S_TRANSMIT_NATIVE_HANDLES, Mdt_payload> 
  820                                 flow::async::Task_asio_err&& on_err_func,
 
  828  m_srv_app_ref(srv_app_ref), 
 
  829  m_cli_app_ptr(&cli_app_ref), 
 
  830  m_on_err_func(std::move(on_err_func)),
 
  831  m_on_passive_open_channel_func_or_empty(std::move(on_passive_open_channel_func_or_empty_arg))
 
  838CLASS_SESSION_BASE::Session_base(
const Server_app& srv_app_ref) :
 
  847  m_srv_app_ref(srv_app_ref), 
 
  850  m_srv_namespace(
Shared_name::ct(std::to_string(util::Process_credentials::own_process_id())))
 
  857void CLASS_SESSION_BASE::set_cli_app_ptr(
const Client_app* cli_app_ptr_new)
 
  859  assert(cli_app_ptr_new && 
"As of this writing cli_app_ptr should be set at most once, from null to non-null.");
 
  862  const auto prev = m_cli_app_ptr.exchange(cli_app_ptr_new);
 
  863  assert((!prev) && 
"As of this writing cli_app_ptr should be set at most once, from null to non-null.");
 
  865  m_cli_app_ptr = cli_app_ptr_new;
 
  870void CLASS_SESSION_BASE::set_srv_namespace(
Shared_name&& srv_namespace_new)
 
  872  assert(!srv_namespace_new.empty());
 
  874  assert(m_srv_namespace.empty() && 
"As of this writing srv_namespace should be set at most once, from empty.");
 
  876  m_srv_namespace = std::move(srv_namespace_new);
 
  880void CLASS_SESSION_BASE::set_cli_namespace(
Shared_name&& cli_namespace_new)
 
  882  assert(!cli_namespace_new.empty());
 
  884  assert(m_cli_namespace.empty() && 
"As of this writing cli_namespace should be set at most once, from empty.");
 
  886  m_cli_namespace = std::move(cli_namespace_new);
 
  892  assert(m_on_passive_open_channel_func_or_empty.empty());
 
  893  m_on_passive_open_channel_func_or_empty = std::move(on_passive_open_channel_func);
 
  897void CLASS_SESSION_BASE::set_on_err_func(flow::async::Task_asio_err&& on_err_func_arg)
 
  899  assert(!on_err_func_arg.empty());
 
  900  assert(m_on_err_func.empty() && 
"Call set_on_err_func() once at most and only if not already set through ctor.");
 
  902  m_on_err_func = std::move(on_err_func_arg);
 
  908  return m_cli_app_ptr.load(); 
 
  914  return m_srv_namespace;
 
  920  return m_cli_namespace;
 
  924const typename CLASS_SESSION_BASE::On_passive_open_channel_func&
 
  925  CLASS_SESSION_BASE::on_passive_open_channel_func_or_empty()
 const 
  927  return m_on_passive_open_channel_func_or_empty;
 
  931bool CLASS_SESSION_BASE::on_err_func_set()
 const 
  933  return !m_on_err_func.empty();
 
  939  assert((!hosed()) && 
"By contract do not call unless hosed() is false.");
 
  941  m_peer_state_err_code_or_ok = err_code;
 
  942  m_on_err_func(err_code);
 
  946bool CLASS_SESSION_BASE::hosed()
 const 
  948  assert(on_err_func_set() && 
"By contract do not call until PEER state -- when on-error handler must be known.");
 
  949  return bool(m_peer_state_err_code_or_ok);
 
  953Shared_name CLASS_SESSION_BASE::cur_ns_store_mutex_absolute_name()
 const 
  955  using std::to_string;
 
  965  mutex_name /= 
"cur_ns_store";
 
  976  mutex_name += to_string(m_srv_app_ref.m_user_id);
 
  978  mutex_name += to_string(m_srv_app_ref.m_group_id);
 
  980  mutex_name += to_string(
int(m_srv_app_ref.m_permissions_level_for_client_apps));
 
  986fs::path CLASS_SESSION_BASE::cur_ns_store_absolute_path()
 const 
  988  using Path = fs::path;
 
  990  Path path(m_srv_app_ref.m_kernel_persistent_run_dir_override.empty()
 
  992              : m_srv_app_ref.m_kernel_persistent_run_dir_override);
 
  993  path /= m_srv_app_ref.m_name;
 
  994  path += 
".libipc-cns.pid";
 
  999Shared_name CLASS_SESSION_BASE::session_master_socket_stream_acceptor_absolute_name()
 const 
 1001  assert((!m_srv_namespace.empty()) && 
"Serv-namespace must have been set by set_srv_namespace() by now.");
 
 1019typename CLASS_SESSION_BASE::Structured_msg_builder_config
 
 1020  CLASS_SESSION_BASE::heap_fixed_builder_config(flow::log::Logger* logger_ptr) 
 
 1051  if constexpr(S_MQS_ENABLED && (!S_SOCKET_STREAM_ENABLED))
 
 1053    sz = S_MQS_MAX_MSG_SZ;
 
 1055  else if constexpr((!S_MQS_ENABLED) && S_SOCKET_STREAM_ENABLED)
 
 1057    sz = Native_socket_stream::S_MAX_META_BLOB_LENGTH;
 
 1061    static_assert(S_MQS_ENABLED && S_SOCKET_STREAM_ENABLED, 
"There must be *some* transport mechanism.");
 
 1063    sz = std::min(S_MQS_MAX_MSG_SZ, Native_socket_stream::S_MAX_META_BLOB_LENGTH);
 
 1066  return Heap_fixed_builder::Config{ logger_ptr, sz, 0, 0 };
 
 1070CLASS_SESSION_BASE::Graceful_finisher::Graceful_finisher(flow::log::Logger* logger_ptr, 
Session_base* this_session,
 
 1071                                                         flow::async::Single_thread_task_loop* async_worker,
 
 1073  flow::log::Log_context(logger_ptr, 
Log_component::S_SESSION),
 
 1074  m_this_session(this_session),
 
 1075  m_async_worker(async_worker),
 
 1076  m_master_channel(master_channel)
 
 1089      FLOW_LOG_INFO(
"Received GracefulSessionEnd from opposing Session object along session master channel " 
 1091                    "error; and mark it down.  If our Session dtor is running, it shall return soon.  If it has " 
 1092                    "not yet run, it shall return immediately upon starting to execute.");
 
 1110void CLASS_SESSION_BASE::Graceful_finisher::on_master_channel_hosed()
 
 1112  using boost::promise_already_satisfied;
 
 1117    m_opposing_session_done.set_value();
 
 1119  catch (
const promise_already_satisfied&)
 
 1126void CLASS_SESSION_BASE::Graceful_finisher::on_dtor_start()
 
 1128  using flow::async::Synchronicity;
 
 1134  m_async_worker->post([&]()
 
 1136    FLOW_LOG_INFO(
"In Session object dtor sending GracefulSessionEnd to opposing Session object along " 
 1137                  "session master channel [" << *m_master_channel << 
"] -- if at all possible.");
 
 1139    auto msg = m_master_channel->create_msg();
 
 1140    msg.body_root()->initGracefulSessionEnd();
 
 1143    m_master_channel->send(msg, 
nullptr, &err_code_ignored);
 
 1147  }, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION); 
 
 1151  FLOW_LOG_INFO(
"In Session object dtor we now await sign that the opposing Session object dtor has also started; " 
 1152                "only then will we proceed with destroying our Session object: e.g., maybe they hold handles " 
 1153                "to objects in a SHM-arena we would deinitialize.  If their dtor has already been called, we will " 
 1154                "proceed immediately.  If not, we will now wait for that.  This would only block if the opposing " 
 1155                "side's user code neglects to destroy Session object on error; or has some kind of blocking " 
 1156                "operation in progress before destroying Session object.  " 
 1157                "Session master channel: [" << *m_master_channel << 
"].");
 
 1158  m_opposing_session_done.get_future().wait();
 
 1159  FLOW_LOG_INFO(
"In Session object dtor: Done awaiting sign that the opposing Session object dtor has also started.  " 
 1160                "Will now proceed with our Session object destruction.");
 
 1164typename CLASS_SESSION_BASE::Structured_msg_reader_config
 
 1165  CLASS_SESSION_BASE::heap_reader_config(flow::log::Logger* logger_ptr) 
 
 1170#undef CLASS_SESSION_BASE 
 1171#undef TEMPLATE_SESSION_BASE 
Optional to use by subclasses, this operates a simple state machine that carries out a graceful-sessi...
Graceful_finisher(flow::log::Logger *logger_ptr, Session_base *this_session, flow::async::Single_thread_task_loop *async_worker, Master_structured_channel *master_channel)
You must invoke this ctor (instantiate us) if and only if synchronized dtor execution is indeed requi...
flow::async::Single_thread_task_loop * m_async_worker
The containing Session thread W loop. It shall exist until *this is gone.
Master_structured_channel * m_master_channel
The containing Session master channel. It shall exist until *this is gone.
boost::promise< void > m_opposing_session_done
A promise whose fulfillment is a necessary and sufficient condition for on_dtor_start() returning (le...
Session_base *const m_this_session
The containing Session_base. It shall exist until *this is gone.
void on_dtor_start()
The reason Graceful_finisher exists, this method must be called at the start of *_session_impl dtor; ...
void on_master_channel_hosed()
Must be invoked if the *_session_impl detects that the master channel has emitted channel-hosing erro...
Internal type containing data and types common to internal types Server_session_impl and Client_sessi...
static Structured_msg_builder_config heap_fixed_builder_config(flow::log::Logger *logger_ptr)
See Session_mv::heap_fixed_builder_config() (1-arg).
Shared_name cur_ns_store_mutex_absolute_name() const
Computes the name of the interprocess named-mutex used to control reading/writing to the file storing...
Error_code m_peer_state_err_code_or_ok
Starts falsy; becomes forever truthy (with a specific Error_code that will not change thereafter) onc...
static constexpr bool S_SOCKET_STREAM_ENABLED
See Session_mv.
const On_passive_open_channel_func & on_passive_open_channel_func_or_empty() const
The on-passive-open handler (may be empty even in final state, meaning user wants passive-opens disab...
static constexpr bool S_MQS_ENABLED
See Session_mv.
void set_srv_namespace(Shared_name &&srv_namespace_new)
Sets srv_namespace() (do not call if already set).
Shared_name m_cli_namespace
See cli_namespace().
Mdt_payload Mdt_payload_obj
See Session_mv (or Session concept).
bool hosed() const
Returns true if and only if hose() has been called.
void set_on_passive_open_channel_func(On_passive_open_channel_func &&on_passive_open_channel_func)
Sets on_passive_open_channel_func_or_empty() (do not call if already set; do not call if user intends...
Function< void(Channel_obj &&new_channel, Mdt_reader_ptr &&new_channel_mdt)> On_passive_open_channel_func
Concrete function type for the on-passive-open handler (if any), used for storage.
Shared_name session_master_socket_stream_acceptor_absolute_name() const
Computes the absolute name at which the server shall set up a transport::Native_socket_stream_accepto...
const Client_app * cli_app_ptr() const
See Server_session_impl, Client_session_impl.
static Structured_msg_reader_config heap_reader_config(flow::log::Logger *logger_ptr)
See Session_mv::heap_reader_config() (1-arg).
boost::shared_ptr< Mdt_builder > Mdt_builder_ptr
See Session_mv (or Session concept).
const Shared_name & srv_namespace() const
See Server_session_impl, Client_session_impl.
void set_cli_app_ptr(const Client_app *cli_app_ptr_new)
Sets cli_app_ptr() (do not call if already set).
Shared_name m_srv_namespace
See srv_namespace().
std::vector< Channel_obj > Channels
See Session_mv. Note: If changed from vector please update those doc headers too.
std::conditional_t< S_MQS_ENABLED, std::conditional_t< S_TRANSMIT_NATIVE_HANDLES, transport::Mqs_socket_stream_channel< true, Persistent_mq_handle_from_cfg >, transport::Mqs_channel< true, Persistent_mq_handle_from_cfg > >, std::conditional_t< S_TRANSMIT_NATIVE_HANDLES, transport::Socket_stream_channel< true >, transport::Socket_stream_channel_of_blobs< true > > > Channel_obj
See Session_mv (or Session concept).
flow::async::Task_asio_err m_on_err_func
See set_on_err_func().
typename transport::struc::schema::Metadata< Mdt_payload_obj >::Builder Mdt_builder
See Session_mv (or Session concept).
void hose(const Error_code &err_code)
Marks this session as hosed for (truthy) reason err_code; and synchronously invokes on-error handler;...
std::conditional_t<!S_MQS_ENABLED, transport::Null_peer, std::conditional_t< S_MQ_TYPE_OR_NONE==schema::MqType::POSIX, transport::Posix_mq_handle, transport::Bipc_mq_handle > > Persistent_mq_handle_from_cfg
Relevant only if S_MQS_ENABLED, this is the Persistent_mq_handle-concept impl type specified by the u...
void set_cli_namespace(Shared_name &&cli_namespace_new)
Sets cli_namespace() (do not call if already set).
std::atomic< const Client_app * > m_cli_app_ptr
See cli_app_ptr().
Session_base(const Client_app &cli_app_ref, const Server_app &srv_app_ref, flow::async::Task_asio_err &&on_err_func, On_passive_open_channel_func &&on_passive_open_channel_func_or_empty_arg)
Constructs: Client_session_impl form (the user is the one constructing the object,...
const Shared_name & cli_namespace() const
See Server_session_impl, Client_session_impl.
bool on_err_func_set() const
Returns true if and only if set_on_err_func() has been called.
boost::shared_ptr< typename transport::struc::schema::Metadata< Mdt_payload_obj >::Reader > Mdt_reader_ptr
See Session_mv (or Session concept).
On_passive_open_channel_func m_on_passive_open_channel_func_or_empty
See on_passive_open_channel_func_or_empty().
boost::weak_ptr< Master_structured_channel > Master_structured_channel_observer
Observer of Master_structured_channel_ptr. See its doc header.
static constexpr util::Fine_duration S_OPEN_CHANNEL_TIMEOUT
Internal timeout for open_channel().
void set_on_err_func(flow::async::Task_asio_err &&on_err_func_arg)
Sets on_err_func() (do not call if already set).
fs::path cur_ns_store_absolute_path() const
Computes the absolute path to file storing (written by server, read by client) the value for srv_name...
static constexpr size_t S_MQS_MAX_MSG_SZ
The max sendable MQ message size as decided by Server_session_impl::make_channel_mqs() (and imposed o...
boost::shared_ptr< Master_structured_channel > Master_structured_channel_ptr
Handle to Master_structured_channel.
const Server_app & m_srv_app_ref
Reference to Server_app (referring to local process in Server_session_impl, opposing process in Clien...
Implements the Persistent_mq_handle concept by thinly wrapping bipc::message_queue,...
A Channel with at least a blobs pipe consisting of two MQs of type Persistent_mq_handle (template arg...
A Channel with a blobs pipe consisting of 2 MQs of type Persistent_mq_handle (template arg); and a ha...
static const Shared_name & S_RESOURCE_TYPE_ID
Shared_name relative-folder fragment (no separators) identifying this resource type.
Dummy type for use as a template param to Channel when either the blobs pipe or handles pipe is disab...
Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see man mq_overv...
A Channel with a blobs pipe only (no handles pipe) that uses a Unix domain socket connection as the u...
A Channel with a handles pipe only (no blobs pipe) that uses a Unix domain socket connection as the u...
Owning and wrapping a pre-connected transport::Channel peer (an endpoint of an established channel ov...
bool expect_msg(Msg_which_in which, On_msg_handler &&on_msg_func)
Registers the expectation of up to 1 notification in-message whose Msg_which equals which.
Implements Struct_builder concept by straightforwardly allocating fixed-size segments on-demand in th...
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static const Shared_name S_SENTINEL
A Shared_name fragment, with no S_SEPARATOR characters inside, that represents a path component that ...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
static const Shared_name S_RESOURCE_TYPE_ID_MUTEX
Relative-folder fragment (no separators) identifying the resource type for: boost....
@ S_SESSION_FINISHED
The opposing end of the session in question has been closed gracefully by the user invoking the end-s...
Flow-IPC module providing the broad lifecycle and shared-resource organization – via the session conc...
Shared_name build_conventional_shared_name(const Shared_name &resource_type, const Shared_name &srv_app_name, const Shared_name &srv_namespace, const Shared_name &cli_app_name, const Shared_name &cli_namespace_or_sentinel)
Builds an absolute name according to the path convention explained in Shared_name class doc header; t...
Builder< ipc::shm::classic::Pool_arena > Builder
Convenience alias: transport::struc::shm::Builder that works with boost.ipc.shm pools from ipc::shm::...
Reader< ipc::shm::classic::Pool_arena > Reader
Convenience alias: transport::struc::shm::Reader that works with boost.ipc.shm pools from ipc::shm::c...
const fs::path IPC_KERNEL_PERSISTENT_RUN_DIR
Absolute path to the directory (without trailing separator) in the file system where kernel-persisten...
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.
#define TEMPLATE_SESSION_BASE
Internally used macro; public API users should disregard (same deal as in struc/channel....
An App that is used as a client in at least one client-server IPC split.
An App that is used as a server in at least one client-server IPC split.
Implements Struct_builder::Config sub-concept.
Implements Struct_reader::Config sub-concept.