29#include <boost/interprocess/sync/named_mutex.hpp>
30#include <boost/interprocess/sync/scoped_lock.hpp>
31#include <boost/move/make_unique.hpp>
142template<
typename Session_server_t,
typename Server_session_t>
144 public flow::log::Log_context,
145 private boost::noncopyable
160 using Channels =
typename Server_session_obj::Channels;
196 template<
typename Per_app_setup_func>
201 Error_code* err_code, Per_app_setup_func&& per_app_setup_func);
234 template<
typename Task_err,
235 typename N_init_channels_by_srv_req_func,
typename Mdt_load_func>
240 N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
241 Mdt_load_func&& mdt_load_func,
242 Task_err&& on_done_func);
309 template<
typename Task>
347 using Mutex = flow::util::Mutex_non_recursive;
429#define TEMPLATE_SESSION_SERVER_IMPL \
430 template<typename Session_server_t, typename Server_session_t>
432#define CLASS_SESSION_SERVER_IMPL \
433 Session_server_impl<Session_server_t, Server_session_t>
436template<
typename Per_app_setup_func>
437CLASS_SESSION_SERVER_IMPL::Session_server_impl
440 Per_app_setup_func&& per_app_setup_func) :
442 flow::log::Log_context(logger_ptr,
Log_component::S_SESSION),
443 m_srv_app_ref(srv_app_ref_arg),
444 m_this_session_srv(this_session_srv_arg),
445 m_cli_app_master_set_ref(cli_app_master_set_ref),
446 m_per_app_setup_func(std::move(per_app_setup_func)),
447 m_state(std::in_place)
451 using flow::error::Runtime_error;
452 using boost::movelib::make_unique;
453 using boost::system::system_category;
454 using boost::io::ios_all_saver;
456 using Named_sh_mutex = boost::interprocess::named_mutex;
457 using Named_sh_mutex_ptr = boost::movelib::unique_ptr<Named_sh_mutex>;
458 using Sh_lock_guard = boost::interprocess::scoped_lock<Named_sh_mutex>;
462 m_state->m_last_cli_namespace = 0;
463 m_state->m_incomplete_session_graveyard = boost::movelib::make_unique<flow::async::Single_thread_task_loop>
465 flow::util::ostream_op_string(
"srv_sess_acc_graveyard[", *
this,
"]"));
491 if constexpr(Server_session_dtl_obj::Session_base_obj::S_MQS_ENABLED)
494 using Mq =
typename Server_session_dtl_obj::Session_base_obj::Persistent_mq_handle_from_cfg;
496 util::remove_each_persistent_with_name_prefix<Blob_stream_mq_base<Mq>>
579 const auto own_creds = Process_credentials::own_process_credentials();
582 FLOW_LOG_WARNING(
"Session acceptor [" << *
this <<
"]: Creation underway. However, just before writing "
583 "CNS (Current Namespace Store), a/k/a PID file, we determined that "
584 "the `user` aspect of our effective credentials [" << own_creds <<
"] do not match "
585 "the hard-configured value passed to this ctor: "
587 "We cannot proceed, as this would violate the security/safety model of ipc::session. "
593 const auto mutex_name = empty_session.
base().cur_ns_store_mutex_absolute_name();
595 const auto cns_path = empty_session.
base().cur_ns_store_absolute_path();
599 ios_all_saver saver(*(get_logger()->this_thread_ostream()));
600 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Created. Writing CNS (Current Namespace Store), a/k/a PID "
601 "file [" << cns_path <<
"] (perms "
602 "[" << std::setfill(
'0')
604 << std::oct << cns_perms.get_permissions() <<
"], "
605 "shared-mutex name [" << mutex_name <<
"], shared-mutex perms "
607 << mutex_perms.get_permissions() <<
"]); "
608 "then listening for incoming master socket stream "
609 "connects (through Native_socket_stream_acceptor that was just cted) to address "
610 "based partially on the namespace (PID) written to that file.");
615 Named_sh_mutex_ptr sh_mutex;
617 "Server_session_impl::ctor:named-mutex-open-or-create", [&]()
619 sh_mutex = make_unique<Named_sh_mutex>(
util::OPEN_OR_CREATE, mutex_name.native_str(), mutex_perms);
629 Sh_lock_guard sh_lock(*sh_mutex);
638 const bool we_created_cns = !fs::exists(cns_path, dummy);
639 ofstream cns_file(cns_path);
657 if ((!our_err_code) && we_created_cns)
668 cns_file << empty_session.
base().srv_namespace().str() <<
'\n';
670 if (!cns_file.good())
672 const auto sys_err_code = our_err_code =
Error_code(errno, system_category());
673 FLOW_LOG_WARNING(
"Session acceptor [" << *
this <<
"]: Could not open or write CNS (PID) file "
674 "file [" << cns_path <<
"]; system error details follow.");
675 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
690 *err_code = our_err_code;
694 throw Runtime_error(our_err_code, FLOW_UTIL_WHERE_AM_I_STR());
700 m_state->m_master_sock_acceptor
701 = make_unique<transport::Native_socket_stream_acceptor>
703 empty_session.
base().session_master_socket_stream_acceptor_absolute_name(),
707 if (err_code && (!*err_code))
709 m_state->m_incomplete_session_graveyard->start();
715CLASS_SESSION_SERVER_IMPL::~Session_server_impl()
717 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Shutting down. The Native_socket_stream_acceptor will now "
718 "shut down, and all outstanding Native_socket_stream_acceptor handlers and Server_session "
719 "handlers shall fire with operation-aborted error codes.");
733 if (!m_deinit_func_or_empty.empty())
735 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Continuing shutdown. A sub-class desires final de-init "
736 "work, once everything else is stopped (might be persistent resource cleanup); invoking "
738 m_deinit_func_or_empty();
739 FLOW_LOG_TRACE(
"De-init work finished.");
744template<
typename Task_err,
745 typename N_init_channels_by_srv_req_func,
typename Mdt_load_func>
750 N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
751 Mdt_load_func&& mdt_load_func,
752 Task_err&& on_done_handler)
755 using flow::async::Task_asio_err;
756 using boost::make_shared;
757 using std::to_string;
760 assert(target_session);
761 assert(m_state->m_master_sock_acceptor &&
"By contract do not invoke async_accept() if ctor failed.");
768 Task_asio_err on_done_func(std::move(on_done_handler));
769 auto sock_stm = make_shared<transport::sync_io::Native_socket_stream>();
770 const auto sock_stm_raw = sock_stm.get();
772 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Async-accept request: Immediately issuing socket stream "
773 "acceptor async-accept as step 1. If that async-succeeds, we will complete the login asynchronously; "
774 "if that succeeds we will finally emit the ready-to-go session to user via handler.");
775 m_state->m_master_sock_acceptor->async_accept
777 [
this, target_session, init_channels_by_srv_req, mdt_from_cli_or_null,
778 init_channels_by_cli_req,
779 n_init_channels_by_srv_req_func = std::move(n_init_channels_by_srv_req_func),
780 mdt_load_func = std::move(mdt_load_func),
781 sock_stm = std::move(sock_stm),
782 on_done_func = std::move(on_done_func)]
816 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Async-accept request: Socket stream "
817 "acceptor async-accept succeeded resulting in socket stream [" << *sock_stm <<
"]; that was step "
818 "1. Now we create the server session and have it undergo async-login; if that succeeds we will "
819 "finally emit the ready-to-go session to user via handler.");
822 = make_shared<Server_session_dtl_obj>(get_logger(),
825 std::move(*sock_stm));
832 Lock_guard incomplete_sessions_lock(m_state->m_mutex);
833 m_state->m_incomplete_sessions.insert(incomplete_session);
839 const auto cli_it = m_cli_app_master_set_ref.find(
string(cli_app_name));
840 return (cli_it == m_cli_app_master_set_ref.end()) ?
static_cast<const Client_app*
>(0) : &cli_it->second;
852 auto pre_rsp_setup_func = [
this,
859 auto incomplete_session = incomplete_session_observer.lock();
860 assert(incomplete_session
861 &&
"The Server_session_dtl_obj cannot be dead (dtor ran), if it's invoking its async handlers OK "
862 "(and thus calling us in its async_accept_log_in() success path.");
865 const auto cli_app_ptr = incomplete_session->base().cli_app_ptr();
866 assert(cli_app_ptr &&
"async_accept_log_in() contract is to call pre_rsp_setup_func() once all "
867 "the basic elements of Session_base are known (including Client_app&).");
869 return m_per_app_setup_func(*cli_app_ptr);
886 incomplete_session->async_accept_log_in
888 init_channels_by_srv_req,
889 mdt_from_cli_or_null,
890 init_channels_by_cli_req,
892 std::move(cli_app_lookup_func),
893 std::move(cli_namespace_func),
894 std::move(pre_rsp_setup_func),
896 std::move(n_init_channels_by_srv_req_func),
897 std::move(mdt_load_func),
900 target_session, on_done_func = std::move(on_done_func)]
905 auto incomplete_session = incomplete_session_observer.lock();
906 if (incomplete_session)
911 Lock_guard incomplete_sessions_lock(m_state->m_mutex);
913 const bool erased_ok = 1 ==
915 m_state->m_incomplete_sessions.erase(incomplete_session);
916 assert(erased_ok &&
"Who else would have erased it?!");
926 m_state->m_incomplete_session_graveyard->post([incomplete_session = std::move(incomplete_session)]
930 incomplete_session.reset();
933 assert((!incomplete_session) &&
"It should've been nullified by being moved-from into the captures.");
942 &&
"The incomplete-session Server_session_dtl can only disappear under us if *this is destroyed "
943 "which can only occur <=> operation-aborted is emitted due to *this destruction destroying that "
944 "incomplete-session.");
952 on_done_func(async_err_code);
956 assert(incomplete_session);
959 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Async-accept request: Successfully resulted in logged-in "
960 "server session [" << *incomplete_session <<
"]. Feeding to user via callback.");
964 *target_session = std::move(*(
static_cast<Server_session_obj*
>(incomplete_session.get())));
967 on_done_func(async_err_code);
968 FLOW_LOG_TRACE(
"Handler finished.");
974void CLASS_SESSION_SERVER_IMPL::to_ostream(std::ostream* os)
const
976 *os <<
'[' << m_srv_app_ref <<
"]@" <<
static_cast<const void*
>(
this);
980typename CLASS_SESSION_SERVER_IMPL::Session_server_obj* CLASS_SESSION_SERVER_IMPL::this_session_srv()
982 return m_this_session_srv;
986template<
typename Task>
987void CLASS_SESSION_SERVER_IMPL::sub_class_set_deinit_func(
Task&& task)
989 m_deinit_func_or_empty = std::move(task);
1000#undef CLASS_SESSION_SERVER_IMPL
1001#undef TEMPLATE_SESSION_SERVER_IMPL
This is the data-less sub-class of Server_session or any more-advanced (e.g., SHM-capable) variant th...
const Session_base_obj & base() const
Provides const access to Session_base super-object.
Internal class template comprising API/logic common to every Session_server variant,...
void to_ostream(std::ostream *os) const
See Server_session method.
boost::shared_ptr< Server_session_dtl_obj > Incomplete_session
Internally used ref-counted handle to a Server_session_dtl_obj, suitable for capturing and passing ar...
void sub_class_set_deinit_func(Task &&task)
Utility for sub-classes: ensures that task() is invoked near the end of *this dtor's execution,...
Session_server_t Session_server_obj
See this_session_srv().
boost::unordered_set< Incomplete_session > Incomplete_sessions
Short-hand for set of Incomplete_session, with fast insertion and removal by key Incomplete_session i...
flow::util::Lock_guard< Mutex > Lock_guard
Short-hand for Mutex lock.
void async_accept(Server_session_obj *target_session, Channels *init_channels_by_srv_req, Mdt_reader_ptr *mdt_from_cli_or_null, Channels *init_channels_by_cli_req, N_init_channels_by_srv_req_func &&n_init_channels_by_srv_req_func, Mdt_load_func &&mdt_load_func, Task_err &&on_done_func)
See Session_server method.
const Client_app::Master_set & m_cli_app_master_set_ref
See ctor.
const Server_app & m_srv_app_ref
See Session_server public data member.
typename Server_session_obj::Mdt_reader_ptr Mdt_reader_ptr
Short-hand for Session_mv::Mdt_reader_ptr.
Function< void()> m_deinit_func_or_empty
See sub_class_set_deinit_func(). .empty() unless that was called at least once.
typename Server_session_obj::Channels Channels
Short-hand for Session_mv::Channels.
Session_server_impl(flow::log::Logger *logger_ptr, Session_server_obj *this_session_srv_arg, const Server_app &srv_app_ref, const Client_app::Master_set &cli_app_master_set_ref, Error_code *err_code, Per_app_setup_func &&per_app_setup_func)
See Session_server ctor; it does that.
Session_server_obj *const m_this_session_srv
See this_session_srv().
std::optional< State > m_state
See State.
Session_server_obj * this_session_srv()
Returns pointer to the object that is privately sub-classing us.
const Function< Error_code(const Client_app &client_app)> m_per_app_setup_func
See ctor.
~Session_server_impl()
See Session_server dtor.
boost::weak_ptr< Server_session_dtl_obj > Incomplete_session_observer
weak_ptr observer of an Incomplete_session.
flow::util::Mutex_non_recursive Mutex
Short-hand for State::m_mutex type.
To be instantiated typically once in a given process, an object of this type asynchronously listens f...
Base of Blob_stream_mq_sender and Blob_stream_mq_receiver containing certain static facilities,...
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
A process's credentials (PID, UID, GID as of this writing).
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
@ S_RESOURCE_OWNER_UNEXPECTED
A resource in the file system (file, SHM pool, MQ, etc.) has or could have unexpected owner; ipc::ses...
@ S_MUTEX_BIPC_MISC_LIBRARY_ERROR
Low-level boost.ipc.mutex: boost.interprocess emitted miscellaneous library exception sans a system c...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
Flow-IPC module providing the broad lifecycle and shared-resource organization – via the session conc...
void ensure_resource_owner_is_app(flow::log::Logger *logger_ptr, const fs::path &path, const App &app, Error_code *err_code)
Utility, used internally but exposed in public API in case it is of general use, that checks that the...
Shared_name build_conventional_shared_name_prefix(const Shared_name &resource_type, const Shared_name &srv_app_name)
Return the prefix common to all calls to either build_conventional_shared_name() overload with the ar...
std::ostream & operator<<(std::ostream &os, const App &val)
Prints string representation of the given App to the given ostream.
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
const boost::array< Permissions, size_t(Permissions_level::S_END_SENTINEL)> PRODUCER_CONSUMER_RESOURCE_PERMISSIONS_LVL_MAP
Maps general Permissions_level specifier to low-level Permissions value, when the underlying resource...
void set_resource_permissions(flow::log::Logger *logger_ptr, const fs::path &path, const Permissions &perms, Error_code *err_code)
Utility that sets the permissions of the given resource (at the supplied file system path) to specifi...
void op_with_possible_bipc_exception(flow::log::Logger *logger_ptr, Error_code *err_code, const Error_code &misc_bipc_lib_error, String_view context, const Func &func)
Internal (to ipc) utility that invokes the given function that invokes a boost.interprocess operation...
Permissions shared_resource_permissions(Permissions_level permissions_lvl)
Maps general Permissions_level specifier to low-level Permissions value, when the underlying resource...
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.
#define TEMPLATE_SESSION_SERVER_IMPL
Internally used macro; public API users should disregard (same deal as in struc/channel....
#define CLASS_SESSION_SERVER_IMPL
Internally used macro; public API users should disregard (same deal as in struc/channel....
util::group_id_t m_group_id
The application must run as this group ID (GID). Files and other shared resources shall have this own...
std::string m_name
Brief application name, readable to humans and unique across all other applications' names; used both...
util::user_id_t m_user_id
The application must run as this user ID (UID). Files and other shared resources shall have this owne...
An App that is used as a client in at least one client-server IPC split.
boost::unordered_map< std::string, Client_app > Master_set
Suggested type for storing the master repository of all Client_app objects. See App doc header for discussion.
An App that is used as a server in at least one client-server IPC split.
util::Permissions_level m_permissions_level_for_client_apps
Specifies level of access for Client_apps (which must, also, be in m_allowed_client_apps at any rate)...
All internal mutable state of Session_server_impl.
std::atomic< uint64_t > m_last_cli_namespace
The ID used in generating the last Server_session::cli_namespace(); so the next one = this plus 1....
boost::movelib::unique_ptr< transport::Native_socket_stream_acceptor > m_master_sock_acceptor
transport::Native_socket_stream acceptor avail throughout *this to accept init please-open-session re...
Mutex m_mutex
Protects m_incomplete_sessions. See class doc header impl section for discussion of thread design.
Incomplete_sessions m_incomplete_sessions
The set of all Incomplete_session objects such that each one comes from a distinct async_accept() req...
boost::movelib::unique_ptr< flow::async::Single_thread_task_loop > m_incomplete_session_graveyard
Mostly-idle thread that solely destroys objects removed from m_incomplete_sessions in the case where ...