Flow-IPC 1.0.2
Flow-IPC project: Full implementation reference.
session_server_impl.hpp
1/* Flow-IPC: Sessions
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
23#include "ipc/session/error.hpp"
24#include "ipc/session/app.hpp"
29#include <boost/interprocess/sync/named_mutex.hpp>
30#include <boost/interprocess/sync/scoped_lock.hpp>
31#include <boost/move/make_unique.hpp>
32
33namespace ipc::session
34{
35
36// Types.
37
38/**
39 * Internal class template comprising API/logic common to every Session_server variant, meant to be
40 * `private`ly sub-classed and largely forwarded. In particular the vanilla Session_server
41 * (see its short Implementation doc header section) sub-classes us and makes no use of the available
42 * customization points. Contrast with, e.g., shm::classic::Session_server which uses customization
43 * points for shm::classic SHM-arena setup.
44 *
45 * The available customization points are as follows.
46 * - Per-session: `*this` is parameterized on #Server_session_obj. The vanilla value is `Server_session<...>`;
47 * but to add capabilities sub-class Server_session_impl as explained in its doc header and proceed from
48 * there. For example see shm::classic::Server_session. Server_session_impl has its own customization point(s).
49 * - No-customization = specify #Server_session here.
50 * - Cross-session: Additional per-Client_app setup (such as setting up cross-session SHM arena(s) in
51 * on-demand fashion) can be specified by passing in `per_app_setup_func` to ctor.
52 * - No-customization = specify do-nothing function here that returns `Error_code()` always.
53 * - Cross-session: Certain custom code can be caused to run as the last thing in the Session_server_impl dtor,
54 * even after all its state has been destroyed (e.g., all threads have been stopped/joined).
55 * A sub-class may use this customization point by calling sub_class_set_deinit_func(). If not called, the
56 * customization point is unused by default.
57 *
58 * ### Implementation design ###
59 * Generally the impl should be reasonably easy to follow by reading the method bodies, at least if one understands
60 * the general ipc::session client-server paradigm.
61 *
62 * The thing to understand strategically, as usual, is the thread design. This is an acceptor, so one might
63 * have expected it to be implemented similarly to, say, Native_socket_stream_acceptor -- maintaining a thread W
64 * in which to do some/most work; including maintaining a "deficit" queue of outstanding async_accept() requests
65 * and a "surplus" queue of ready `Server_session`s to emit. This was an option, but I (ygoldfel) felt that
66 * piggy-backing handling of events directly onto the unspecified handler-invoking threads of the internally
67 * used objects would produce a much simpler data structure and state machine. (As a side effect, the behavior
68 * described in "FIFO" section above occurs. Also as a side effect, the error-emission behavior described
69 * in "Error handling" above occurs. Basically: each async_accept()'s internal handling is independent of the
70 * others. They share (almost) no state; there is no single async-chain and explicit queues unlike in various other
71 * boost.asio-like classes in ::ipc. Each async_accept() triggers async op 1, the handler for which triggers async
72 * op 2, the handler for which emits the result to user. This is
73 * simpler to implement, but it also results in sensible API contract behavior, I feel.)
74 *
75 * Here is how it works.
76 *
77 * For each potential `Server_session` -- i.e., for each async_accept() -- there are 2 steps that must occur
78 * asynchronously before one is ready to emit to the user-supplied handler:
79 * - Our stored Native_socket_stream_acceptor, where we invoke
80 * transport::Native_socket_stream_acceptor::async_accept(), must emit a PEER-state (connected)
81 * transport::sync_io::Native_socket_stream (the opposing peer object living inside the opposing Client_session).
82 * - Now a Server_session_dtl may be constructed (not yet emitted to user) and then
83 * Server_session_dtl::async_accept_log_in() is invoked.
84 * - Or if the socket-accept failed, then we can emit that to the user already; done.
85 * - That Server_session_dtl::async_accept_log_in() must complete the async log-in exchange against the
86 * opposing Client_session.
87 * - Now the Server_session_dtl can be converted via pointer `static_cast<>` to `Server_session` and
88 * emitted to the user.
89 * - Or if the log-in fails at some stage, then we can emit that to the user.
90 *
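 * ### Illustrative sketch: the 2-op chain (not part of Flow-IPC) ###
 * The two steps above can be sketched as follows. This is only a minimal model under stated assumptions:
 * `Task_queue`, `async_step_1()`, `async_step_2()`, and this free-standing `async_accept()` are hypothetical
 * stand-ins (a single-threaded queue plays the role of the handler-invoking threads), and `std::error_code`
 * stands in for #Error_code. The point is merely that each request carries its own state through lambda captures.
 *
```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <system_error>
#include <utility>
#include <vector>

// Hypothetical stand-in for "some handler-invoking thread": tasks run later, in posting order.
struct Task_queue
{
  std::queue<std::function<void ()>> m_tasks;
  void post(std::function<void ()> task) { m_tasks.push(std::move(task)); }
  void run()
  {
    while (!m_tasks.empty())
    {
      auto task = std::move(m_tasks.front());
      m_tasks.pop();
      task();
    }
  }
};

using Step_handler = std::function<void (const std::error_code&)>;
using On_done = std::function<void (const std::error_code&, const std::string&)>;

// Async op 1 (stand-in for the socket-accept): completes later with success or failure.
void async_step_1(Task_queue& queue, bool ok, Step_handler handler)
{
  queue.post([ok, handler = std::move(handler)]
  {
    handler(ok ? std::error_code() : std::make_error_code(std::errc::connection_refused));
  });
}

// Async op 2 (stand-in for the log-in exchange).
void async_step_2(Task_queue& queue, bool ok, Step_handler handler)
{
  queue.post([ok, handler = std::move(handler)]
  {
    handler(ok ? std::error_code() : std::make_error_code(std::errc::permission_denied));
  });
}

// One request: op 1's handler starts op 2; op 2's handler emits to the user. No shared queue of requests.
void async_accept(Task_queue& queue, bool step_1_ok, bool step_2_ok, std::string name, On_done on_done)
{
  async_step_1(queue, step_1_ok,
               [&queue, step_2_ok, name = std::move(name), on_done = std::move(on_done)]
                 (const std::error_code& err)
  {
    if (err) { on_done(err, name); return; } // Op 1 failed: emit right away.
    async_step_2(queue, step_2_ok,
                 [name, on_done](const std::error_code& err2) { on_done(err2, name); });
  });
}

// Runs three independent requests and records the order in which their results are emitted.
std::vector<std::string> demo_accept_chain()
{
  Task_queue queue;
  std::vector<std::string> results;
  const auto record = [&results](const std::error_code& err, const std::string& name)
  {
    results.push_back(name + (err ? ":failed" : ":ok"));
  };
  async_accept(queue, true, true, "req1", record);
  async_accept(queue, false, true, "req2", record);
  async_accept(queue, true, false, "req3", record);
  queue.run();
  return results;
}
```
 *
 * In this model a request that fails in op 1 is emitted before an earlier request that still needs op 2,
 * since the requests share no state and no common queue of results.
 *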
91 * Nomenclature: We call the `N_s_s_a::async_accept()` handler-invoking thread: Wa. It is officially an
92 * unspecified thread or threads, but, by contract, handlers are executed non-concurrently, so it can be
93 * considered one thread (and actually it is as of this writing). We call the
94 * Server_session::async_accept_log_in() handler-invoking thread: Ws. This is really a separate thread for
95 * each `Server_session`, so in fact 2 different async-accept requests can cause 2 handlers to invoke simultaneously.
96 *
97 * So each time one calls async_accept() from thread U (i.e., user thread(s) from which they must never invoke
98 * mutating stuff concurrently), that kicks off transport::Native_socket_stream_acceptor::async_accept(),
99 * which fires handler in thread Wa; we then kick off `Server_session::async_accept_log_in()`, which fires
100 * handler in thread Ws, where we finalize the `Server_session` and emit it to user. There is no cross-posting
101 * to some other worker thread W (but read on).
102 *
103 * The obvious question is, do the handlers, some of which (as noted) may run concurrently to each other
104 * (request 1's Ws handler can co-execute with request 2's Wa handler; and request 2's Ws handler can co-execute
105 * with request 3's Ws handler), interfere with each other by mutatingly accessing common data? Let's consider the
106 * state involved.
107 *
108 * For each async-accept request, the amassed data are independent from any other's; they are passed around
109 * throughout the 2 async ops per request via lambda captures. There is, however, one caveat to this:
110 * Suppose `S->async_accept_log_in(F)` is invoked on not-yet-ready (incomplete) `Server_session* S`; suppose F is invoked
111 * with a truthy #Error_code (it failed). We are now sitting in thread Ws: and S should be destroyed.
112 * But invoking dtor of S from within S's own handler is documented to be not-okay and results in
113 * a deadlock/infinite dtor execution, or if the system can detect it, at best an abort due to a thread trying to
114 * join itself. So:
115 * - We maintain State::m_incomplete_sessions storing each such outstanding S. If dtor runs, then all S will be
116 * auto-destroyed which will automatically invoke the user handler with operation-aborted.
117 * - If an incomplete (outstanding) S successfully completes log-in, we remove it from
118 * State::m_incomplete_sessions and emit it to user via handler.
119 * - If it completes log-in with failure, we remove it from State::m_incomplete_sessions and then:
120 * - hand it off to a mostly-idle separate thread, State::m_incomplete_session_graveyard, which can run S's dtor
121 * in peace without deadlocking anyone. (If `*this` dtor runs before then, the S dtors will still run, as each
122 * queued lambda's captures are destroyed.)
123 * - This is the part that avoids the deadlock. The other things above are orthogonally needed for the promised
124 * boost.asio-like semantics, where handler must run exactly once eventually, from dtor at the latest.
125 *
126 * However note that State::m_incomplete_sessions is added-to in thread Wa but removed-from in various threads Ws.
127 * Therefore it is protected by a mutex; simple enough.
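 *
 * ### Illustrative sketch: the graveyard hand-off (not part of Flow-IPC) ###
 * The hand-off described above can be modeled roughly as follows. Everything here is an assumption made for
 * illustration: `Session` and `Graveyard` are hypothetical stand-ins built on `std::thread`, whereas the real
 * code uses flow::async::Single_thread_task_loop and the real session types. The sketch only shows that the
 * object's dtor (which joins the object's own thread) ends up running in a third thread.
 *
```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for S: fires its handler from its own thread; its dtor joins that thread.
// Hence the dtor must never run from inside the handler (a thread cannot join itself).
class Session
{
public:
  explicit Session(std::thread::id* dtor_thread_id) : m_dtor_thread_id(dtor_thread_id) {}
  ~Session()
  {
    if (m_thread.joinable()) { m_thread.join(); }
    *m_dtor_thread_id = std::this_thread::get_id();
  }
  void async_op(std::function<void ()> handler) { m_thread = std::thread(std::move(handler)); }
private:
  std::thread::id* m_dtor_thread_id;
  std::thread m_thread;
};

// Mostly-idle thread whose only job is to run dtors of objects handed to it.
class Graveyard
{
public:
  Graveyard() : m_thread([this] { run(); }) {}
  ~Graveyard()
  {
    { std::lock_guard<std::mutex> lock(m_mutex); m_stop = true; }
    m_cv.notify_one();
    m_thread.join();
  }
  void bury(std::shared_ptr<Session> doomed)
  {
    { std::lock_guard<std::mutex> lock(m_mutex); m_queue.push(std::move(doomed)); }
    m_cv.notify_one();
  }
private:
  void run()
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    for (;;)
    {
      m_cv.wait(lock, [this] { return m_stop || !m_queue.empty(); });
      if (m_queue.empty()) { return; } // Reachable only once m_stop is set.
      auto doomed = std::move(m_queue.front());
      m_queue.pop();
      lock.unlock();
      doomed.reset(); // The Session dtor runs here, in the graveyard thread.
      lock.lock();
    }
  }
  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::queue<std::shared_ptr<Session>> m_queue;
  bool m_stop = false;
  std::thread m_thread; // Declared last, so it starts only after the members above exist.
};

// Returns true if the Session dtor ran in neither the handler's thread nor the calling thread.
bool demo_graveyard()
{
  std::thread::id handler_thread_id;
  std::thread::id dtor_thread_id;
  std::promise<void> go;
  std::promise<void> handed_off;
  auto go_future = go.get_future();
  {
    Graveyard graveyard;
    auto incomplete = std::make_shared<Session>(&dtor_thread_id); // Stand-in for the stored set entry.
    incomplete->async_op([&]
    {
      go_future.wait(); // Sketch-only gate: do not proceed until async_op() has returned.
      handler_thread_id = std::this_thread::get_id();
      graveyard.bury(std::move(incomplete)); // "Log-in failed": hand S off instead of destroying it here.
      handed_off.set_value();
    });
    go.set_value();
    handed_off.get_future().wait();
  } // The Graveyard dtor joins its thread, which by then has destroyed the Session.
  return (dtor_thread_id != std::thread::id()) && (dtor_thread_id != handler_thread_id)
         && (dtor_thread_id != std::this_thread::get_id());
}
```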
128 *
129 * @todo Session_server, probably in ctor or similar, should -- for safety -- enforce the accuracy
130 * of Server_app attributes including App::m_exec_path, App::m_user_id, App::m_group_id. As of this writing
131 * it enforces these things about each *opposing* Client_app and process -- so for sanity it can/should do so
132 * about itself, before the sessions can begin.
133 *
134 * @tparam Server_session_t
135 * See #Server_session_obj. Its API must exactly equal (or be a superset of) that of vanilla #Server_session.
136 * (Its impl may perform extra steps; for example `async_accept_log_in()` might set up a per-session SHM
137 * arena.)
138 * @tparam Session_server_t
139 * The class that is in fact `private`ly sub-classing us.
140 * This is necessary for this_session_srv(). See its doc header for discussion.
141 */
142template<typename Session_server_t, typename Server_session_t>
143class Session_server_impl :
144 public flow::log::Log_context,
145 private boost::noncopyable
146{
147public:
148 // Types.
149
150 /// See this_session_srv().
151 using Session_server_obj = Session_server_t;
152
153 /// Useful short-hand for the concrete `Server_session` type emitted by async_accept().
154 using Server_session_obj = Server_session_t;
155
156 /// Short-hand for Session_mv::Mdt_reader_ptr.
157 using Mdt_reader_ptr = typename Server_session_obj::Mdt_reader_ptr;
158
159 /// Short-hand for Session_mv::Channels.
160 using Channels = typename Server_session_obj::Channels;
161
162 // Constructors/destructor.
163
164 /**
165 * See Session_server ctor; it does that. In addition:
166 *
167 * takes and memorizes a functor that takes a Client_app const ref that identifies the app that wants to
168 * open the session, performs unspecified synchronous steps, and returns an #Error_code indicating success or
169 * reason for failure which dooms that async_accept().
170 *
171 * ### Rationale for `per_app_setup_func` ###
172 * It is not intended for per-session setup. #Server_session_dtl_obj should take care of that where it makes
173 * sense -- it does after all represent the individual budding session peer. However our sub-class
174 * (e.g., shm::classic::Session_server) may need to keep track of per-distinct-Client_app resources
175 * (e.g., the per-app-scope SHM arena) which must exist before the opposing Client_session-type object
176 * completes its setup (e.g., by opening the aforementioned per-Client_app/multi-instance-scope SHM arena).
177 * It can detect a new Client_app is logging-in and set that up in the nick of time.
178 *
179 * @tparam Per_app_setup_func
180 * See above. Signature: `Error_code F(const Client_app&)`.
181 * @param logger_ptr
182 * See Session_server ctor.
183 * @param srv_app_ref
184 * See Session_server ctor.
185 * @param cli_app_master_set_ref
186 * See Session_server ctor.
187 * @param err_code
188 * See Session_server ctor. Additional #Error_code generated:
189 * see `per_app_setup_func`.
190 * @param per_app_setup_func
191 * See above.
192 * @param this_session_srv_arg
193 * The object that is, in fact, `private`ly sub-classing `*this` (and calling this ctor).
194 * See this_session_srv(). The value is only saved but not dereferenced inside the ctor.
195 */
196 template<typename Per_app_setup_func>
197 explicit Session_server_impl(flow::log::Logger* logger_ptr,
198 Session_server_obj* this_session_srv_arg,
199 const Server_app& srv_app_ref,
200 const Client_app::Master_set& cli_app_master_set_ref,
201 Error_code* err_code, Per_app_setup_func&& per_app_setup_func);
202
203 /// See Session_server dtor.
204 ~Session_server_impl();
205
206 // Methods.
207
208 /**
209 * See Session_server method. In addition: invokes `per_app_setup_func()` (from ctor)
210 * once the connecting Client_app becomes known; if that returns truthy #Error_code then this method
211 * emits that error.
212 *
213 * @tparam Task_err
214 * See Session_server method.
215 * @tparam N_init_channels_by_srv_req_func
216 * See Session_server method.
217 * @tparam Mdt_load_func
218 * See Session_server method.
219 * @param target_session
220 * See Session_server method.
221 * @param init_channels_by_srv_req
222 * See Session_server method.
223 * @param mdt_from_cli_or_null
224 * See Session_server method.
225 * @param init_channels_by_cli_req
226 * See Session_server method.
227 * @param n_init_channels_by_srv_req_func
228 * See Session_server method.
229 * @param mdt_load_func
230 * See Session_server method.
231 * @param on_done_func
232 * See Session_server method.
233 */
234 template<typename Task_err,
235 typename N_init_channels_by_srv_req_func, typename Mdt_load_func>
236 void async_accept(Server_session_obj* target_session,
237 Channels* init_channels_by_srv_req,
238 Mdt_reader_ptr* mdt_from_cli_or_null,
239 Channels* init_channels_by_cli_req,
240 N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
241 Mdt_load_func&& mdt_load_func,
242 Task_err&& on_done_func);
243
244 /**
245 * See Server_session method.
246 *
247 * @param os
248 * See Server_session method.
249 */
250 void to_ostream(std::ostream* os) const;
251
252 /**
253 * Returns pointer to the object that is `private`ly sub-classing us. In other words this equals
254 * `static_cast<const Session_server_obj*>(this)`, where this class is the base of #Session_server_obj,
255 * but down-casting from a `private` base is not allowed.
256 *
257 * ### Rationale ###
258 * I (ygoldfel) acknowledge this is somewhat odd. Why should a super-class, per se, care or know about its
259 * sub-class? This at least vaguely indicates some sort of design mistake. In fact this is needed, as of
260 * this writing, because shm::classic::Server_session_impl::async_accept_log_in() gets a Session_server_impl ptr,
261 * which it knows points to an object that's really the core of a shm::classic::Session_server, and it needs to
262 * interact with the SHM-classic-specific aspect of that guy's API. So it calls this accessor here, essentially
263 * as a way to down-cast from a `private` base (which is not allowed by C++).
264 * Why can't it "just" take a `shm::classic::Session_server*` then? Answer: because Session_server_impl uses, like,
265 * `virtual`-less polymorphism to invoke `async_accept_log_in()` regardless of which object it's calling it on....
266 * It's hard to summarize here in words in any way that'll make sense, but if one looks at the relevant code
267 * it makes sense. Eventually. Bottom line is, this way, it can just pass-in `this`, and then
268 * shm::classic::Server_session_impl::async_accept_log_in() can call this_session_srv() to get at the sub-class
269 * version of `this`.
270 *
271 * I feel it is not criminal -- internal things are working together in a way that they logically intend to --
272 * but intuitively it feels like there's a smoother way to design it. Probably.
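 *
 * ### Illustrative sketch: back-pointer from a `private` base (not part of Flow-IPC) ###
 * The arrangement discussed above can be reduced to the following minimal model. The names `Impl_base`,
 * `Fancy_server`, `this_derived()`, and `use_via_base()` are hypothetical and exist only for this sketch;
 * they merely mirror the roles of Session_server_impl, its sub-class, this_session_srv(), and the code that
 * holds only a pointer to the base.
 *
```cpp
#include <cassert>
#include <string>

// Plays the role of the privately sub-classed base: it remembers who is sub-classing it.
template<typename Derived_t>
class Impl_base
{
public:
  explicit Impl_base(Derived_t* derived) : m_derived(derived) {} // Only saved, not dereferenced, here.
  Derived_t* this_derived() { return m_derived; }
private:
  Derived_t* const m_derived;
};

// Plays the role of the sub-class that adds its own API on top.
class Fancy_server : private Impl_base<Fancy_server>
{
public:
  using Base = Impl_base<Fancy_server>;
  Fancy_server() : Base(this) {}
  std::string fancy_only() const { return "fancy"; }
  Base* base() { return this; } // Conversion to the private base is fine inside the class itself.
};

// Plays the role of code that is handed only the base pointer.
std::string use_via_base(Impl_base<Fancy_server>* base)
{
  // A static_cast to Fancy_server would not compile here, because the base is inaccessible.
  // The stored back-pointer provides the same result instead.
  return base->this_derived()->fancy_only();
}
```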
273 *
274 * @todo Reconsider the details of how classes in the non-`virtual` hierarchies
275 * `Session_server`, `Server_session`, `Session_server_impl`, `Server_session_impl` cooperate internally,
276 * as there is some funky stuff going on, particularly Session_server_impl::this_session_srv().
277 *
278 * @return See above.
279 */
281
282 // Data.
283
284 /// See Session_server public data member.
286
287protected:
288 // Methods.
289
290 /**
291 * Utility for sub-classes: ensures that `task()` is invoked near the end of `*this` dtor's execution, after *all*
292 * other (mutable) state has been destroyed, including stopping/joining any threads performing
293 * async async_accept() ops. It may be invoked at most once.
294 *
295 * The value it adds: A long story best told by specific example. See the original use case which is
296 * in shm::classic::Session_server; it sets up certain SHM cleanup steps to occur,
297 * when the session-server is destroyed.
298 *
299 * ### Watch out! ###
300 * At the time `task()` runs, the calling instance of the sub-class has been destroyed -- thus it is, e.g.,
301 * usually wrong to capture your `this` in the `task` lambda, except for logging.
302 * `get_logger()` and `get_log_component()` (which are in this super-class) are still okay to use.
303 *
304 * @tparam Task
305 * Function object invoked as `void` with no args.
306 * @param task
307 * `task()` shall execute before dtor returns.
308 */
309 template<typename Task>
311
312private:
313 // Types.
314
315 /**
316 * Short-hand for concrete Server_session_dtl type, which each async_accept() creates internally, completes
317 * the log-in process upon, and then up-casts to #Server_session_obj to emit to user via move-assignment.
318 * #Server_session_dtl_obj is equal to #Server_session_obj -- it adds no data -- but exposes certain
319 * internally invoked APIs that the user shall not access.
320 */
322
323 /**
324 * Internally used ref-counted handle to a #Server_session_dtl_obj, suitable for capturing and passing around
325 * lambdas.
326 *
327 * ### Rationale ###
328 * It is `shared_ptr`, not `unique_ptr`, for two reasons. Primarily, it is so that it can be captured
329 * via #Incomplete_session_observer to avoid a leak that would result from capturing #Incomplete_session
330 * in a lambda passed-to an async op on an #Incomplete_session *itself*. `unique_ptr` cannot be observed
331 * via `weak_ptr`; `shared_ptr` can.
332 *
333 * Secondarily, a `unique_ptr` cannot be copy-captured in a lambda; move-capturing it would make the lambda move-only.
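 *
 * ### Illustrative sketch: strong vs. weak capture (not part of Flow-IPC) ###
 * The leak mentioned above, and how a `weak_ptr` observer avoids it, can be shown with a small model.
 * Assumptions: `Io_obj` is a hypothetical object that stores its pending handler inside itself, and the
 * sketch uses `std::shared_ptr`/`std::weak_ptr` rather than the boost equivalents named in this file.
 *
```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for an I/O object that keeps its pending handler inside itself.
struct Io_obj
{
  std::function<void ()> m_pending_handler;
  void async_op(std::function<void ()> handler) { m_pending_handler = std::move(handler); }
};

// Returns true if the object is still alive after its last outside handle is gone.
bool outlives_owner_with_strong_capture()
{
  std::weak_ptr<Io_obj> probe;
  {
    auto obj = std::make_shared<Io_obj>();
    probe = obj;
    obj->async_op([obj] { (void)obj; }); // The object owns the handler, which owns the object: a cycle.
  }
  const auto leaked = probe.lock();
  if (!leaked) { return false; }
  leaked->m_pending_handler = nullptr; // Break the cycle by hand, so this sketch does not really leak.
  return true;
}

// Same scenario, but the handler captures only an observer.
bool outlives_owner_with_weak_capture()
{
  std::weak_ptr<Io_obj> probe;
  {
    auto obj = std::make_shared<Io_obj>();
    probe = obj;
    const std::weak_ptr<Io_obj> observer = obj;
    obj->async_op([observer]
    {
      if (const auto strong = observer.lock()) { (void)strong; } // Would use the object here if alive.
    });
  }
  return !probe.expired();
}
```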
334 */
335 using Incomplete_session = boost::shared_ptr<Server_session_dtl_obj>;
336
337 /**
338 * `weak_ptr` observer of an #Incomplete_session. Capturing this, instead of #Incomplete_session itself,
339 * allows for the underlying #Incomplete_session to be destroyed while the lambda still exists.
340 */
341 using Incomplete_session_observer = boost::weak_ptr<Server_session_dtl_obj>;
342
343 /// Short-hand for set of #Incomplete_session, with fast insertion and removal by key #Incomplete_session itself.
344 using Incomplete_sessions = boost::unordered_set<Incomplete_session>;
345
346 /// Short-hand for State::m_mutex type.
347 using Mutex = flow::util::Mutex_non_recursive;
348
349 /// Short-hand for #Mutex lock.
350 using Lock_guard = flow::util::Lock_guard<Mutex>;
351
352 /**
353 * All internal mutable state of Session_server_impl. It's grouped into this inner `struct`, so that we can
354 * destroy it in the destructor via an `optional::reset()`, letting items be destroyed automatically in
355 * the opposite order in which they were constructed -- and *then* perform certain final steps when
356 * that state (including various threads running) is no longer a factor. See Session_server_impl dtor
357 * and sub_class_set_deinit_func().
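 *
 * ### Illustrative sketch: `optional::reset()` followed by a final task (not part of Flow-IPC) ###
 * The destruction sequence described above can be modeled as follows. `Noisy`, `Server`, and
 * `set_deinit_func()` are hypothetical names for this sketch only; the log merely records the order of events.
 *
```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using Log = std::vector<std::string>;

// Records its own destruction in the log.
class Noisy
{
public:
  Noisy(Log* log, std::string name) : m_log(log), m_name(std::move(name)) {}
  Noisy(const Noisy&) = delete;
  ~Noisy() { m_log->push_back("~" + m_name); }
private:
  Log* m_log;
  std::string m_name;
};

class Server
{
public:
  explicit Server(Log* log) : m_state(std::in_place, log) {}
  ~Server()
  {
    m_state.reset(); // Destroys the State members, in reverse order of declaration.
    if (m_deinit_func) { m_deinit_func(); } // Runs only after all of that state is gone.
  }
  void set_deinit_func(std::function<void ()> task) { m_deinit_func = std::move(task); }
private:
  struct State
  {
    explicit State(Log* log) : m_first(log, "first"), m_second(log, "second") {}
    Noisy m_first;
    Noisy m_second;
  };
  std::optional<State> m_state;
  std::function<void ()> m_deinit_func;
};

// Returns the sequence of events recorded while a Server is destroyed.
Log demo_destruction_order()
{
  Log log;
  {
    Server server(&log);
    server.set_deinit_func([&log] { log.push_back("deinit"); });
  }
  return log;
}
```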
358 */
359 struct State
360 {
361 /// The ID used in generating the last Server_session::cli_namespace(); so the next one = this plus 1. 0 initially.
362 std::atomic<uint64_t> m_last_cli_namespace;
363
364 /**
365 * The set of all #Incomplete_session objects such that each one comes from a distinct async_accept() request
366 * that (1) has accepted a transport::sync_io::Native_socket_stream connection and thus created a
367 * #Server_session_dtl_obj but (2) whose Server_session_dtl::async_accept_log_in() has not yet completed (fired handler).
368 *
369 * Protected by #m_mutex; accessed from thread Wa and threads Ws. See class doc header impl section for
370 * discussion of thread design.
371 *
372 * ### Impl design ###
373 * Once a Server_session_dtl is created by `*this`, we do `async_accept_log_in()` on it. We cannot capture that
374 * Server_session_dtl itself into its handler's lambda, as that would create a cycle leak wherein
375 * the object can never be destroyed. (This is typical when maintaining a boost.asio-style I/O object by
376 * a `shared_ptr` handle.) So we capture an #Incomplete_session_observer (`weak_ptr`) thereof; but store
377 * the Server_session_dtl (a/k/a #Incomplete_session) here in `*this`, as of course it must be stored somewhere.
378 * Then if `*this` dtor is invoked before the aforementioned `async_accept_log_in()` handler fires, the handler
379 * shall fire with operation-aborted as desired.
380 *
381 * ### Ordering caveat ###
382 * As of this writing, since the dtor auto-destroys the various members as opposed to any manual ordering thereof:
383 * This must be declared before #m_master_sock_acceptor. Then when dtor runs, first the latter's thread Wa
384 * shall be joined as it is destroyed first. Then #m_incomplete_sessions shall be destroyed next. Thus there
385 * is zero danger of concurrent access to #m_incomplete_sessions. Obviously the dtor's auto-destruction
386 * of #m_incomplete_sessions is not protected by any lock.
387 *
388 * We could instead manually destroy stuff in the proper order. I (ygoldfel) do like to just rely on
389 * auto-destruction (in order opposite to declaration/initialization) to keep things clean.
390 */
391 Incomplete_sessions m_incomplete_sessions;
392
393 /// transport::Native_socket_stream acceptor avail throughout `*this` to accept init please-open-session requests.
394 boost::movelib::unique_ptr<transport::Native_socket_stream_acceptor> m_master_sock_acceptor;
395
396 /// Protects `m_incomplete_sessions`. See class doc header impl section for discussion of thread design.
397 mutable Mutex m_mutex;
398
399 /**
400 * Mostly-idle thread that solely destroys objects removed from `m_incomplete_sessions` in the case where a
401 * Server_session_dtl::async_accept_log_in() failed as opposed to succeeded (in which case it is emitted to user).
402 */
403 boost::movelib::unique_ptr<flow::async::Single_thread_task_loop> m_incomplete_session_graveyard;
404 }; // struct State
405
406 // Data.
407
408 /// See this_session_srv().
410
411 /// See ctor.
413
414 /// See ctor.
416
417 /// See State.
418 std::optional<State> m_state;
419
420 /// See sub_class_set_deinit_func(). `.empty()` unless that was called (which may happen at most once).
422}; // class Session_server_impl
423
424// Free functions: in *_fwd.hpp.
425
426// Template implementations.
427
428/// Internally used macro; public API users should disregard (same deal as in struc/channel.hpp).
429#define TEMPLATE_SESSION_SERVER_IMPL \
430 template<typename Session_server_t, typename Server_session_t>
431/// Internally used macro; public API users should disregard (same deal as in struc/channel.hpp).
432#define CLASS_SESSION_SERVER_IMPL \
433 Session_server_impl<Session_server_t, Server_session_t>
434
435TEMPLATE_SESSION_SERVER_IMPL
436template<typename Per_app_setup_func>
437CLASS_SESSION_SERVER_IMPL::Session_server_impl
438 (flow::log::Logger* logger_ptr, Session_server_obj* this_session_srv_arg, const Server_app& srv_app_ref_arg,
439 const Client_app::Master_set& cli_app_master_set_ref, Error_code* err_code,
440 Per_app_setup_func&& per_app_setup_func) :
441
442 flow::log::Log_context(logger_ptr, Log_component::S_SESSION),
443 m_srv_app_ref(srv_app_ref_arg), // Not copied!
444 m_this_session_srv(this_session_srv_arg),
445 m_cli_app_master_set_ref(cli_app_master_set_ref), // Ditto!
446 m_per_app_setup_func(std::forward<Per_app_setup_func>(per_app_setup_func)),
447 m_state(std::in_place) // Default-ct State; initialize contents, further, just below.
448{
451 using flow::error::Runtime_error;
452 using boost::movelib::make_unique;
453 using boost::system::system_category;
454 using boost::io::ios_all_saver;
455 using fs::ofstream;
456 using Named_sh_mutex = boost::interprocess::named_mutex;
457 using Named_sh_mutex_ptr = boost::movelib::unique_ptr<Named_sh_mutex>;
458 using Sh_lock_guard = boost::interprocess::scoped_lock<Named_sh_mutex>;
459 // using ::errno; // It's a macro apparently.
460
461 // Finish setting up m_state. See State members in order and deal with the ones needing explicit init.
462 m_state->m_last_cli_namespace = 0;
463 m_state->m_incomplete_session_graveyard = boost::movelib::make_unique<flow::async::Single_thread_task_loop>
464 (get_logger(),
465 flow::util::ostream_op_string("srv_sess_acc_graveyard[", *this, "]"));
466
467 /* This is a (as of this writing -- the) *cleanup point* for any MQs previously created on behalf of this
468 * Server_app by previous active processes before us; namely when either a Server_session or opposing Client_session
469 * performs open_channel() (or pre-opens channel(s) during session creation), so the Server_session_impl
470 * creates the actual `Persistent_mq_handle`s via its ctor in create-only mode. These underlying MQs
471 * are gracefully cleaned up in Blob_stream_mq_send/receiver dtors (see their doc headers). This cleanup point is a
472 * best-effort attempt to clean up anything that was skipped due to one or more such destructors never getting
473 * to run (due to crash, abort, etc.). Note that Blob_stream_mq_send/receiver doc headers explicitly explain
474 * the need to worry about this contingency.
475 *
476 * We simply delete everything with the Shared_name prefix used when setting up the MQs
477 * (see Server_session_impl::make_channel_mqs()). The prefix is everything up-to (not including) the PID
478 * (empty_session.base().srv_namespace() below). Our own .srv_namespace() is
479 * just about to be determined and is unique across time by definition (internally, it's -- again -- our PID);
480 * so any existing MQs are by definition old. Note that as of this writing there is at most *one* active
481 * process (instance) of a given Server_app.
482 *
483 * Subtlety (kind of): We worry about such cleanup only if some type of MQ is in fact enabled at compile-time
484 * of *this* application; and we only clean up that MQ type, not the other(s) (as of this writing there are 2,
485 * but that could change conceivably). If the MQ type is changed (or MQs disabled) after a crash/abort, there
486 * could be a leak. We could also indiscriminately clean-up all known MQ types here; that would be fine.
487 * @todo Maybe we should. I don't know. shm::classic::Session_server ctor's pool cleanup point is only invoked,
488 * if that is the type of Session_server user chose at compile-time, so we are just following that example.
489 * Doing it this way strikes me as cleaner code, and the combination of a crash/abort and changed software
490 * "feels" fairly minor. */
491 if constexpr(Server_session_dtl_obj::Session_base_obj::S_MQS_ENABLED)
492 {
494 using Mq = typename Server_session_dtl_obj::Session_base_obj::Persistent_mq_handle_from_cfg;
495
496 util::remove_each_persistent_with_name_prefix<Blob_stream_mq_base<Mq>>
497 (get_logger(),
498 build_conventional_shared_name_prefix(Mq::S_RESOURCE_TYPE_ID,
500 }
501
502 /* We want to write to CNS (PID file) just below, but what is its location, and the name of its associated
503 * inter-process mutex, and for that matter the contents (formally, the Current Namespace; really the PID)?
504 * Well, this is slightly cheesy, arguably, but any Server_session we produce will need all those values,
505 * and can compute them by itself, and they'll always be the same in this process (app instance), so let's
506 * just make this short-lived dummy Server_session_dtl and get the stuff out of there. Code reuse = pretty good. */
507 const Server_session_dtl_obj empty_session(nullptr, m_srv_app_ref, Native_socket_stream());
508
509 Error_code our_err_code;
510
511 /* Owner/mode discussion:
512 * ipc::session operates in a certain model (design doc is elsewhere/outside our scope here to fully justify)
513 * that requires, for security/safety:
514 * - Owner has the specific UID:GID registered under Server_app. If we are who we are supposed to be,
515 * this will occur automatically as we create a resource. Namely we have two relevant resources in here:
516 * - CNS (PID) file. Some reasons this could fail: if file already existed
517 * *and* was created by someone else; if we have the proper UID but are also in some other group or something;
518 * and lastly Server_app misconfiguration. Mitigation: none. For one of those we could do an owner-change
519 * call to change the group, but for now let's say it's overkill, and actually doing so might hide
520 * a problem in the environment: let's not even risk that stuff.
521 * - Associated shared mutex (in Linux apparently a semaphore thingie). This is an interesting situation;
522 * it is not mentioned in the aforementioned design in detail -- too much of an impl detail for that --
523 * so let's consider it. Should the Server_app UID:GID apply to it too? Actually not quite: the way we use it,
524 * it's a symmetrically shared resource (read/write for both us and client), but more to the point
525 * the *client* is *allowed* to create it (hence OPEN_OR_CREATE both here and in Client_session_impl): it's
526 * accessed in order to get one's turn at accessing the CNS file, and to be "accessed" it must be created
527 * as needed (and then it'll keep existing until reboot). So actually the *GID* should be correct
528 * according to Server_app, but the UID can be, in fact, Server_app's -- or Client_app's, or *another*
529 * Client_app's entirely, or any number of them!
530 * - So should we check something about the mutex's UID/GID then?
531 * - It would probably be not bad to check for the GID. It is even conceivable to check the UID as that
532 * of one of the allowed `Client_app`s.
533 * - I (ygoldfel) feel it's overkill. I could be wrong, but it just feels insane: the mutex is only a way
534 * to access CNS file in orderly fashion without concurrency issues. Either it works, or it doesn't
535 * work; trying to sanity-check that the right UID/GID owns it is going beyond the spirit of the design:
536 * to ascertain that "certain model" of trust/safety/security. We already do that with CNS itself;
537 * we don't need to be paranoid about the-thing-that-is-needed-to-use-CNS.
538 * - The mode is as dictated by Server_app::m_permissions_level_for_client_apps. This we can and should
539 * ensure via a mode-set/change call. There are subtleties about how to do that, but they're discussed
540 * near the call sites below. As for now: We have two relevant resources in here, again:
541 * - CNS (PID) file. Yes, indeed, we set its permissions below.
542 * - Associated shared mutex. See above. So, actually, we set its permissions below too. That is actually
543 * (though unmentioned in the design) a pretty good idea for the "certain model" in the design:
544 * If, say, it's set to unrestricted access, then any user could just acquire the lock... and block IPC
545 * from proceeding, ever, for anyone else wishing to work with that Server_app. So, certainly,
546 * it should be locked down to the same extent as CNS itself. What's interesting about that is that
547 * a client, too, can create it (again, see above) -- and thus needs to set some kind of sensible
548 * permissions on creation (if applicable). And so it does... by checking Server_app [sic] as well and
549 * indeed setting that level of permissions. Does it make sense? Yes. Here are the scenarios specifically:
550 * - If the access level in Server_app is set to NO_ACCESS: Well then no one can access it. LoL. Next.
551 * - If UNRESTRICTED: Well then it's meant to be world-accessible! Yay! LoL again! Next.
552 * - If USER_ACCESS: Logically, for CNS to be accessible, if only the server UID is allowed access, then
553 * for it to interoperate with any clients, the clients must also be of that UID. So the client
554 * setting the mode to *its* UID should accomplish the same as if server had done it.
555 * - If GROUP_ACCESS (the approach mandated by the design, though we don't assume it, hence the
556 * Server_app configuration): Same logic but applied to GID.
557 * Anyway, here on the server it's simple; we should set-mode similarly to how we do for CNS.
558 * The bulk of the above long explanation is why the client does it. The comment in Client_session_impl
559 * points back to right here to avoid duplication.
560 *
561 * So let's do that stuff below. */
562
563 /* Ensure our effective user is as configured in Server_app. We do check this value on the CNS (PID) file
564 * anyway; but this is still a good check because:
565 * - We might not be creating the file ourselves (first guy to get to it since boot is).
566 * - It eliminates the need (well, strong word but anyway) to keep checking that for various other
567 * resources created down the line, whether it's MQs or SHM pools or whatever else.
568 * Doing that is possible but:
569 * - annoying: it requires either a descriptor for an ::fstat() or a rather-unportable file-system-path for
570 * anything but actual files;
571 * - super-annoying: it requires deep-inside -- possibly even eventually user-supplied -- modules like
572 * the SHM-jemalloc module to be aware of the desire to even do this in the first place on every shared
573 * resource.
574 *
575 * The idea is: checking it here up-front, plus checking it on the CNS (PID) file (from which all IPC naming
576 * and thus trust, per design, flows), is a nice way to take care of it ahead of all that. It's not perfect:
577 * the effective UID:GID can be changed at runtime. We don't need to be perfect though: the whole
578 * safety/security project, here, is not meant to be some kind of cryptographically powerful guarantee. */
579 const auto own_creds = Process_credentials::own_process_credentials();
580 if ((own_creds.user_id() != m_srv_app_ref.m_user_id) || (own_creds.group_id() != m_srv_app_ref.m_group_id))
581 {
582 FLOW_LOG_WARNING("Session acceptor [" << *this << "]: Creation underway. However, just before writing "
583 "CNS (Current Namespace Store), a/k/a PID file, we determined that "
584 "the user/group aspect of our effective credentials [" << own_creds << "] does not match "
585 "the hard-configured value passed to this ctor: "
586 "[" << m_srv_app_ref.m_user_id << ':' << m_srv_app_ref.m_group_id << "]. "
587 "We cannot proceed, as this would violate the security/safety model of ipc::session. "
588 "Emitting error.");
589 our_err_code = error::Code::S_RESOURCE_OWNER_UNEXPECTED;
590 }
591 else // if (own_creds match m_srv_app_ref.m_{user|group}_id)
592 {
593 const auto mutex_name = empty_session.base().cur_ns_store_mutex_absolute_name();
594 const auto mutex_perms = util::shared_resource_permissions(m_srv_app_ref.m_permissions_level_for_client_apps);
595 const auto cns_path = empty_session.base().cur_ns_store_absolute_path();
596 const auto cns_perms = util::PRODUCER_CONSUMER_RESOURCE_PERMISSIONS_LVL_MAP
597 [size_t(m_srv_app_ref.m_permissions_level_for_client_apps)];
598 {
599 ios_all_saver saver(*(get_logger()->this_thread_ostream())); // Revert std::oct/etc. soon.
600 FLOW_LOG_INFO("Session acceptor [" << *this << "]: Created. Writing CNS (Current Namespace Store), a/k/a PID "
601 "file [" << cns_path << "] (perms "
602 "[" << std::setfill('0')
603 << std::setw(4) // Subtlety: This resets right away after the perms are output...
604 << std::oct << cns_perms.get_permissions() << "], "
605 "shared-mutex name [" << mutex_name << "], shared-mutex perms "
606 "[" << std::setw(4) // ...hence gotta do this again.
607 << mutex_perms.get_permissions() << "]); "
608 "then listening for incoming master socket stream "
609 "connects (through Native_socket_stream_acceptor that was just cted) to address "
610 "based partially on the namespace (PID) written to that file.");
611 }
612
613 /* See Client_session_impl where it, too, creates this sh_mutex for notes equally applicable here.
614 * It reads the file we are about to write and locks the same inter-process mutex accordingly. */
615 Named_sh_mutex_ptr sh_mutex;
616 util::op_with_possible_bipc_exception(get_logger(), &our_err_code, error::Code::S_MUTEX_BIPC_MISC_LIBRARY_ERROR,
617 "Server_session_impl::ctor:named-mutex-open-or-create", [&]()
618 {
619 sh_mutex = make_unique<Named_sh_mutex>(util::OPEN_OR_CREATE, mutex_name.native_str(), mutex_perms);
620 /* Set the permissions as discussed in long comment above. --^
621 * Bonus: All bipc OPEN_OR_CREATE guys take care to ensure permissions are set regardless of umask,
622 * so no need for us to set_resource_permissions() here.
623 *
624 * As for ensuring ownership... skipping as discussed in long comment above. */
625 });
626
627 if (!our_err_code)
628 {
629 Sh_lock_guard sh_lock(*sh_mutex);
630
631 /* Only set permissions if we in fact create CNS (PID) file. Since we use mutex and trust mutex's other
632 * users, we can atomically-enough check whether we create it by pre-checking its existence. To pre-check
633 * its existence use fs::exists(). fs::exists() can yield an error, but we intentionally eat any error
634 * and treat it as-if file does not exist. Whatever issue it was, if any, should get detected via ofstream
635 * opening. Hence, if there's an error, we pre-assume we_created_cns==true, and let the chips fall where they
636 * may subsequently. (Note this is all a low-probability eventuality.) */
637 Error_code dummy; // Can't just use fs::exists(cns_path), as it might throw an exception (not what we want).
638 const bool we_created_cns = !fs::exists(cns_path, dummy);
639 ofstream cns_file(cns_path);
640 // Make it exist immediately (for the following check). @todo Might be unnecessary. At least it's harmless.
641 cns_file.flush();
642
643 /* Ensure owner is as configured. (Is this redundant, given that we checked UID:GID above? Yes and no:
644 * yes, if we created it; no, if we hadn't. So why not only check if (!we_created_cns)? Answer: paranoia,
645 * sanity checking. We're not gonna do this check down the line for this session (per earlier-explained
646 * decision), so might as well just sanity-check.
647 *
648 * Ideally we'd:
649 * - Pass in an fstream-or-FD-or-similar (we've opened the file after all, so there is one), not a name.
650 * - Use something in C++/C standard library or Boost, not an OS call.
651 *
652 * It's nice to want things. On the FD front we're somewhat screwed; there is a gcc-oriented hack to get the FD,
653 * but it involves protected access and non-standard stuff. Hence we must work from the `path` cns_path.
654 * We've created it, so it really should work, even if it's a little slower or what-not. As for using
655 * a nice library... pass the buck: */
656 ensure_resource_owner_is_app(get_logger(), cns_path, m_srv_app_ref, &our_err_code);
657 if ((!our_err_code) && we_created_cns)
658 {
659 /* The owner check passed; and we just created it. (At this stage fs::exists() having failed somehow is
660 * more-or-less impossible: ensure_resource_owner_is_app() would've failed if so.) So:
661 * Set mode. Again, no great way to get an FD, nor to use the fstream itself. So just: */
662 util::set_resource_permissions(get_logger(), cns_path, cns_perms, &our_err_code);
663 // If it failed, it logged.
664 } // if ((!our_err_code [from ensure_resource_owner_is_app()]) && we_created_cns)
665
666 if (!our_err_code)
667 {
668 cns_file << empty_session.base().srv_namespace().str() << '\n';
669
670 if (!cns_file.good())
671 {
672 const auto sys_err_code = our_err_code = Error_code(errno, system_category());
673 FLOW_LOG_WARNING("Session acceptor [" << *this << "]: Could not open or write CNS (PID) "
674 "file [" << cns_path << "]; system error details follow.");
675 FLOW_ERROR_SYS_ERROR_LOG_WARNING(); // Log based on sys_err_code.
676 }
677 // Close file, unlock mutex.
678 } // if (!our_err_code) (from ensure_resource_owner_is_app(), or from set_resource_permissions())
679 // else { It logged. }
680 } // if (!our_err_code) (mutex creation)
681 // else { It logged. }
682
683 sh_mutex.reset(); // Again, see note in Client_session_impl which does the same.
684 } // else if (own_creds match m_srv_app_ref.m_{user|group}_id) // Otherwise our_err_code is truthy.
685
686 if (our_err_code)
687 {
688 if (err_code)
689 {
690 *err_code = our_err_code;
691 return;
692 }
693 // else
694 throw Runtime_error(our_err_code, FLOW_UTIL_WHERE_AM_I_STR());
695 }
696 /* Got here: CNS (PID file) written fine. Now we can listen on the socket stream acceptor derived off that value:
697 * clients will read that file and thus know to connect to that. */
698
699 // This might throw; or emit failure to *err_code; or neither (success).
700 m_state->m_master_sock_acceptor
701 = make_unique<transport::Native_socket_stream_acceptor>
702 (get_logger(),
703 empty_session.base().session_master_socket_stream_acceptor_absolute_name(),
704 err_code);
705
706 // See class doc header. Start this very-idle thread for a bit of corner case work.
707 if ((!err_code) || (!*err_code)) // Null err_code + no exception thrown above also means success.
708 {
709 m_state->m_incomplete_session_graveyard->start();
710 }
711 // else { Might as well not start that thread. }
712} // Session_server_impl::Session_server_impl()
713
714TEMPLATE_SESSION_SERVER_IMPL
715CLASS_SESSION_SERVER_IMPL::~Session_server_impl()
716{
717 FLOW_LOG_INFO("Session acceptor [" << *this << "]: Shutting down. The Native_socket_stream_acceptor will now "
718 "shut down, and all outstanding Native_socket_stream_acceptor handlers and Server_session "
719 "handlers shall fire with operation-aborted error codes.");
720 /* We've written our internal async-op handlers in such a way as to get those operation-aborted handler
721 * invocations to automatically occur as our various m_* are destroyed just past this line.
722 * Namely:
723 * - m_master_sock_acceptor dtor runs: fires our handler with its operation-aborted code; we translate it
724 * into the expected operation-aborted code; cool. This occurs with any such pending handlers.
725 * - m_incomplete_sessions dtor runs: Each Server_session_dtl_obj dtor runs: Any pending async_accept_log_in()
726 * emits the expected operation-aborted code; cool.
727 *
728 * Additionally via sub_class_set_deinit_func() we allow for certain final de-init code to be executed, once
729 * all state has been destroyed (including what we just mentioned). Hence force this to occur now: */
730 m_state.reset();
731
732 // Last thing! See sub_class_set_deinit_func() doc header which will eventually lead you to a rationale comment.
733 if (!m_deinit_func_or_empty.empty())
734 {
735 FLOW_LOG_INFO("Session acceptor [" << *this << "]: Continuing shutdown. A sub-class desires final de-init "
736 "work, once everything else is stopped (might be persistent resource cleanup); invoking "
737 "synchronously.");
738 m_deinit_func_or_empty();
739 FLOW_LOG_TRACE("De-init work finished.");
740 }
741} // Session_server_impl::~Session_server_impl()
742
743TEMPLATE_SESSION_SERVER_IMPL
744template<typename Task_err,
745 typename N_init_channels_by_srv_req_func, typename Mdt_load_func>
746void CLASS_SESSION_SERVER_IMPL::async_accept(Server_session_obj* target_session,
747 Channels* init_channels_by_srv_req,
748 Mdt_reader_ptr* mdt_from_cli_or_null,
749 Channels* init_channels_by_cli_req,
750 N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
751 Mdt_load_func&& mdt_load_func,
752 Task_err&& on_done_handler)
753{
754 using util::String_view;
755 using flow::async::Task_asio_err;
756 using boost::make_shared;
757 using std::to_string;
758 using std::string;
759
760 assert(target_session);
761 assert(m_state->m_master_sock_acceptor && "By contract do not invoke async_accept() if ctor failed.");
762
763 /* We are in thread U or Wa or one of the Ws. U is obviously fine/mainstream. Ws means they invoked us
764 * directly from our own completion handler again; it is fine, since a Ws is started per async_accept();
765 * does not interact with other Ws. Wa means they invoked us directly from our own completion handler
766 * again, albeit only on error would that be from Wa; anyway Native_socket_stream_acceptor allows it. */
767
768 Task_asio_err on_done_func(std::move(on_done_handler));
769 auto sock_stm = make_shared<transport::sync_io::Native_socket_stream>(); // Empty target socket stream.
770 const auto sock_stm_raw = sock_stm.get();
771
772 FLOW_LOG_INFO("Session acceptor [" << *this << "]: Async-accept request: Immediately issuing socket stream "
773 "acceptor async-accept as step 1. If that async-succeeds, we will complete the login asynchronously; "
774 "if that succeeds we will finally emit the ready-to-go session to user via handler.");
775 m_state->m_master_sock_acceptor->async_accept
776 (sock_stm_raw,
777 [this, target_session, init_channels_by_srv_req, mdt_from_cli_or_null,
778 init_channels_by_cli_req,
779 n_init_channels_by_srv_req_func = std::move(n_init_channels_by_srv_req_func),
780 mdt_load_func = std::move(mdt_load_func),
781 sock_stm = std::move(sock_stm),
782 on_done_func = std::move(on_done_func)]
783 (const Error_code& err_code) mutable
784 {
785 // We are in thread Wa (unspecified; really N_s_s_acceptor worker thread).
786
787 if (err_code)
788 {
789 /* Couple cases to consider. 1, could've failed with a true (probably system) error.
790 * 2, it could've failed with operation-aborted which can only occur if m_master_sock_acceptor is destroyed
791 * during the async_accept(), which in turn can only happen if *this is destroyed during it.
792 *
793 * 1 is straightforward; just report the error through user handler, and that's that. As for 2:
794 *
795 * Since we've decided to (as explained in Implementation section of our class doc header) fully piggy-back
796 * all work, including handlers, on our helper objects' (m_master_sock_acceptor, or in step 2 the
797 * Server_session itself) worker threads, we can just report operation-aborted through user handler as well.
798 * Our dtor doesn't then need to worry about (having tracked un-fired user handlers) firing un-fired handlers:
799 * we will just do so by piggy-backing on m_master_sock_acceptor doing so. Code is much simpler then.
800 *
801 * Subtlety: avoid even logging. Logging is safe for 1, but it's not that high in value. I believe
802 * logging is safe in 2, because m_master_sock_acceptor dtor will only return once it has fired the
803 * handler (with operation-aborted), which happens before Log_context super-class is destroyed... but
804 * generally we avoid extra code on operation-aborted everywhere in Flow-IPC and `flow` (that I (ygoldfel)
805 * control anyway), so let's just keep to that. @todo Maybe re-add TRACE logging when not operation-aborted.
806 *
807 * If you change this, please consider synchronizing with the async_accept_log_in() handler below.
808 *
809 * Subtlety: transport:: and session:: have separate operation-aborted codes, and we promised the latter so: */
810 on_done_func((err_code == transport::error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER)
811 ? error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER : err_code);
812 return;
813 } // if (err_code)
814 // else: No problem at this async-stage.
815
816 FLOW_LOG_INFO("Session acceptor [" << *this << "]: Async-accept request: Socket stream "
817 "acceptor async-accept succeeded resulting in socket stream [" << *sock_stm << "]; that was step "
818 "1. Now we create the server session and have it undergo async-login; if that succeeds we will "
819 "finally emit the ready-to-go session to user via handler.");
820
821 Incomplete_session incomplete_session
822 = make_shared<Server_session_dtl_obj>(get_logger(),
823 m_srv_app_ref, // Not copied!
824 // Gobble up the stream obj into the channel in session:
825 std::move(*sock_stm));
826 /* It's important that we save this in *this, so that if *this is destroyed, then *incomplete_session is
827 * destroyed, so that it triggers the handler below with operation-aborted, so that we pass that on to
828 * the user handler as promised. However it is therefore essential that we don't also capture
829 * incomplete_session (the shared_ptr!) in our handler lambda but rather only the observer (weak_ptr) of it!
830 * Otherwise we create a cycle and a memory leak. */
831 {
832 Lock_guard incomplete_sessions_lock(m_state->m_mutex);
833 m_state->m_incomplete_sessions.insert(incomplete_session);
834 }
835
836 // This little hook is required by Server_session_dtl::async_accept_log_in() for Client_app* lookup and related.
837 auto cli_app_lookup_func = [this](String_view cli_app_name) -> const Client_app*
838 {
839 const auto cli_it = m_cli_app_master_set_ref.find(string(cli_app_name));
840 return (cli_it == m_cli_app_master_set_ref.end()) ? static_cast<const Client_app*>(0) : &cli_it->second;
841 };
842
843 /* And this one is required to issue a unique cli-namespace, if all goes well. The "if all goes well" part
844 * is why it's a hook and not just pre-computed and passed-into async_accept_log_in() as an arg. */
845 auto cli_namespace_func = [this]() -> Shared_name
846 {
847 return Shared_name::ct(to_string(++m_state->m_last_cli_namespace));
848 };
849
850 /* Lastly, we are to execute this upon knowing the Client_app and before the log-in response is sent to
851 * opposing Client_session. */
852 auto pre_rsp_setup_func = [this,
853 incomplete_session_observer = Incomplete_session_observer(incomplete_session)]
854 () -> Error_code
855 {
856 // We are in thread Ws (unspecified; really Server_session worker thread).
857
858 // (Could have captured incomplete_session itself, but I'd rather assert() than leak.)
859 auto incomplete_session = incomplete_session_observer.lock();
860 assert(incomplete_session
861 && "The Server_session_dtl_obj cannot be dead (dtor ran), if it's invoking its async handlers OK "
862 "(and thus calling us in its async_accept_log_in() success path).");
863 // If it's not dead, we're not dead either (as us dying is the only reason incomplete_session would die).
864
865 const auto cli_app_ptr = incomplete_session->base().cli_app_ptr();
866 assert(cli_app_ptr && "async_accept_log_in() contract is to call pre_rsp_setup_func() once all "
867 "the basic elements of Session_base are known (including Client_app&).");
868
869 return m_per_app_setup_func(*cli_app_ptr);
870 }; // auto pre_rsp_setup_func =
871
872 /* @todo It occurs to me (ygoldfel) now that I look at it: all these args -- even on_done_func --
873 * could instead be passed into Server_session_dtl ctor; that guy could save them into m_* (as of this writing
874 * it passes them around via lambdas). After all, as of now, async_accept_log_in() is non-retriable.
875 * Pros: much less lambda-capture boiler-plate in there.
876 * Cons: (1) added state can leak (they'd need to worry about maybe clearing that stuff on entry to almost-PEER
877 * state, and possibly on failure); (2) arguably less maintainable (maybe Server_session_dtl may want to
878 * make async_accept() re-triable a-la Client_session_impl::async_connect()?).
879 *
880 * I, personally, have a big bias against non-const m_* state (where it can be avoided reasonably), so I
881 * like that this stuff only stays around via captures, until the relevant async-op is finished. So I prefer
882 * it this way. Still: arguable.
883 *
884 * Anyway. Go! */
885
886 incomplete_session->async_accept_log_in
887 (this,
888 init_channels_by_srv_req, // Our async out-arg for them to set on success (before on_done_func(Error_code())).
889 mdt_from_cli_or_null, // Ditto.
890 init_channels_by_cli_req, // Ditto.
891
892 std::move(cli_app_lookup_func), // Look up Client_app by name and give it to them!
893 std::move(cli_namespace_func), // Generate new per-session namespace and give it to them!
894 std::move(pre_rsp_setup_func), // Set up stuff necessary for this Client_app, especially if 1st occurrence!
895
896 std::move(n_init_channels_by_srv_req_func), // How many channels does our caller want to set up?
897 std::move(mdt_load_func), // Let caller fill out srv->cli metadata!
898
899 [this, incomplete_session_observer = Incomplete_session_observer(incomplete_session),
900 target_session, on_done_func = std::move(on_done_func)]
901 (const Error_code& async_err_code)
902 {
903 // We are in thread Ws (unspecified; really Server_session worker thread).
904
905 auto incomplete_session = incomplete_session_observer.lock();
906 if (incomplete_session)
907 {
908 /* No matter what -- the session is no longer incomplete; either it accepted log-in OK or not.
909 * Remove it from *this. We'll either forget it (error) or give it to user (otherwise). Anyway remove it. */
910 {
911 Lock_guard incomplete_sessions_lock(m_state->m_mutex);
912#ifndef NDEBUG
913 const bool erased_ok = 1 ==
914#endif
915 m_state->m_incomplete_sessions.erase(incomplete_session);
916 assert(erased_ok && "Who else would have erased it?!");
917 }
918
919 if (async_err_code)
920 {
921 /* See class doc header. We are in thread Ws; letting incomplete_session (the shared_ptr) be destroyed
922 * here (below, in the `if (async_err_code)` clause) would cause it to try
923 * to join thread Ws which would deadlock; we'd be breaking the contract to
924 * never destroy Server_session from its own handler. So hand it off to this very-idle thread to do it
925 * asynchronously. */
926 m_state->m_incomplete_session_graveyard->post([incomplete_session = std::move(incomplete_session)]
927 () mutable
928 {
929 // That's that. ~Server_session() will run here:
930 incomplete_session.reset(); // Just in case compiler wants to warn about unused-var or optimize it away...?
931 });
932 // incomplete_session (the shared_ptr) is hosed at this point.
933 assert((!incomplete_session) && "It should've been nullified by being moved-from into the captures.");
934 } // if (async_err_code)
935 } // if (incomplete_session) (but it may have been nullified inside <=> async_err_code is truthy)
936 else // if (!incomplete_session)
937 {
938 /* Server_session_dtl disappeared under us, because *this is disappearing under us.
939 * Naturally no need to remove it from any m_incomplete_sessions, since that's not a thing.
940 * However this sanity check is worthwhile: */
941 assert((async_err_code == error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER)
942 && "The incomplete-session Server_session_dtl can only disappear under us if *this is destroyed "
943 "which can only occur <=> operation-aborted is emitted due to *this destruction destroying that "
944 "incomplete-session.");
945 } // else if (!incomplete_session)
946
947 if (async_err_code)
948 {
949 /* As in the acceptor failure case above just forward it to handler. All the same comments apply,
950 * except there is no subtlety about operation-aborted coming from outside ipc::session.
951 * If you change this, please consider synchronizing with the async_accept() handler above. */
952 on_done_func(async_err_code);
953 return;
954 }
955 // else if (!async_err_code)
956 assert(incomplete_session);
957
958 // Yay! Give it to the user.
959 FLOW_LOG_INFO("Session acceptor [" << *this << "]: Async-accept request: Successfully resulted in logged-in "
960 "server session [" << *incomplete_session << "]. Feeding to user via callback.");
961
962 /* Server_session_dtl *is* Server_session, just with an added public API; now we reinterpret the pointer
963 * and give the user the Server_session without that public API which accesses its internals. */
964 *target_session = std::move(*(static_cast<Server_session_obj*>(incomplete_session.get())));
965 // *incomplete_session is now as-if default-cted.
966
967 on_done_func(async_err_code); // Pass in Error_code().
968 FLOW_LOG_TRACE("Handler finished.");
969 }); // incomplete_session->async_accept_log_in()
970 }); // m_master_sock_acceptor->async_accept()
971} // Session_server_impl::async_accept()
972
973TEMPLATE_SESSION_SERVER_IMPL
974void CLASS_SESSION_SERVER_IMPL::to_ostream(std::ostream* os) const
975{
976 *os << '[' << m_srv_app_ref << "]@" << static_cast<const void*>(this);
977}
978
979TEMPLATE_SESSION_SERVER_IMPL
980typename CLASS_SESSION_SERVER_IMPL::Session_server_obj* CLASS_SESSION_SERVER_IMPL::this_session_srv()
981{
982 return m_this_session_srv;
983}
984
985TEMPLATE_SESSION_SERVER_IMPL
986template<typename Task>
987void CLASS_SESSION_SERVER_IMPL::sub_class_set_deinit_func(Task&& task)
988{
989 m_deinit_func_or_empty = std::move(task);
990}
991
992TEMPLATE_SESSION_SERVER_IMPL
993std::ostream& operator<<(std::ostream& os,
994 const CLASS_SESSION_SERVER_IMPL& val)
995{
996 val.to_ostream(&os);
997 return os;
998}
999
1000#undef CLASS_SESSION_SERVER_IMPL
1001#undef TEMPLATE_SESSION_SERVER_IMPL
1002
1003} // namespace ipc::session
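The comment in async_accept() about saving each incomplete session in `*this` while capturing only a weak_ptr observer in its handler can be illustrated in isolation. The following is a minimal sketch, not the Flow-IPC implementation: `Toy_session`, `Toy_server`, `add_session()`, and `session_dies_with_owner()` are hypothetical names invented for this illustration, and it uses `std::shared_ptr`/`std::weak_ptr` where the listing above uses the Boost equivalents.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <unordered_set>
#include <utility>

// Hypothetical stand-in for a session object that stores its own completion handler.
struct Toy_session
{
  std::function<bool ()> m_handler;
};

// Hypothetical stand-in for the owner (the role *this plays in async_accept()).
struct Toy_server
{
  std::unordered_set<std::shared_ptr<Toy_session>> m_incomplete_sessions;

  // Creates a session whose handler observes it via weak_ptr only; returns that observer.
  std::weak_ptr<Toy_session> add_session()
  {
    auto session = std::make_shared<Toy_session>();
    std::weak_ptr<Toy_session> observer(session);
    // Capturing `session` (the shared_ptr) here instead would make the session own itself: a cycle.
    session->m_handler = [observer]() -> bool { return !observer.expired(); };
    m_incomplete_sessions.insert(std::move(session));
    return observer;
  }
};

// Returns true if destroying the owner also destroyed the session (that is, no leak).
bool session_dies_with_owner()
{
  std::weak_ptr<Toy_session> observer;
  {
    Toy_server server;
    observer = server.add_session();
    assert(!observer.expired()); // Session is alive for as long as the owner holds it.
  } // Owner destroyed here; its set releases the only shared_ptr to the session.
  return observer.expired();
}
```

In this sketch the owner's set holds the only strong reference, so the session is destroyed together with the owner. Had the lambda captured the shared_ptr, the session would have kept itself alive after the owner was gone.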