29#include <boost/interprocess/sync/named_mutex.hpp>
30#include <boost/interprocess/sync/scoped_lock.hpp>
31#include <boost/move/make_unique.hpp>
142template<
typename Session_server_t,
typename Server_session_t>
144 public flow::log::Log_context,
145 private boost::noncopyable
160 using Channels =
typename Server_session_obj::Channels;
196 template<
typename Per_app_setup_func>
201 Error_code* err_code, Per_app_setup_func&& per_app_setup_func);
234 template<
typename Task_err,
235 typename N_init_channels_by_srv_req_func,
typename Mdt_load_func>
240 N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
241 Mdt_load_func&& mdt_load_func,
242 Task_err&& on_done_func);
309 template<
typename Task>
347 using Mutex = flow::util::Mutex_non_recursive;
430#define TEMPLATE_SESSION_SERVER_IMPL \
431 template<typename Session_server_t, typename Server_session_t>
433#define CLASS_SESSION_SERVER_IMPL \
434 Session_server_impl<Session_server_t, Server_session_t>
437template<
typename Per_app_setup_func>
438CLASS_SESSION_SERVER_IMPL::Session_server_impl
441 Per_app_setup_func&& per_app_setup_func) :
443 flow::log::Log_context(logger_ptr,
Log_component::S_SESSION),
444 m_srv_app_ref(srv_app_ref_arg),
445 m_this_session_srv(this_session_srv_arg),
446 m_cli_app_master_set_ref(cli_app_master_set_ref),
447 m_per_app_setup_func(std::move(per_app_setup_func)),
448 m_state(std::in_place)
452 using flow::error::Runtime_error;
453 using flow::async::reset_this_thread_pinning;
454 using flow::log::Sev;
455 using boost::movelib::make_unique;
456 using boost::system::system_category;
457 using boost::io::ios_all_saver;
459 using Named_sh_mutex = boost::interprocess::named_mutex;
460 using Named_sh_mutex_ptr = boost::movelib::unique_ptr<Named_sh_mutex>;
461 using Sh_lock_guard = boost::interprocess::scoped_lock<Named_sh_mutex>;
465 m_state->m_last_cli_namespace = 0;
466 m_state->m_incomplete_session_graveyard
467 = boost::movelib::make_unique<flow::async::Single_thread_task_loop>
472 flow::util::ostream_op_string(
"SSvG-",
this));
498 if constexpr(Server_session_dtl_obj::Session_base_obj::S_MQS_ENABLED)
501 using Mq =
typename Server_session_dtl_obj::Session_base_obj::Persistent_mq_handle_from_cfg;
503 util::remove_each_persistent_with_name_prefix<Blob_stream_mq_base<Mq>>
586 const auto own_creds = Process_credentials::own_process_credentials();
589 FLOW_LOG_WARNING(
"Session acceptor [" << *
this <<
"]: Creation underway. However, just before writing "
590 "CNS (Current Namespace Store), a/k/a PID file, we determined that "
591 "the `user` aspect of our effective credentials [" << own_creds <<
"] do not match "
592 "the hard-configured value passed to this ctor: "
594 "We cannot proceed, as this would violate the security/safety model of ipc::session. "
600 const auto mutex_name = empty_session.
base().cur_ns_store_mutex_absolute_name();
602 const auto cns_path = empty_session.
base().cur_ns_store_absolute_path();
606 const auto logger_ptr = get_logger();
607 if (logger_ptr && logger_ptr->should_log(Sev::S_INFO, get_log_component()))
609 ios_all_saver saver{*(logger_ptr->this_thread_ostream())};
610 FLOW_LOG_INFO_WITHOUT_CHECKING
611 (
"Session acceptor [" << *
this <<
"]: Created. Writing CNS (Current Namespace Store), a/k/a PID "
612 "file [" << cns_path <<
"] (perms "
613 "[" << std::setfill(
'0')
615 << std::oct << cns_perms.get_permissions() <<
"], "
616 "shared-mutex name [" << mutex_name <<
"], shared-mutex perms "
618 << mutex_perms.get_permissions() <<
"]); "
619 "then listening for incoming master socket stream "
620 "connects (through Native_socket_stream_acceptor that was just cted) to address "
621 "based partially on the namespace (PID) written to that file.");
626 Named_sh_mutex_ptr sh_mutex;
628 "Server_session_impl::ctor:named-mutex-open-or-create", [&]()
630 sh_mutex = make_unique<Named_sh_mutex>(
util::OPEN_OR_CREATE, mutex_name.native_str(), mutex_perms);
640 Sh_lock_guard sh_lock(*sh_mutex);
649 const bool we_created_cns = !fs::exists(cns_path, dummy);
650 ofstream cns_file(cns_path);
668 if ((!our_err_code) && we_created_cns)
679 cns_file << empty_session.
base().srv_namespace().str() <<
'\n';
681 if (!cns_file.good())
683 const auto sys_err_code = our_err_code =
Error_code(errno, system_category());
684 FLOW_LOG_WARNING(
"Session acceptor [" << *
this <<
"]: Could not open or write CNS (PID) file "
685 "file [" << cns_path <<
"]; system error details follow.");
686 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
701 *err_code = our_err_code;
705 throw Runtime_error(our_err_code, FLOW_UTIL_WHERE_AM_I_STR());
711 m_state->m_master_sock_acceptor
712 = make_unique<transport::Native_socket_stream_acceptor>
714 empty_session.
base().session_master_socket_stream_acceptor_absolute_name(),
718 if (err_code && (!*err_code))
720 m_state->m_incomplete_session_graveyard->start(reset_this_thread_pinning);
727CLASS_SESSION_SERVER_IMPL::~Session_server_impl()
729 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Shutting down. The Native_socket_stream_acceptor will now "
730 "shut down, and all outstanding Native_socket_stream_acceptor handlers and Server_session "
731 "handlers shall fire with operation-aborted error codes.");
745 if (!m_deinit_func_or_empty.empty())
747 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Continuing shutdown. A sub-class desires final de-init "
748 "work, once everything else is stopped (might be persistent resource cleanup); invoking "
750 m_deinit_func_or_empty();
751 FLOW_LOG_TRACE(
"De-init work finished.");
756template<
typename Task_err,
757 typename N_init_channels_by_srv_req_func,
typename Mdt_load_func>
762 N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
763 Mdt_load_func&& mdt_load_func,
764 Task_err&& on_done_handler)
767 using flow::async::Task_asio_err;
768 using boost::make_shared;
769 using std::to_string;
772 assert(target_session);
773 assert(m_state->m_master_sock_acceptor &&
"By contract do not invoke async_accept() if ctor failed.");
780 Task_asio_err on_done_func(std::move(on_done_handler));
781 auto sock_stm = make_shared<transport::sync_io::Native_socket_stream>();
782 const auto sock_stm_raw = sock_stm.get();
784 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Async-accept request: Immediately issuing socket stream "
785 "acceptor async-accept as step 1. If that async-succeeds, we will complete the login asynchronously; "
786 "if that succeeds we will finally emit the ready-to-go session to user via handler.");
787 m_state->m_master_sock_acceptor->async_accept
789 [
this, target_session, init_channels_by_srv_req, mdt_from_cli_or_null,
790 init_channels_by_cli_req,
791 n_init_channels_by_srv_req_func = std::move(n_init_channels_by_srv_req_func),
792 mdt_load_func = std::move(mdt_load_func),
793 sock_stm = std::move(sock_stm),
794 on_done_func = std::move(on_done_func)]
828 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Async-accept request: Socket stream "
829 "acceptor async-accept succeeded resulting in socket stream [" << *sock_stm <<
"]; that was step "
830 "1. Now we create the server session and have it undergo async-login; if that succeeds we will "
831 "finally emit the ready-to-go session to user via handler.");
834 = make_shared<Server_session_dtl_obj>(get_logger(),
837 std::move(*sock_stm));
844 Lock_guard incomplete_sessions_lock(m_state->m_mutex);
845 m_state->m_incomplete_sessions.insert(incomplete_session);
851 const auto cli_it = m_cli_app_master_set_ref.find(
string(cli_app_name));
852 return (cli_it == m_cli_app_master_set_ref.end()) ?
static_cast<const Client_app*
>(0) : &cli_it->second;
864 auto pre_rsp_setup_func = [
this,
871 auto incomplete_session = incomplete_session_observer.lock();
872 assert(incomplete_session
873 &&
"The Server_session_dtl_obj cannot be dead (dtor ran), if it's invoking its async handlers OK "
874 "(and thus calling us in its async_accept_log_in() success path.");
877 const auto cli_app_ptr = incomplete_session->base().cli_app_ptr();
878 assert(cli_app_ptr &&
"async_accept_log_in() contract is to call pre_rsp_setup_func() once all "
879 "the basic elements of Session_base are known (including Client_app&).");
881 return m_per_app_setup_func(*cli_app_ptr);
898 incomplete_session->async_accept_log_in
900 init_channels_by_srv_req,
901 mdt_from_cli_or_null,
902 init_channels_by_cli_req,
904 std::move(cli_app_lookup_func),
905 std::move(cli_namespace_func),
906 std::move(pre_rsp_setup_func),
908 std::move(n_init_channels_by_srv_req_func),
909 std::move(mdt_load_func),
912 target_session, on_done_func = std::move(on_done_func)]
917 auto incomplete_session = incomplete_session_observer.lock();
918 if (incomplete_session)
923 Lock_guard incomplete_sessions_lock(m_state->m_mutex);
925 const bool erased_ok = 1 ==
927 m_state->m_incomplete_sessions.erase(incomplete_session);
928 assert(erased_ok &&
"Who else would have erased it?!");
938 m_state->m_incomplete_session_graveyard->post([incomplete_session = std::move(incomplete_session)]
942 incomplete_session.reset();
945 assert((!incomplete_session) &&
"It should've been nullified by being moved-from into the captures.");
954 &&
"The incomplete-session Server_session_dtl can only disappear under us if *this is destroyed "
955 "which can only occur <=> operation-aborted is emitted due to *this destruction destroying that "
956 "incomplete-session.");
964 on_done_func(async_err_code);
968 assert(incomplete_session);
971 FLOW_LOG_INFO(
"Session acceptor [" << *
this <<
"]: Async-accept request: Successfully resulted in logged-in "
972 "server session [" << *incomplete_session <<
"]. Feeding to user via callback.");
976 *target_session = std::move(*(
static_cast<Server_session_obj*
>(incomplete_session.get())));
979 on_done_func(async_err_code);
980 FLOW_LOG_TRACE(
"Handler finished.");
986void CLASS_SESSION_SERVER_IMPL::to_ostream(std::ostream* os)
const
988 *os <<
'[' << m_srv_app_ref <<
"]@" <<
static_cast<const void*
>(
this);
992typename CLASS_SESSION_SERVER_IMPL::Session_server_obj* CLASS_SESSION_SERVER_IMPL::this_session_srv()
994 return m_this_session_srv;
998template<
typename Task>
999void CLASS_SESSION_SERVER_IMPL::sub_class_set_deinit_func(
Task&& task)
1001 m_deinit_func_or_empty = std::move(task);
1008 val.to_ostream(&os);
1012#undef CLASS_SESSION_SERVER_IMPL
1013#undef TEMPLATE_SESSION_SERVER_IMPL
This is the data-less sub-class of Server_session or any more-advanced (e.g., SHM-capable) variant th...
const Session_base_obj & base() const
Provides const access to Session_base super-object.
Internal class template comprising API/logic common to every Session_server variant,...
void to_ostream(std::ostream *os) const
See Server_session method.
boost::shared_ptr< Server_session_dtl_obj > Incomplete_session
Internally used ref-counted handle to a Server_session_dtl_obj, suitable for capturing and passing ar...
void sub_class_set_deinit_func(Task &&task)
Utility for sub-classes: ensures that task() is invoked near the end of *this dtor's execution,...
Session_server_t Session_server_obj
See this_session_srv().
boost::unordered_set< Incomplete_session > Incomplete_sessions
Short-hand for set of Incomplete_session, with fast insertion and removal by key Incomplete_session i...
flow::util::Lock_guard< Mutex > Lock_guard
Short-hand for Mutex lock.
void async_accept(Server_session_obj *target_session, Channels *init_channels_by_srv_req, Mdt_reader_ptr *mdt_from_cli_or_null, Channels *init_channels_by_cli_req, N_init_channels_by_srv_req_func &&n_init_channels_by_srv_req_func, Mdt_load_func &&mdt_load_func, Task_err &&on_done_func)
See Session_server method.
const Client_app::Master_set & m_cli_app_master_set_ref
See ctor.
const Server_app & m_srv_app_ref
See Session_server public data member.
typename Server_session_obj::Mdt_reader_ptr Mdt_reader_ptr
Short-hand for Session_mv::Mdt_reader_ptr.
Function< void()> m_deinit_func_or_empty
See sub_class_set_deinit_func(). .empty() unless that was called at least once.
typename Server_session_obj::Channels Channels
Short-hand for Session_mv::Channels.
Session_server_impl(flow::log::Logger *logger_ptr, Session_server_obj *this_session_srv_arg, const Server_app &srv_app_ref, const Client_app::Master_set &cli_app_master_set_ref, Error_code *err_code, Per_app_setup_func &&per_app_setup_func)
See Session_server ctor; it does that.
Session_server_obj *const m_this_session_srv
See this_session_srv().
std::optional< State > m_state
See State.
Session_server_obj * this_session_srv()
Returns pointer to the object that is privately sub-classing us.
const Function< Error_code(const Client_app &client_app)> m_per_app_setup_func
See ctor.
~Session_server_impl()
See Session_server dtor.
boost::weak_ptr< Server_session_dtl_obj > Incomplete_session_observer
weak_ptr observer of an Incomplete_session.
flow::util::Mutex_non_recursive Mutex
Short-hand for State::m_mutex type.
To be instantiated typically once in a given process, an object of this type asynchronously listens f...
Base of Blob_stream_mq_sender and Blob_stream_mq_receiver containing certain static facilities,...
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
A process's credentials (PID, UID, GID as of this writing).
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
@ S_RESOURCE_OWNER_UNEXPECTED
A resource in the file system (file, SHM pool, MQ, etc.) has or could have unexpected owner; ipc::ses...
@ S_MUTEX_BIPC_MISC_LIBRARY_ERROR
Low-level boost.ipc.mutex: boost.interprocess emitted miscellaneous library exception sans a system c...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
Flow-IPC module providing the broad lifecycle and shared-resource organization – via the session conc...
void ensure_resource_owner_is_app(flow::log::Logger *logger_ptr, const fs::path &path, const App &app, Error_code *err_code)
Utility, used internally but exposed in public API in case it is of general use, that checks that the...
Shared_name build_conventional_shared_name_prefix(const Shared_name &resource_type, const Shared_name &srv_app_name)
Return the prefix common to all calls to either build_conventional_shared_name() overload with the ar...
std::ostream & operator<<(std::ostream &os, const App &val)
Prints string representation of the given App to the given ostream.
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
const boost::array< Permissions, size_t(Permissions_level::S_END_SENTINEL)> PRODUCER_CONSUMER_RESOURCE_PERMISSIONS_LVL_MAP
Maps general Permissions_level specifier to low-level Permissions value, when the underlying resource...
void set_resource_permissions(flow::log::Logger *logger_ptr, const fs::path &path, const Permissions &perms, Error_code *err_code)
Utility that sets the permissions of the given resource (at the supplied file system path) to specifi...
void op_with_possible_bipc_exception(flow::log::Logger *logger_ptr, Error_code *err_code, const Error_code &misc_bipc_lib_error, String_view context, const Func &func)
Internal (to ipc) utility that invokes the given function that invokes a boost.interprocess operation...
Permissions shared_resource_permissions(Permissions_level permissions_lvl)
Maps general Permissions_level specifier to low-level Permissions value, when the underlying resource...
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.
#define TEMPLATE_SESSION_SERVER_IMPL
Internally used macro; public API users should disregard (same deal as in struc/channel....
#define CLASS_SESSION_SERVER_IMPL
Internally used macro; public API users should disregard (same deal as in struc/channel....
util::group_id_t m_group_id
The application must run as this group ID (GID). Files and other shared resources shall have this own...
std::string m_name
Brief application name, readable to humans and unique across all other applications' names; used both...
util::user_id_t m_user_id
The application must run as this user ID (UID). Files and other shared resources shall have this owne...
An App that is used as a client in at least one client-server IPC split.
boost::unordered_map< std::string, Client_app > Master_set
Suggested type for storing master repository or all Client_appss. See App doc header for discussion.
An App that is used as a server in at least one client-server IPC split.
util::Permissions_level m_permissions_level_for_client_apps
Specifies level of access for Client_apps (which must, also, be in m_allowed_client_apps at any rate)...
All internal mutable state of Session_server_impl.
std::atomic< uint64_t > m_last_cli_namespace
The ID used in generating the last Server_session::cli_namespace(); so the next one = this plus 1....
boost::movelib::unique_ptr< transport::Native_socket_stream_acceptor > m_master_sock_acceptor
transport::Native_socket_stream acceptor avail throughout *this to accept init please-open-session re...
Mutex m_mutex
Protects m_incomplete_sessions. See class doc header impl section for discussion of thread design.
Incomplete_sessions m_incomplete_sessions
The set of all Incomplete_session objects such that each one comes from a distinct async_accept() req...
boost::movelib::unique_ptr< flow::async::Single_thread_task_loop > m_incomplete_session_graveyard
Mostly-idle thread that solely destroys objects removed from m_incomplete_sessions in the case where ...