29#include <boost/interprocess/sync/named_mutex.hpp> 
   30#include <boost/interprocess/sync/scoped_lock.hpp> 
   31#include <boost/move/make_unique.hpp> 
  142template<
typename Session_server_t, 
typename Server_session_t>
 
  144  public flow::log::Log_context,
 
  145  private boost::noncopyable
 
  160  using Channels = 
typename Server_session_obj::Channels;
 
  196  template<
typename Per_app_setup_func>
 
  201                               Error_code* err_code, Per_app_setup_func&& per_app_setup_func);
 
  234  template<
typename Task_err,
 
  235           typename N_init_channels_by_srv_req_func, 
typename Mdt_load_func>
 
  240                    N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
 
  241                    Mdt_load_func&& mdt_load_func,
 
  242                    Task_err&& on_done_func);
 
  309  template<
typename Task>
 
  347  using Mutex = flow::util::Mutex_non_recursive;
 
  429#define TEMPLATE_SESSION_SERVER_IMPL \ 
  430  template<typename Session_server_t, typename Server_session_t> 
  432#define CLASS_SESSION_SERVER_IMPL \ 
  433  Session_server_impl<Session_server_t, Server_session_t> 
  436template<
typename Per_app_setup_func>
 
  437CLASS_SESSION_SERVER_IMPL::Session_server_impl
 
  440   Per_app_setup_func&& per_app_setup_func) :
 
  442  flow::log::Log_context(logger_ptr, 
Log_component::S_SESSION),
 
  443  m_srv_app_ref(srv_app_ref_arg), 
 
  444  m_this_session_srv(this_session_srv_arg),
 
  445  m_cli_app_master_set_ref(cli_app_master_set_ref), 
 
  446  m_per_app_setup_func(std::move(per_app_setup_func)),
 
  447  m_state(std::in_place) 
 
  451  using flow::error::Runtime_error;
 
  452  using boost::movelib::make_unique;
 
  453  using boost::system::system_category;
 
  454  using boost::io::ios_all_saver;
 
  456  using Named_sh_mutex = boost::interprocess::named_mutex;
 
  457  using Named_sh_mutex_ptr = boost::movelib::unique_ptr<Named_sh_mutex>;
 
  458  using Sh_lock_guard = boost::interprocess::scoped_lock<Named_sh_mutex>;
 
  462  m_state->m_last_cli_namespace = 0;
 
  463  m_state->m_incomplete_session_graveyard = boost::movelib::make_unique<flow::async::Single_thread_task_loop>
 
  465                                               flow::util::ostream_op_string(
"srv_sess_acc_graveyard[", *
this, 
"]"));
 
  491  if constexpr(Server_session_dtl_obj::Session_base_obj::S_MQS_ENABLED)
 
  494    using Mq = 
typename Server_session_dtl_obj::Session_base_obj::Persistent_mq_handle_from_cfg;
 
  496    util::remove_each_persistent_with_name_prefix<Blob_stream_mq_base<Mq>>
 
  579  const auto own_creds = Process_credentials::own_process_credentials();
 
  582    FLOW_LOG_WARNING(
"Session acceptor [" << *
this << 
"]: Creation underway.  However, just before writing " 
  583                     "CNS (Current Namespace Store), a/k/a PID file, we determined that " 
  584                     "the `user` aspect of our effective credentials [" << own_creds << 
"] do not match " 
  585                     "the hard-configured value passed to this ctor: " 
  587                     "We cannot proceed, as this would violate the security/safety model of ipc::session.  " 
  593    const auto mutex_name = empty_session.
base().cur_ns_store_mutex_absolute_name();
 
  595    const auto cns_path = empty_session.
base().cur_ns_store_absolute_path();
 
  599      ios_all_saver saver(*(get_logger()->this_thread_ostream())); 
 
  600      FLOW_LOG_INFO(
"Session acceptor [" << *
this << 
"]: Created.  Writing CNS (Current Namespace Store), a/k/a PID " 
  601                    "file [" << cns_path << 
"] (perms " 
  602                    "[" << std::setfill(
'0')
 
  604                        << std::oct << cns_perms.get_permissions() << 
"], " 
  605                    "shared-mutex name [" << mutex_name << 
"], shared-mutex perms " 
  607                        << mutex_perms.get_permissions() << 
"]); " 
  608                    "then listening for incoming master socket stream " 
  609                    "connects (through Native_socket_stream_acceptor that was just cted) to address " 
  610                    "based partially on the namespace (PID) written to that file.");
 
  615    Named_sh_mutex_ptr sh_mutex;
 
  617                                          "Server_session_impl::ctor:named-mutex-open-or-create", [&]()
 
  619      sh_mutex = make_unique<Named_sh_mutex>(
util::OPEN_OR_CREATE, mutex_name.native_str(), mutex_perms);
 
  629      Sh_lock_guard sh_lock(*sh_mutex);
 
  638      const bool we_created_cns = !fs::exists(cns_path, dummy);
 
  639      ofstream cns_file(cns_path);
 
  657      if ((!our_err_code) && we_created_cns)
 
  668        cns_file << empty_session.
base().srv_namespace().str() << 
'\n';
 
  670        if (!cns_file.good())
 
  672          const auto sys_err_code = our_err_code = 
Error_code(errno, system_category());
 
  673          FLOW_LOG_WARNING(
"Session acceptor [" << *
this << 
"]: Could not open or write CNS (PID) file " 
  674                           "file [" << cns_path << 
"]; system error details follow.");
 
  675          FLOW_ERROR_SYS_ERROR_LOG_WARNING(); 
 
  690      *err_code = our_err_code;
 
  694    throw Runtime_error(our_err_code, FLOW_UTIL_WHERE_AM_I_STR());
 
  700  m_state->m_master_sock_acceptor
 
  701    = make_unique<transport::Native_socket_stream_acceptor>
 
  703         empty_session.
base().session_master_socket_stream_acceptor_absolute_name(),
 
  707  if (err_code && (!*err_code))
 
  709    m_state->m_incomplete_session_graveyard->start();
 
  715CLASS_SESSION_SERVER_IMPL::~Session_server_impl()
 
  717  FLOW_LOG_INFO(
"Session acceptor [" << *
this << 
"]: Shutting down.  The Native_socket_stream_acceptor will now " 
  718                "shut down, and all outstanding Native_socket_stream_acceptor handlers and Server_session " 
  719                "handlers shall fire with operation-aborted error codes.");
 
  733  if (!m_deinit_func_or_empty.empty())
 
  735    FLOW_LOG_INFO(
"Session acceptor [" << *
this << 
"]: Continuing shutdown.  A sub-class desires final de-init " 
  736                  "work, once everything else is stopped (might be persistent resource cleanup); invoking " 
  738    m_deinit_func_or_empty();
 
  739    FLOW_LOG_TRACE(
"De-init work finished.");
 
  744template<
typename Task_err,
 
  745         typename N_init_channels_by_srv_req_func, 
typename Mdt_load_func>
 
  750                                             N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
 
  751                                             Mdt_load_func&& mdt_load_func,
 
  752                                             Task_err&& on_done_handler)
 
  755  using flow::async::Task_asio_err;
 
  756  using boost::make_shared;
 
  757  using std::to_string;
 
  760  assert(target_session);
 
  761  assert(m_state->m_master_sock_acceptor && 
"By contract do not invoke async_accept() if ctor failed.");
 
  768  Task_asio_err on_done_func(std::move(on_done_handler));
 
  769  auto sock_stm = make_shared<transport::sync_io::Native_socket_stream>(); 
 
  770  const auto sock_stm_raw = sock_stm.get();
 
  772  FLOW_LOG_INFO(
"Session acceptor [" << *
this << 
"]: Async-accept request: Immediately issuing socket stream " 
  773                "acceptor async-accept as step 1.  If that async-succeeds, we will complete the login asynchronously; " 
  774                "if that succeeds we will finally emit the ready-to-go session to user via handler.");
 
  775  m_state->m_master_sock_acceptor->async_accept
 
  777     [
this, target_session, init_channels_by_srv_req, mdt_from_cli_or_null,
 
  778      init_channels_by_cli_req,
 
  779      n_init_channels_by_srv_req_func = std::move(n_init_channels_by_srv_req_func),
 
  780      mdt_load_func = std::move(mdt_load_func),
 
  781      sock_stm = std::move(sock_stm),
 
  782      on_done_func = std::move(on_done_func)]
 
  816    FLOW_LOG_INFO(
"Session acceptor [" << *
this << 
"]: Async-accept request: Socket stream " 
  817                  "acceptor async-accept succeeded resulting in socket stream [" << *sock_stm << 
"]; that was step " 
  818                  "1.  Now we create the server session and have it undergo async-login; if that succeeds we will " 
  819                  "finally emit the ready-to-go session to user via handler.");
 
  822      = make_shared<Server_session_dtl_obj>(get_logger(),
 
  825                                            std::move(*sock_stm));
 
  832      Lock_guard incomplete_sessions_lock(m_state->m_mutex);
 
  833      m_state->m_incomplete_sessions.insert(incomplete_session);
 
  839      const auto cli_it = m_cli_app_master_set_ref.find(
string(cli_app_name));
 
  840      return (cli_it == m_cli_app_master_set_ref.end()) ? 
static_cast<const Client_app*
>(0) : &cli_it->second;
 
  852    auto pre_rsp_setup_func = [
this,
 
  859      auto incomplete_session = incomplete_session_observer.lock();
 
  860      assert(incomplete_session
 
  861             && 
"The Server_session_dtl_obj cannot be dead (dtor ran), if it's invoking its async handlers OK " 
  862                  "(and thus calling us in its async_accept_log_in() success path.");
 
  865      const auto cli_app_ptr = incomplete_session->base().cli_app_ptr();
 
  866      assert(cli_app_ptr && 
"async_accept_log_in() contract is to call pre_rsp_setup_func() once all " 
  867                              "the basic elements of Session_base are known (including Client_app&).");
 
  869      return m_per_app_setup_func(*cli_app_ptr);
 
  886    incomplete_session->async_accept_log_in
 
  888       init_channels_by_srv_req, 
 
  889       mdt_from_cli_or_null, 
 
  890       init_channels_by_cli_req, 
 
  892       std::move(cli_app_lookup_func), 
 
  893       std::move(cli_namespace_func), 
 
  894       std::move(pre_rsp_setup_func), 
 
  896       std::move(n_init_channels_by_srv_req_func), 
 
  897       std::move(mdt_load_func), 
 
  900        target_session, on_done_func = std::move(on_done_func)]
 
  905      auto incomplete_session = incomplete_session_observer.lock();
 
  906      if (incomplete_session)
 
  911          Lock_guard incomplete_sessions_lock(m_state->m_mutex);
 
  913          const bool erased_ok = 1 ==
 
  915          m_state->m_incomplete_sessions.erase(incomplete_session);
 
  916          assert(erased_ok && 
"Who else would have erased it?!");
 
  926          m_state->m_incomplete_session_graveyard->post([incomplete_session = std::move(incomplete_session)]
 
  930            incomplete_session.reset(); 
 
  933          assert((!incomplete_session) && 
"It should've been nullified by being moved-from into the captures.");
 
  942               && 
"The incomplete-session Server_session_dtl can only disappear under us if *this is destroyed " 
  943                    "which can only occur <=> operation-aborted is emitted due to *this destruction destroying that " 
  944                    "incomplete-session.");
 
  952        on_done_func(async_err_code);
 
  956      assert(incomplete_session);
 
  959      FLOW_LOG_INFO(
"Session acceptor [" << *
this << 
"]: Async-accept request: Successfully resulted in logged-in " 
  960                    "server session [" << *incomplete_session << 
"].  Feeding to user via callback.");
 
  964      *target_session = std::move(*(
static_cast<Server_session_obj*
>(incomplete_session.get())));
 
  967      on_done_func(async_err_code); 
 
  968      FLOW_LOG_TRACE(
"Handler finished.");
 
  974void CLASS_SESSION_SERVER_IMPL::to_ostream(std::ostream* os)
 const 
  976  *os << 
'[' << m_srv_app_ref << 
"]@" << 
static_cast<const void*
>(
this);
 
  980typename CLASS_SESSION_SERVER_IMPL::Session_server_obj* CLASS_SESSION_SERVER_IMPL::this_session_srv()
 
  982  return m_this_session_srv;
 
  986template<
typename Task>
 
  987void CLASS_SESSION_SERVER_IMPL::sub_class_set_deinit_func(
Task&& task)
 
  989  m_deinit_func_or_empty = std::move(task);
 
 1000#undef CLASS_SESSION_SERVER_IMPL 
 1001#undef TEMPLATE_SESSION_SERVER_IMPL 
This is the data-less sub-class of Server_session or any more-advanced (e.g., SHM-capable) variant th...
const Session_base_obj & base() const
Provides const access to Session_base super-object.
Internal class template comprising API/logic common to every Session_server variant,...
void to_ostream(std::ostream *os) const
See Server_session method.
boost::shared_ptr< Server_session_dtl_obj > Incomplete_session
Internally used ref-counted handle to a Server_session_dtl_obj, suitable for capturing and passing ar...
void sub_class_set_deinit_func(Task &&task)
Utility for sub-classes: ensures that task() is invoked near the end of *this dtor's execution,...
Session_server_t Session_server_obj
See this_session_srv().
boost::unordered_set< Incomplete_session > Incomplete_sessions
Short-hand for set of Incomplete_session, with fast insertion and removal by key Incomplete_session i...
flow::util::Lock_guard< Mutex > Lock_guard
Short-hand for Mutex lock.
void async_accept(Server_session_obj *target_session, Channels *init_channels_by_srv_req, Mdt_reader_ptr *mdt_from_cli_or_null, Channels *init_channels_by_cli_req, N_init_channels_by_srv_req_func &&n_init_channels_by_srv_req_func, Mdt_load_func &&mdt_load_func, Task_err &&on_done_func)
See Session_server method.
const Client_app::Master_set & m_cli_app_master_set_ref
See ctor.
const Server_app & m_srv_app_ref
See Session_server public data member.
typename Server_session_obj::Mdt_reader_ptr Mdt_reader_ptr
Short-hand for Session_mv::Mdt_reader_ptr.
Function< void()> m_deinit_func_or_empty
See sub_class_set_deinit_func(). .empty() unless that was called at least once.
typename Server_session_obj::Channels Channels
Short-hand for Session_mv::Channels.
Session_server_impl(flow::log::Logger *logger_ptr, Session_server_obj *this_session_srv_arg, const Server_app &srv_app_ref, const Client_app::Master_set &cli_app_master_set_ref, Error_code *err_code, Per_app_setup_func &&per_app_setup_func)
See Session_server ctor; it does that.
Session_server_obj *const m_this_session_srv
See this_session_srv().
std::optional< State > m_state
See State.
Session_server_obj * this_session_srv()
Returns pointer to the object that is privately sub-classing us.
const Function< Error_code(const Client_app &client_app)> m_per_app_setup_func
See ctor.
~Session_server_impl()
See Session_server dtor.
boost::weak_ptr< Server_session_dtl_obj > Incomplete_session_observer
weak_ptr observer of an Incomplete_session.
flow::util::Mutex_non_recursive Mutex
Short-hand for State::m_mutex type.
To be instantiated typically once in a given process, an object of this type asynchronously listens f...
Base of Blob_stream_mq_sender and Blob_stream_mq_receiver containing certain static facilities,...
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
A process's credentials (PID, UID, GID as of this writing).
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
@ S_RESOURCE_OWNER_UNEXPECTED
A resource in the file system (file, SHM pool, MQ, etc.) has or could have unexpected owner; ipc::ses...
@ S_MUTEX_BIPC_MISC_LIBRARY_ERROR
Low-level boost.ipc.mutex: boost.interprocess emitted miscellaneous library exception sans a system c...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
Flow-IPC module providing the broad lifecycle and shared-resource organization – via the session conc...
void ensure_resource_owner_is_app(flow::log::Logger *logger_ptr, const fs::path &path, const App &app, Error_code *err_code)
Utility, used internally but exposed in public API in case it is of general use, that checks that the...
Shared_name build_conventional_shared_name_prefix(const Shared_name &resource_type, const Shared_name &srv_app_name)
Return the prefix common to all calls to either build_conventional_shared_name() overload with the ar...
std::ostream & operator<<(std::ostream &os, const App &val)
Prints string representation of the given App to the given ostream.
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
const boost::array< Permissions, size_t(Permissions_level::S_END_SENTINEL)> PRODUCER_CONSUMER_RESOURCE_PERMISSIONS_LVL_MAP
Maps general Permissions_level specifier to low-level Permissions value, when the underlying resource...
void set_resource_permissions(flow::log::Logger *logger_ptr, const fs::path &path, const Permissions &perms, Error_code *err_code)
Utility that sets the permissions of the given resource (at the supplied file system path) to specifi...
void op_with_possible_bipc_exception(flow::log::Logger *logger_ptr, Error_code *err_code, const Error_code &misc_bipc_lib_error, String_view context, const Func &func)
Internal (to ipc) utility that invokes the given function that invokes a boost.interprocess operation...
Permissions shared_resource_permissions(Permissions_level permissions_lvl)
Maps general Permissions_level specifier to low-level Permissions value, when the underlying resource...
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.
#define TEMPLATE_SESSION_SERVER_IMPL
Internally used macro; public API users should disregard (same deal as in struc/channel....
#define CLASS_SESSION_SERVER_IMPL
Internally used macro; public API users should disregard (same deal as in struc/channel....
util::group_id_t m_group_id
The application must run as this group ID (GID). Files and other shared resources shall have this own...
std::string m_name
Brief application name, readable to humans and unique across all other applications' names; used both...
util::user_id_t m_user_id
The application must run as this user ID (UID). Files and other shared resources shall have this owne...
An App that is used as a client in at least one client-server IPC split.
boost::unordered_map< std::string, Client_app > Master_set
Suggested type for storing master repository or all Client_appss. See App doc header for discussion.
An App that is used as a server in at least one client-server IPC split.
util::Permissions_level m_permissions_level_for_client_apps
Specifies level of access for Client_apps (which must, also, be in m_allowed_client_apps at any rate)...
All internal mutable state of Session_server_impl.
std::atomic< uint64_t > m_last_cli_namespace
The ID used in generating the last Server_session::cli_namespace(); so the next one = this plus 1....
boost::movelib::unique_ptr< transport::Native_socket_stream_acceptor > m_master_sock_acceptor
transport::Native_socket_stream acceptor avail throughout *this to accept init please-open-session re...
Mutex m_mutex
Protects m_incomplete_sessions. See class doc header impl section for discussion of thread design.
Incomplete_sessions m_incomplete_sessions
The set of all Incomplete_session objects such that each one comes from a distinct async_accept() req...
boost::movelib::unique_ptr< flow::async::Single_thread_task_loop > m_incomplete_session_graveyard
Mostly-idle thread that solely destroys objects removed from m_incomplete_sessions in the case where ...