29#include <boost/interprocess/sync/named_mutex.hpp>
30#include <boost/interprocess/sync/scoped_lock.hpp>
31#include <boost/move/make_unique.hpp>
142template<
typename Session_server_t,
typename Server_session_t>
144 public flow::log::Log_context,
145 private boost::noncopyable
160 using Channels =
typename Server_session_obj::Channels;
196 template<
typename Per_app_setup_func>
201 Error_code* err_code, Per_app_setup_func&& per_app_setup_func);
234 template<
typename Task_err,
235 typename N_init_channels_by_srv_req_func,
typename Mdt_load_func>
240 N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
241 Mdt_load_func&& mdt_load_func,
242 Task_err&& on_done_func);
309 template<
typename Task>
347 using Mutex = flow::util::Mutex_non_recursive;
430#define TEMPLATE_SESSION_SERVER_IMPL \
431 template<typename Session_server_t, typename Server_session_t>
433#define CLASS_SESSION_SERVER_IMPL \
434 Session_server_impl<Session_server_t, Server_session_t>
437template<
typename Per_app_setup_func>
438CLASS_SESSION_SERVER_IMPL::Session_server_impl
441 Per_app_setup_func&& per_app_setup_func) :
443 flow::log::Log_context(logger_ptr,
Log_component::S_SESSION),
444 m_srv_app_ref(srv_app_ref_arg),
445 m_this_session_srv(this_session_srv_arg),
446 m_cli_app_master_set_ref(cli_app_master_set_ref),
447 m_per_app_setup_func(std::move(per_app_setup_func)),
448 m_state(std::in_place)
452 using flow::error::Runtime_error;
453 using boost::movelib::make_unique;
454 using boost::system::system_category;
455 using boost::io::ios_all_saver;
457 using Named_sh_mutex = boost::interprocess::named_mutex;
458 using Named_sh_mutex_ptr = boost::movelib::unique_ptr<Named_sh_mutex>;
459 using Sh_lock_guard = boost::interprocess::scoped_lock<Named_sh_mutex>;
463 m_state->m_last_cli_namespace = 0;
464 m_state->m_incomplete_session_graveyard = boost::movelib::make_unique<flow::async::Single_thread_task_loop>
466 flow::util::ostream_op_string(
"srv_sess_acc_graveyard[", *
this,
"]"));
492 if constexpr(Server_session_dtl_obj::Session_base_obj::S_MQS_ENABLED)
495 using Mq =
typename Server_session_dtl_obj::Session_base_obj::Persistent_mq_handle_from_cfg;
497 util::remove_each_persistent_with_name_prefix<Blob_stream_mq_base<Mq>>
580 const auto own_creds = Process_credentials::own_process_credentials();
583 FLOW_LOG_WARNING(
"Session acceptor [" << *
this <<
"]: Creation underway. However, just before writing "
584 "CNS (Current Namespace Store), a/k/a PID file, we determined that "
585 "the `user` aspect of our effective credentials [" << own_creds <<
"] do not match "
586 "the hard-configured value passed to this ctor: "
588 "We cannot proceed, as this would violate the security/safety model of ipc::session. "
594 const auto mutex_name = empty_session.
base().cur_ns_store_mutex_absolute_name();
596 const auto cns_path = empty_session.
base().cur_ns_store_absolute_path();
600 ios_all_saver saver(*(get_logger()->this_thread_ostream()));
601 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Created. Writing CNS (Current Namespace Store), a/k/a PID "
602 "file [" << cns_path <<
"] (perms "
603 "[" << std::setfill(
'0')
605 << std::oct << cns_perms.get_permissions() <<
"], "
606 "shared-mutex name [" << mutex_name <<
"], shared-mutex perms "
608 << mutex_perms.get_permissions() <<
"]); "
609 "then listening for incoming master socket stream "
610 "connects (through Native_socket_stream_acceptor that was just cted) to address "
611 "based partially on the namespace (PID) written to that file.");
616 Named_sh_mutex_ptr sh_mutex;
618 "Server_session_impl::ctor:named-mutex-open-or-create", [&]()
620 sh_mutex = make_unique<Named_sh_mutex>(
util::OPEN_OR_CREATE, mutex_name.native_str(), mutex_perms);
630 Sh_lock_guard sh_lock(*sh_mutex);
639 const bool we_created_cns = !fs::exists(cns_path, dummy);
640 ofstream cns_file(cns_path);
658 if ((!our_err_code) && we_created_cns)
669 cns_file << empty_session.
base().srv_namespace().str() <<
'\n';
671 if (!cns_file.good())
673 const auto sys_err_code = our_err_code =
Error_code(errno, system_category());
674 FLOW_LOG_WARNING(
"Session acceptor [" << *
this <<
"]: Could not open or write CNS (PID) file "
675 "file [" << cns_path <<
"]; system error details follow.");
676 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
691 *err_code = our_err_code;
695 throw Runtime_error(our_err_code, FLOW_UTIL_WHERE_AM_I_STR());
701 m_state->m_master_sock_acceptor
702 = make_unique<transport::Native_socket_stream_acceptor>
704 empty_session.
base().session_master_socket_stream_acceptor_absolute_name(),
708 if (err_code && (!*err_code))
710 m_state->m_incomplete_session_graveyard->start();
716CLASS_SESSION_SERVER_IMPL::~Session_server_impl()
718 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Shutting down. The Native_socket_stream_acceptor will now "
719 "shut down, and all outstanding Native_socket_stream_acceptor handlers and Server_session "
720 "handlers shall fire with operation-aborted error codes.");
734 if (!m_deinit_func_or_empty.empty())
736 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Continuing shutdown. A sub-class desires final de-init "
737 "work, once everything else is stopped (might be persistent resource cleanup); invoking "
739 m_deinit_func_or_empty();
740 FLOW_LOG_TRACE(
"De-init work finished.");
745template<
typename Task_err,
746 typename N_init_channels_by_srv_req_func,
typename Mdt_load_func>
751 N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
752 Mdt_load_func&& mdt_load_func,
753 Task_err&& on_done_handler)
756 using flow::async::Task_asio_err;
757 using boost::make_shared;
758 using std::to_string;
761 assert(target_session);
762 assert(m_state->m_master_sock_acceptor &&
"By contract do not invoke async_accept() if ctor failed.");
769 Task_asio_err on_done_func(std::move(on_done_handler));
770 auto sock_stm = make_shared<transport::sync_io::Native_socket_stream>();
771 const auto sock_stm_raw = sock_stm.get();
773 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Async-accept request: Immediately issuing socket stream "
774 "acceptor async-accept as step 1. If that async-succeeds, we will complete the login asynchronously; "
775 "if that succeeds we will finally emit the ready-to-go session to user via handler.");
776 m_state->m_master_sock_acceptor->async_accept
778 [
this, target_session, init_channels_by_srv_req, mdt_from_cli_or_null,
779 init_channels_by_cli_req,
780 n_init_channels_by_srv_req_func = std::move(n_init_channels_by_srv_req_func),
781 mdt_load_func = std::move(mdt_load_func),
782 sock_stm = std::move(sock_stm),
783 on_done_func = std::move(on_done_func)]
817 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Async-accept request: Socket stream "
818 "acceptor async-accept succeeded resulting in socket stream [" << *sock_stm <<
"]; that was step "
819 "1. Now we create the server session and have it undergo async-login; if that succeeds we will "
820 "finally emit the ready-to-go session to user via handler.");
823 = make_shared<Server_session_dtl_obj>(get_logger(),
826 std::move(*sock_stm));
833 Lock_guard incomplete_sessions_lock(m_state->m_mutex);
834 m_state->m_incomplete_sessions.insert(incomplete_session);
840 const auto cli_it = m_cli_app_master_set_ref.find(
string(cli_app_name));
841 return (cli_it == m_cli_app_master_set_ref.end()) ?
static_cast<const Client_app*
>(0) : &cli_it->second;
853 auto pre_rsp_setup_func = [
this,
860 auto incomplete_session = incomplete_session_observer.lock();
861 assert(incomplete_session
862 &&
"The Server_session_dtl_obj cannot be dead (dtor ran), if it's invoking its async handlers OK "
863 "(and thus calling us in its async_accept_log_in() success path.");
866 const auto cli_app_ptr = incomplete_session->base().cli_app_ptr();
867 assert(cli_app_ptr &&
"async_accept_log_in() contract is to call pre_rsp_setup_func() once all "
868 "the basic elements of Session_base are known (including Client_app&).");
870 return m_per_app_setup_func(*cli_app_ptr);
887 incomplete_session->async_accept_log_in
889 init_channels_by_srv_req,
890 mdt_from_cli_or_null,
891 init_channels_by_cli_req,
893 std::move(cli_app_lookup_func),
894 std::move(cli_namespace_func),
895 std::move(pre_rsp_setup_func),
897 std::move(n_init_channels_by_srv_req_func),
898 std::move(mdt_load_func),
901 target_session, on_done_func = std::move(on_done_func)]
906 auto incomplete_session = incomplete_session_observer.lock();
907 if (incomplete_session)
912 Lock_guard incomplete_sessions_lock(m_state->m_mutex);
914 const bool erased_ok = 1 ==
916 m_state->m_incomplete_sessions.erase(incomplete_session);
917 assert(erased_ok &&
"Who else would have erased it?!");
927 m_state->m_incomplete_session_graveyard->post([incomplete_session = std::move(incomplete_session)]
931 incomplete_session.reset();
934 assert((!incomplete_session) &&
"It should've been nullified by being moved-from into the captures.");
943 &&
"The incomplete-session Server_session_dtl can only disappear under us if *this is destroyed "
944 "which can only occur <=> operation-aborted is emitted due to *this destruction destroying that "
945 "incomplete-session.");
953 on_done_func(async_err_code);
957 assert(incomplete_session);
960 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Async-accept request: Successfully resulted in logged-in "
961 "server session [" << *incomplete_session <<
"]. Feeding to user via callback.");
965 *target_session = std::move(*(
static_cast<Server_session_obj*
>(incomplete_session.get())));
968 on_done_func(async_err_code);
969 FLOW_LOG_TRACE(
"Handler finished.");
975void CLASS_SESSION_SERVER_IMPL::to_ostream(std::ostream* os)
const
977 *os <<
'[' << m_srv_app_ref <<
"]@" <<
static_cast<const void*
>(
this);
981typename CLASS_SESSION_SERVER_IMPL::Session_server_obj* CLASS_SESSION_SERVER_IMPL::this_session_srv()
983 return m_this_session_srv;
987template<
typename Task>
988void CLASS_SESSION_SERVER_IMPL::sub_class_set_deinit_func(
Task&& task)
990 m_deinit_func_or_empty = std::move(task);
1001#undef CLASS_SESSION_SERVER_IMPL
1002#undef TEMPLATE_SESSION_SERVER_IMPL
This is the data-less sub-class of Server_session or any more-advanced (e.g., SHM-capable) variant th...
const Session_base_obj & base() const
Provides const access to Session_base super-object.
Internal class template comprising API/logic common to every Session_server variant,...
void to_ostream(std::ostream *os) const
See Server_session method.
boost::shared_ptr< Server_session_dtl_obj > Incomplete_session
Internally used ref-counted handle to a Server_session_dtl_obj, suitable for capturing and passing ar...
void sub_class_set_deinit_func(Task &&task)
Utility for sub-classes: ensures that task() is invoked near the end of *this dtor's execution,...
Session_server_t Session_server_obj
See this_session_srv().
boost::unordered_set< Incomplete_session > Incomplete_sessions
Short-hand for set of Incomplete_session, with fast insertion and removal by key Incomplete_session i...
flow::util::Lock_guard< Mutex > Lock_guard
Short-hand for Mutex lock.
void async_accept(Server_session_obj *target_session, Channels *init_channels_by_srv_req, Mdt_reader_ptr *mdt_from_cli_or_null, Channels *init_channels_by_cli_req, N_init_channels_by_srv_req_func &&n_init_channels_by_srv_req_func, Mdt_load_func &&mdt_load_func, Task_err &&on_done_func)
See Session_server method.
const Client_app::Master_set & m_cli_app_master_set_ref
See ctor.
const Server_app & m_srv_app_ref
See Session_server public data member.
typename Server_session_obj::Mdt_reader_ptr Mdt_reader_ptr
Short-hand for Session_mv::Mdt_reader_ptr.
Function< void()> m_deinit_func_or_empty
See sub_class_set_deinit_func(). This is .empty() unless that was called at least once.
typename Server_session_obj::Channels Channels
Short-hand for Session_mv::Channels.
Session_server_impl(flow::log::Logger *logger_ptr, Session_server_obj *this_session_srv_arg, const Server_app &srv_app_ref, const Client_app::Master_set &cli_app_master_set_ref, Error_code *err_code, Per_app_setup_func &&per_app_setup_func)
See Session_server ctor; it does that.
Session_server_obj *const m_this_session_srv
See this_session_srv().
std::optional< State > m_state
See State.
Session_server_obj * this_session_srv()
Returns pointer to the object that is privately sub-classing us.
const Function< Error_code(const Client_app &client_app)> m_per_app_setup_func
See ctor.
~Session_server_impl()
See Session_server dtor.
boost::weak_ptr< Server_session_dtl_obj > Incomplete_session_observer
weak_ptr observer of an Incomplete_session.
flow::util::Mutex_non_recursive Mutex
Short-hand for State::m_mutex type.
To be instantiated typically once in a given process, an object of this type asynchronously listens f...
Base of Blob_stream_mq_sender and Blob_stream_mq_receiver containing certain static facilities,...
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
A process's credentials (PID, UID, GID as of this writing).
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
@ S_RESOURCE_OWNER_UNEXPECTED
A resource in the file system (file, SHM pool, MQ, etc.) has or could have an unexpected owner; ipc::ses...
@ S_MUTEX_BIPC_MISC_LIBRARY_ERROR
Low-level boost.ipc.mutex: boost.interprocess emitted miscellaneous library exception sans a system c...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
Flow-IPC module providing the broad lifecycle and shared-resource organization – via the session conc...
void ensure_resource_owner_is_app(flow::log::Logger *logger_ptr, const fs::path &path, const App &app, Error_code *err_code)
Utility, used internally but exposed in public API in case it is of general use, that checks that the...
Shared_name build_conventional_shared_name_prefix(const Shared_name &resource_type, const Shared_name &srv_app_name)
Return the prefix common to all calls to either build_conventional_shared_name() overload with the ar...
std::ostream & operator<<(std::ostream &os, const App &val)
Prints string representation of the given App to the given ostream.
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
const boost::array< Permissions, size_t(Permissions_level::S_END_SENTINEL)> PRODUCER_CONSUMER_RESOURCE_PERMISSIONS_LVL_MAP
Maps general Permissions_level specifier to low-level Permissions value, when the underlying resource...
void set_resource_permissions(flow::log::Logger *logger_ptr, const fs::path &path, const Permissions &perms, Error_code *err_code)
Utility that sets the permissions of the given resource (at the supplied file system path) to specifi...
void op_with_possible_bipc_exception(flow::log::Logger *logger_ptr, Error_code *err_code, const Error_code &misc_bipc_lib_error, String_view context, const Func &func)
Internal (to ipc) utility that invokes the given function that invokes a boost.interprocess operation...
Permissions shared_resource_permissions(Permissions_level permissions_lvl)
Maps general Permissions_level specifier to low-level Permissions value, when the underlying resource...
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.
#define TEMPLATE_SESSION_SERVER_IMPL
Internally used macro; public API users should disregard (same deal as in struc/channel....
#define CLASS_SESSION_SERVER_IMPL
Internally used macro; public API users should disregard (same deal as in struc/channel....
util::group_id_t m_group_id
The application must run as this group ID (GID). Files and other shared resources shall have this own...
std::string m_name
Brief application name, readable to humans and unique across all other applications' names; used both...
util::user_id_t m_user_id
The application must run as this user ID (UID). Files and other shared resources shall have this owne...
An App that is used as a client in at least one client-server IPC split.
boost::unordered_map< std::string, Client_app > Master_set
Suggested type for storing the master repository of all Client_apps. See App doc header for discussion.
An App that is used as a server in at least one client-server IPC split.
util::Permissions_level m_permissions_level_for_client_apps
Specifies level of access for Client_apps (which must, also, be in m_allowed_client_apps at any rate)...
All internal mutable state of Session_server_impl.
std::atomic< uint64_t > m_last_cli_namespace
The ID used in generating the last Server_session::cli_namespace(); so the next one = this plus 1....
boost::movelib::unique_ptr< transport::Native_socket_stream_acceptor > m_master_sock_acceptor
transport::Native_socket_stream acceptor avail throughout *this to accept init please-open-session re...
Mutex m_mutex
Protects m_incomplete_sessions. See class doc header impl section for discussion of thread design.
Incomplete_sessions m_incomplete_sessions
The set of all Incomplete_session objects such that each one comes from a distinct async_accept() req...
boost::movelib::unique_ptr< flow::async::Single_thread_task_loop > m_incomplete_session_graveyard
Mostly-idle thread that solely destroys objects removed from m_incomplete_sessions in the case where ...