Flow-IPC 2.0.0
Flow-IPC project: Full implementation reference.
session_server_impl.hpp
/* Flow-IPC: Sessions
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once
20
#include "ipc/session/error.hpp"
#include "ipc/session/app.hpp"
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/move/make_unique.hpp>

namespace ipc::session
{

// Types.
37
/**
 * Internal class template comprising API/logic common to every Session_server variant, meant to be
 * `private`ly sub-classed and largely forwarded. In particular the vanilla Session_server
 * (see its short Implementation doc header section) sub-classes us and makes no use of the available
 * customization points. Contrast with, e.g., shm::classic::Session_server which uses customization
 * points for shm::classic SHM-arena setup.
 *
 * The available customization points are as follows.
 * - Per-session: `*this` is parameterized on #Server_session_obj. The vanilla value is `Server_session<...>`;
 *   but to add capabilities sub-class Server_session_impl as explained in its doc header and proceed from
 *   there. For example see shm::classic::Server_session. Server_session_impl has its own customization point(s).
 *   - No-customization = specify #Server_session here.
 * - Cross-session: Additional per-Client_app setup (such as setting up cross-session SHM arena(s) in
 *   on-demand fashion) can be specified by passing in `per_app_setup_func` to ctor.
 *   - No-customization = specify a do-nothing function here that returns `Error_code()` always.
 * - Cross-session: Certain custom code can be caused to run as the last thing in the Server_session_impl dtor,
 *   even after all its state has been destroyed (e.g., all threads have been stopped/joined).
 *   A sub-class may use this customization point by calling sub_class_set_deinit_func(). If not called, the
 *   customization point is unused by default.
 *
 * ### Implementation design ###
 * Generally the impl should be reasonably easy to follow by reading the method bodies, at least if one understands
 * the general ipc::session client-server paradigm.
 *
 * The thing to understand strategically, as usual, is the thread design. This is an acceptor, so one might
 * have expected it to be implemented similarly to, say, Native_socket_stream_acceptor -- maintaining a thread W
 * in which to do some/most work, including maintaining a "deficit" queue of outstanding async_accept() requests
 * and a "surplus" queue of ready `Server_session`s to emit. This was an option, but I (ygoldfel) felt that
 * piggy-backing handling of events directly onto the unspecified handler-invoking threads of the internally
 * used objects would produce a much simpler data structure and state machine. (As a side effect, the behavior
 * described in the "FIFO" section above occurs. Also as a side effect, the error-emission behavior described
 * in "Error handling" above occurs. Basically: each async_accept()'s internal handling is independent of the
 * others. They share (almost) no state; there is no single async-chain and no explicit queues, unlike in various
 * other boost.asio-like classes in ::ipc. Each async_accept() triggers async op 1, the handler for which triggers
 * async op 2, the handler for which emits the result to the user. This is not only
 * simpler to implement, but it also results in sensible API contract behavior, I feel.)
 *
 * Here is how it works.
 *
 * For each potential `Server_session` -- i.e., for each async_accept() -- there are 2 steps that must occur
 * asynchronously before one is ready to emit to the user-supplied handler:
 * - Our stored Native_socket_stream_acceptor, where we invoke
 *   transport::Native_socket_stream_acceptor::async_accept(), must emit a PEER-state (connected)
 *   transport::sync_io::Native_socket_stream (the opposing peer object living inside the opposing Client_session).
 *   - Now a Server_session_dtl may be constructed (not yet emitted to user) and then
 *     Server_session_dtl::async_accept_log_in() is invoked.
 *   - Or if the socket-accept failed, then we can emit that to the user already; done.
 * - That Server_session_dtl::async_accept_log_in() must complete the async log-in exchange against the
 *   opposing Client_session.
 *   - Now the Server_session_dtl can be converted via pointer `static_cast<>` to `Server_session` and
 *     emitted to the user.
 *   - Or if the log-in fails at some stage, then we can emit that error to the user.
 *
 * Nomenclature: We call the `N_s_s_a::async_accept()` handler-invoking thread: Wa. It is officially an
 * unspecified thread or threads, but, by contract, handlers are executed non-concurrently, so it can be
 * considered one thread (and actually it is as of this writing). We call the
 * Server_session::async_accept_log_in() handler-invoking thread: Ws. This is really a separate thread for
 * each `Server_session`, so in fact 2 different async-accept requests can cause 2 handlers to invoke
 * simultaneously.
 *
 * So each time one calls async_accept() from thread U (i.e., user thread(s) from which they must never invoke
 * mutating stuff concurrently), that kicks off transport::Native_socket_stream_acceptor::async_accept(),
 * which fires its handler in thread Wa; we then kick off `Server_session::async_accept_log_in()`, which fires
 * its handler in thread Ws, where we finalize the `Server_session` and emit it to the user. There is no
 * cross-posting to some other worker thread W (but read on).
 *
 * The obvious question is: do the handlers, some of which (as noted) may run concurrently to each other
 * (request 1's Ws handler can co-execute with request 2's Wa handler; and request 2's Wa handler can co-execute
 * with request 3's Wa handler), mess each other over by mutatingly accessing common data? Let's consider the
 * state involved.
 *
 * For each async-accept request, the amassed data are independent from any other's; they are passed around
 * throughout the 2 async ops per request via lambda captures. There is, however, one caveat to this:
 * Suppose `S->async_accept_log_in(F)` is invoked on a not-yet-ready (incomplete) `Server_session* S`; suppose F
 * is invoked with a truthy #Error_code (it failed). We are now sitting in thread Ws: and S should be destroyed.
 * But invoking the dtor of S from within S's own handler is documented to be not-okay and results in
 * a deadlock/infinite dtor execution or, if the system can detect it, at best an abort due to a thread trying to
 * join itself. So:
 * - We maintain State::m_incomplete_sessions storing each such outstanding S. If the dtor runs, then each S will
 *   be auto-destroyed, which will automatically invoke the user handler with operation-aborted.
 * - If an incomplete (outstanding) S successfully completes log-in, we remove it from
 *   State::m_incomplete_sessions and emit it to the user via the handler.
 * - If it completes log-in with failure, we remove it from State::m_incomplete_sessions and then:
 *   - hand it off to a mostly-idle separate thread, State::m_incomplete_session_graveyard, which can run S's dtor
 *     in peace without deadlocking anyone. (If the `*this` dtor runs before then, the S dtors will still run, as
 *     each queued lambda's captures are destroyed.)
 *   - This is the part that avoids the deadlock. The other things above are orthogonally needed for the promised
 *     boost.asio-like semantics, where the handler must run exactly once eventually, from the dtor at the latest.
 *
 * However note that State::m_incomplete_sessions is added-to in thread Wa but removed-from in various threads Ws.
 * Therefore it is protected by a mutex; simple enough.
 *
 * @todo Session_server, probably in ctor or similar, should -- for safety -- enforce the accuracy
 * of Server_app attributes including App::m_exec_path, App::m_user_id, App::m_group_id. As of this writing
 * it enforces these things about each *opposing* Client_app and process -- so for sanity it can/should do so
 * about itself, before the sessions can begin.
 *
 * @tparam Server_session_t
 *         See #Server_session_obj. Its API must exactly equal (or be a superset of) that of the vanilla
 *         #Server_session. (Its impl may perform extra steps; for example `async_accept_log_in()` might set up
 *         a per-session SHM arena.)
 * @tparam Session_server_t
 *         The class that is in fact `private`ly sub-classing us.
 *         This is necessary for this_session_srv(). See its doc header for discussion.
 */
template<typename Session_server_t, typename Server_session_t>
class Session_server_impl :
  public flow::log::Log_context,
  private boost::noncopyable
{
public:
  // Types.

  /// See this_session_srv().
  using Session_server_obj = Session_server_t;

  /// Useful short-hand for the concrete `Server_session` type emitted by async_accept().
  using Server_session_obj = Server_session_t;

  /// Short-hand for Session_mv::Mdt_reader_ptr.
  using Mdt_reader_ptr = typename Server_session_obj::Mdt_reader_ptr;

  /// Short-hand for Session_mv::Channels.
  using Channels = typename Server_session_obj::Channels;
161
  // Constructors/destructor.

  /**
   * See Session_server ctor; it does that. In addition it takes and memorizes a functor that: takes a
   * Client_app const ref identifying the app that wants to open the session; performs unspecified synchronous
   * steps; and returns an #Error_code indicating success, or the reason for failure, which dooms that
   * async_accept().
   *
   * ### Rationale for `per_app_setup_func` ###
   * It is not intended for per-session setup. #Server_session_dtl_obj should take care of that where it makes
   * sense -- it does, after all, represent the individual budding session peer. However our sub-class
   * (e.g., shm::classic::Session_server) may need to keep track of per-distinct-Client_app resources
   * (e.g., the per-app-scope SHM arena) which must exist before the opposing Client_session-type object
   * completes its setup (e.g., by opening the aforementioned per-Client_app/multi-instance-scope SHM arena).
   * It can detect that a new Client_app is logging-in and set that up in the nick of time.
   *
   * @tparam Per_app_setup_func
   *         See above. Signature: `Error_code F(const Client_app&)`.
   * @param logger_ptr
   *        See Session_server ctor.
   * @param srv_app_ref
   *        See Session_server ctor.
   * @param cli_app_master_set_ref
   *        See Session_server ctor.
   * @param err_code
   *        See Session_server ctor. Additional #Error_code generated:
   *        see `per_app_setup_func`.
   * @param per_app_setup_func
   *        See above.
   * @param this_session_srv_arg
   *        The object that is, in fact, `private`ly sub-classing `*this` (and calling this ctor).
   *        See this_session_srv(). The value is only saved but not dereferenced inside the ctor.
   */
  template<typename Per_app_setup_func>
  explicit Session_server_impl(flow::log::Logger* logger_ptr,
                               Session_server_obj* this_session_srv_arg,
                               const Server_app& srv_app_ref,
                               const Client_app::Master_set& cli_app_master_set_ref,
                               Error_code* err_code, Per_app_setup_func&& per_app_setup_func);
202
  /// See Session_server dtor.
  ~Session_server_impl();

  // Methods.

  /**
   * See Session_server method. In addition: invokes `per_app_setup_func()` (from ctor)
   * once the connecting Client_app becomes known; if that returns a truthy #Error_code, then this method
   * emits that error.
   *
   * @tparam Task_err
   *         See Session_server method.
   * @tparam N_init_channels_by_srv_req_func
   *         See Session_server method.
   * @tparam Mdt_load_func
   *         See Session_server method.
   * @param target_session
   *        See Session_server method.
   * @param init_channels_by_srv_req
   *        See Session_server method.
   * @param mdt_from_cli_or_null
   *        See Session_server method.
   * @param init_channels_by_cli_req
   *        See Session_server method.
   * @param n_init_channels_by_srv_req_func
   *        See Session_server method.
   * @param mdt_load_func
   *        See Session_server method.
   * @param on_done_func
   *        See Session_server method.
   */
  template<typename Task_err,
           typename N_init_channels_by_srv_req_func, typename Mdt_load_func>
  void async_accept(Server_session_obj* target_session,
                    Channels* init_channels_by_srv_req,
                    Mdt_reader_ptr* mdt_from_cli_or_null,
                    Channels* init_channels_by_cli_req,
                    N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
                    Mdt_load_func&& mdt_load_func,
                    Task_err&& on_done_func);

  /**
   * See Session_server method.
   *
   * @param os
   *        See Session_server method.
   */
  void to_ostream(std::ostream* os) const;
251
  /**
   * Returns pointer to the object that is `private`ly sub-classing us. In other words this equals
   * `static_cast<const Session_server_obj*>(this)`, where this class is the base of #Session_server_obj,
   * but up-casting from a `private` base is not allowed.
   *
   * ### Rationale ###
   * I (ygoldfel) acknowledge this is somewhat odd. Why should a sub-class, per se, care or know about its
   * super-class? This at least vaguely indicates some sort of design mistake. In fact this is needed, as of
   * this writing, because shm::classic::Server_session_impl::async_accept_log_in() gets a Session_server_impl ptr,
   * which it knows points to an object that's really the core of a shm::classic::Session_server, and it needs to
   * interact with the SHM-classic-specific aspect of that guy's API. So it calls this accessor here, essentially
   * as a way to up-cast from a `private` base (which is not allowed by C++).
   * Why can't it "just" take a `shm::classic::Session_server*` then? Answer: because Session_server_impl uses
   * non-`virtual` polymorphism to invoke `async_accept_log_in()` regardless of which object it's calling it on;
   * this is hard to summarize in words, but it makes sense if one reads the relevant code. Eventually.
   * Bottom line is, this way, it can just pass-in `this`, and then
   * shm::classic::Server_session_impl::async_accept_log_in() can call this_session_srv() to get at the sub-class
   * version of `this`.
   *
   * I feel it is not criminal -- internal things are working together in a way that they logically intend to --
   * but intuitively it feels like there's a smoother way to design it. Probably.
   *
   * @todo Reconsider the details of how classes in the non-`virtual` hierarchies
   * `Session_server`, `Server_session`, `Session_server_impl`, `Server_session_impl` cooperate internally,
   * as there is some funky stuff going on, particularly Session_server_impl::this_session_srv().
   *
   * @return See above.
   */
  Session_server_obj* this_session_srv() const;

  // Data.

  /// See Session_server public data member.
  const Server_app& m_srv_app_ref;

protected:
  // Methods.

  /**
   * Utility for sub-classes: ensures that `task()` is invoked near the end of `*this` dtor's execution, after *all*
   * other (mutable) state has been destroyed, including stopping/joining any threads performing
   * async_accept() ops. It may be invoked at most once.
   *
   * The value it adds: A long story best told by specific example. See the original use case, which is
   * in shm::classic::Session_server; it sets up certain SHM cleanup steps to occur
   * when the session-server is destroyed.
   *
   * ### Watch out! ###
   * At the time `task()` runs, the calling instance of the sub-class has been destroyed -- thus it is, e.g.,
   * usually wrong to capture your `this` in the `task` lambda, except for logging.
   * `get_logger()` and `get_log_component()` (which are in this super-class) are still okay to use.
   *
   * @tparam Task
   *         Function object invoked as `void` with no args.
   * @param task
   *        `task()` shall execute before dtor returns.
   */
  template<typename Task>
  void sub_class_set_deinit_func(Task&& task);

private:
  // Types.

  /**
   * Short-hand for the concrete Server_session_dtl type, which each async_accept() creates internally, completes
   * the log-in process upon, and then up-casts to #Server_session_obj to emit to the user via move-assignment.
   * #Server_session_dtl_obj is equal to #Server_session_obj -- it adds no data -- but exposes certain
   * internally invoked APIs that the user shall not access.
   */
  using Server_session_dtl_obj = Server_session_dtl<Server_session_obj>;

  /**
   * Internally used ref-counted handle to a #Server_session_dtl_obj, suitable for capturing in and passing
   * around lambdas.
   *
   * ### Rationale ###
   * It is `shared_ptr`, not `unique_ptr`, for two reasons. Primarily, it is so that it can be observed
   * via #Incomplete_session_observer, to avoid the leak that would result from capturing an #Incomplete_session
   * in a lambda passed-to an async op on that #Incomplete_session *itself*. A `unique_ptr` cannot be observed
   * via `weak_ptr`; a `shared_ptr` can.
   *
   * Secondarily, a `unique_ptr` cannot be captured in a lambda in the first place.
   */
  using Incomplete_session = boost::shared_ptr<Server_session_dtl_obj>;

  /**
   * `weak_ptr` observer of an #Incomplete_session. Capturing this, instead of the #Incomplete_session itself,
   * allows for the underlying #Incomplete_session to be destroyed while the lambda still exists.
   */
  using Incomplete_session_observer = boost::weak_ptr<Server_session_dtl_obj>;

  /// Short-hand for a set of #Incomplete_session, with fast insertion and removal by key #Incomplete_session itself.
  using Incomplete_sessions = boost::unordered_set<Incomplete_session>;

  /// Short-hand for State::m_mutex type.
  using Mutex = flow::util::Mutex_non_recursive;

  /// Short-hand for #Mutex lock.
  using Lock_guard = flow::util::Lock_guard<Mutex>;
351
  /**
   * All internal mutable state of Session_server_impl. It's grouped into this inner `struct`, so that we can
   * destroy it in the destructor via an `optional::reset()`, letting items be destroyed automatically in
   * the opposite order in which they were constructed -- and *then* perform certain final steps when
   * that state (including various threads running) is no longer a factor. See Session_server_impl dtor
   * and sub_class_set_deinit_func().
   */
  struct State
  {
    /// The ID used in generating the last Server_session::cli_namespace(); so the next one = this plus 1. 0 initially.
    std::atomic<uint64_t> m_last_cli_namespace;

    /**
     * The set of all #Incomplete_session objects such that each one comes from a distinct async_accept() request
     * that (1) has accepted a transport::sync_io::Native_socket_stream connection and thus created a
     * #Server_session_dtl_obj but (2) whose Server_session_dtl::async_accept_log_in() has not yet completed (fired
     * its handler).
     *
     * Protected by #m_mutex; accessed from thread Wa and threads Ws. See class doc header impl section for
     * discussion of the thread design.
     *
     * ### Impl design ###
     * Once a Server_session_dtl is created by `*this`, we do `async_accept_log_in()` on it. We cannot capture that
     * Server_session_dtl itself into its handler's lambda, as that would create a cycle leak wherein
     * the object can never be destroyed. (This is typical when maintaining a boost.asio-style I/O object by
     * a `shared_ptr` handle.) So we capture an #Incomplete_session_observer (`weak_ptr`) thereof but store
     * the Server_session_dtl (a/k/a #Incomplete_session) here in `*this`, as of course it must be stored somewhere.
     * Then if the `*this` dtor is invoked before the aforementioned `async_accept_log_in()` handler fires, the
     * handler shall fire with operation-aborted as desired.
     *
     * ### Ordering caveat ###
     * As of this writing, since the dtor auto-destroys the various members as opposed to any manual ordering
     * thereof: This must be declared before #m_master_sock_acceptor. Then when the dtor runs, first the latter's
     * thread Wa shall be joined, as it is destroyed first; then #m_incomplete_sessions shall be destroyed next.
     * Thus there is zero danger of concurrent access to #m_incomplete_sessions. Obviously the dtor's
     * auto-destruction of #m_incomplete_sessions is not protected by any lock.
     *
     * We could instead manually destroy stuff in the proper order. I (ygoldfel) do like to just rely on
     * auto-destruction (in order opposite to declaration/initialization) to keep things clean.
     */
    Incomplete_sessions m_incomplete_sessions;

    /// transport::Native_socket_stream acceptor available throughout `*this` to accept initial please-open-session requests.
    boost::movelib::unique_ptr<transport::Native_socket_stream_acceptor> m_master_sock_acceptor;

    /// Protects `m_incomplete_sessions`. See class doc header impl section for discussion of the thread design.
    mutable Mutex m_mutex;

    /**
     * Mostly-idle thread that solely destroys objects removed from `m_incomplete_sessions` in the case where
     * Server_session_dtl::async_accept_log_in() failed as opposed to succeeded (in which case it is emitted
     * to the user).
     */
    boost::movelib::unique_ptr<flow::async::Single_thread_task_loop> m_incomplete_session_graveyard;
  }; // struct State
406
  // Data.

  /// See this_session_srv().
  Session_server_obj* const m_this_session_srv;

  /// See ctor.
  const Client_app::Master_set& m_cli_app_master_set_ref;

  /// See ctor.
  const Function<Error_code (const Client_app& client_app)> m_per_app_setup_func;

  /// See State.
  std::optional<State> m_state;

  /// See sub_class_set_deinit_func(). `.empty()` unless that was called at least once.
  Function<void ()> m_deinit_func_or_empty;
}; // class Session_server_impl
424
// Free functions: in *_fwd.hpp.

// Template implementations.

/// Internally used macro; public API users should disregard (same deal as in struc/channel.hpp).
#define TEMPLATE_SESSION_SERVER_IMPL \
  template<typename Session_server_t, typename Server_session_t>
/// Internally used macro; public API users should disregard (same deal as in struc/channel.hpp).
#define CLASS_SESSION_SERVER_IMPL \
  Session_server_impl<Session_server_t, Server_session_t>
435
TEMPLATE_SESSION_SERVER_IMPL
template<typename Per_app_setup_func>
CLASS_SESSION_SERVER_IMPL::Session_server_impl
  (flow::log::Logger* logger_ptr, Session_server_obj* this_session_srv_arg, const Server_app& srv_app_ref_arg,
   const Client_app::Master_set& cli_app_master_set_ref, Error_code* err_code,
   Per_app_setup_func&& per_app_setup_func) :

  flow::log::Log_context(logger_ptr, Log_component::S_SESSION),
  m_srv_app_ref(srv_app_ref_arg), // Not copied!
  m_this_session_srv(this_session_srv_arg),
  m_cli_app_master_set_ref(cli_app_master_set_ref), // Ditto!
  m_per_app_setup_func(std::move(per_app_setup_func)),
  m_state(std::in_place) // Default-ct State; initialize contents, further, just below.
{
  using util::Process_credentials;
  using transport::Native_socket_stream;
  using flow::error::Runtime_error;
  using flow::async::reset_this_thread_pinning;
  using flow::log::Sev;
  using boost::movelib::make_unique;
  using boost::system::system_category;
  using boost::io::ios_all_saver;
  using fs::ofstream;
  using Named_sh_mutex = boost::interprocess::named_mutex;
  using Named_sh_mutex_ptr = boost::movelib::unique_ptr<Named_sh_mutex>;
  using Sh_lock_guard = boost::interprocess::scoped_lock<Named_sh_mutex>;
  // using ::errno; // It's a macro apparently.
  // Finish setting up m_state. See State members in order and deal with the ones needing explicit init.
  m_state->m_last_cli_namespace = 0;
  m_state->m_incomplete_session_graveyard
    = boost::movelib::make_unique<flow::async::Single_thread_task_loop>
        (get_logger(),
         /* (Linux) OS thread name will truncate the this-addr snippet to 15-5=10 chars here,
          * which should actually just fit. Nothing else seems particularly useful;
          * e.g., in non-exotic setups our srv-name is pretty much known anyway. */
         flow::util::ostream_op_string("SSvG-", this));

  /* This is a (as of this writing -- the) *cleanup point* for any MQs previously created on behalf of this
   * Server_app by previous active processes before us; namely: when either a Server_session or opposing
   * Client_session performs open_channel() (or pre-opens channel(s) during session creation), the
   * Server_session_impl creates the actual `Persistent_mq_handle`s via its ctor in create-only mode. These
   * underlying MQs are gracefully cleaned up in Blob_stream_mq_send/receiver dtors (see their doc headers).
   * This cleanup point is a best-effort attempt to clean up anything that was skipped due to one or more such
   * destructors never getting to run (due to crash, abort, etc.). Note that Blob_stream_mq_send/receiver doc
   * headers explicitly explain the need to worry about this contingency.
   *
   * We simply delete everything with the Shared_name prefix used when setting up the MQs
   * (see Server_session_impl::make_channel_mqs()). The prefix is everything up-to (not including) the PID
   * (empty_session.base().srv_namespace() below). Our own .srv_namespace() is
   * just about to be determined and is unique across time by definition (internally, it's -- again -- our PID);
   * so any existing MQs are by definition old. Note that as of this writing there is at most *one* active
   * process (instance) of a given Server_app.
   *
   * Subtlety (kind of): We worry about such cleanup only if some type of MQ is in fact enabled at compile-time
   * of *this* application; and we only clean up that MQ type, not the other(s) (as of this writing there are 2,
   * but that could change conceivably). If the MQ type is changed (or MQs disabled) after a crash/abort, there
   * could be a leak. We could also indiscriminately clean up all known MQ types here; that would be fine.
   * @todo Maybe we should. I don't know. shm::classic::Session_server ctor's pool cleanup point is only invoked
   * if that is the type of Session_server the user chose at compile-time, so we are just following that example.
   * Doing it this way strikes me as cleaner code, and the combination of a crash/abort and changed software
   * "feels" fairly minor. */
  if constexpr(Server_session_dtl_obj::Session_base_obj::S_MQS_ENABLED)
  {
    using Mq = typename Server_session_dtl_obj::Session_base_obj::Persistent_mq_handle_from_cfg;

    util::remove_each_persistent_with_name_prefix<Blob_stream_mq_base<Mq>>
      (get_logger(),
       build_conventional_shared_name_prefix(Mq::S_RESOURCE_TYPE_ID,
                                             Shared_name::ct(m_srv_app_ref.m_name)));
  }

  /* We want to write to CNS (PID file) just below, but what is its location, and the name of its associated
   * inter-process mutex, and for that matter the contents (formally, the Current Namespace; really the PID)?
   * Well, this is slightly cheesy, arguably, but any Server_session we produce will need all those values,
   * and can compute them by itself, and they'll always be the same in this process (app instance), so let's
   * just make this short-lived dummy Server_session_dtl and get the stuff out of there. Code reuse = pretty good. */
  const Server_session_dtl_obj empty_session(nullptr, m_srv_app_ref, Native_socket_stream());

  Error_code our_err_code;

  /* Owner/mode discussion:
   * ipc::session operates in a certain model (design doc is elsewhere/outside our scope here to fully justify)
   * that requires, for security/safety:
   * - Owner has the specific UID:GID registered under Server_app. If we are who we are supposed to be,
   *   this will occur automatically as we create a resource. Namely we have two relevant resources in here:
   *   - CNS (PID) file. Some reasons this could fail: if the file already existed
   *     *and* was created by someone else; if we have the proper UID but are also in some other group or
   *     something; and lastly Server_app misconfiguration. Mitigation: none. For one of those we could do an
   *     owner-change call to change the group, but for now let's say it's overkill, and actually doing so might
   *     hide a problem in the environment: let's not even risk that stuff.
   *   - Associated shared mutex (in Linux apparently a semaphore thingie). This is an interesting situation;
   *     it is not mentioned in the aforementioned design in detail -- too much of an impl detail for that --
   *     so let's consider it. Should the Server_app UID:GID apply to it too? Actually not quite: the way we use
   *     it, it's a symmetrically shared resource (read/write for both us and client), but more to the point
   *     the *client* is *allowed* to create it (hence OPEN_OR_CREATE both here and in Client_session_impl): it's
   *     accessed in order to get one's turn at accessing the CNS file, and to be "accessed" it must be created
   *     as needed (and then it'll keep existing until reboot). So actually the *GID* should be correct
   *     according to Server_app, but the UID can be, in fact, Server_app's -- or Client_app's, or *another*
   *     Client_app's entirely, or any number of them!
   *     - So should we check something about the mutex's UID/GID then?
   *       - It would probably be not bad to check for the GID. It is even conceivable to check the UID as that
   *         of one of the allowed `Client_app`s.
   *       - I (ygoldfel) feel it's overkill. I could be wrong, but it just feels insane: the mutex is only a way
   *         to access the CNS file in orderly fashion without concurrency issues. Either it works, or it doesn't
   *         work; trying to sanity-check that the right UID/GID owns it is going beyond the spirit of the design:
   *         to ascertain that "certain model" of trust/safety/security. We already do that with CNS itself;
   *         we don't need to be paranoid about the-thing-that-is-needed-to-use-CNS.
   * - The mode is as dictated by Server_app::m_permissions_level_for_client_apps. This we can and should
   *   ensure via a mode-set/change call. There are subtleties about how to do that, but they're discussed
   *   near the call sites below. As for now: We have two relevant resources in here, again:
   *   - CNS (PID) file. Yes, indeed, we set its permissions below.
   *   - Associated shared mutex. See above. So, actually, we set its permissions below too. That is actually
   *     (though unmentioned in the design) a pretty good idea for the "certain model" in the design:
   *     If, say, it's set to unrestricted access, then any user could just acquire the lock... and block IPC
   *     from proceeding, ever, for anyone else wishing to work with that Server_app. So, certainly,
   *     it should be locked down to the same extent as CNS itself. What's interesting about that is that
   *     a client, too, can create it (again, see above) -- and thus needs to set some kind of sensible
   *     permissions on creation (if applicable). And so it does... by checking Server_app [sic] as well and
   *     indeed setting that level of permissions. Does it make sense? Yes. Here are the scenarios specifically:
   *     - If the access level in Server_app is set to NO_ACCESS: Well then no one can access it. LoL. Next.
   *     - If UNRESTRICTED: Well then it's meant to be world-accessible! Yay! LoL again! Next.
   *     - If USER_ACCESS: Logically, for CNS to be accessible, if only the server UID is allowed access, then
   *       for it to interoperate with any clients, the clients must also be of that UID. So the client
   *       setting the mode to *its* UID should accomplish the same as if the server had done it.
   *     - If GROUP_ACCESS (the approach mandated by the design, though we don't assume it, hence the
   *       Server_app configuration): Same logic but applied to GID.
   *     Anyway, here on the server it's simple; we should set-mode similarly to how we do for CNS.
   *     The bulk of the above long explanation is why the client does it. The comment in Client_session_impl
   *     points back to right here to avoid duplication.
   *
   * So let's do that stuff below. */
569
570 /* Ensure our effective user is as configured in Server_app. We do check this value on the CNS (PID) file
571 * anyway; but this is still a good check because:
572 * - We might not be creating the file ourselves (first guy to get to it since boot is).
573 * - It eliminates the need (well, strong word but anyway) to keep checking that for various other
574 * resources created down the line, whether it's MQs or SHM pools or whatever else.
575 * Doing that is possible but:
576 * - annoying: it requires either a descriptor for an ::fstat() or a rather-unportable file-system-path for
577 * anything but actual files;
578 * - super-annoying: it requires deep-inside -- possibly even eventually user-supplied -- modules like
579 * the SHM-jemalloc module to be aware of the desire to even do this in the first place on every shared
580 * resource.
581 *
582 * The idea is: checking it here up-front, plus checking it on the CNS (PID) file (from which all IPC naming
583 * and thus trust, per design, flows), is a nice way to take care of it ahead of all that. It's not perfect:
584 * the effective UID:GID can be changed at runtime. We don't need to be perfect though: the whole
585 * safety/security project, here, is not meant to be some kind of cryptographically powerful guarantee. */
586 const auto own_creds = Process_credentials::own_process_credentials();
587 if ((own_creds.user_id() != m_srv_app_ref.m_user_id) || (own_creds.group_id() != m_srv_app_ref.m_group_id))
588 {
589 FLOW_LOG_WARNING("Session acceptor [" << *this << "]: Creation underway. However, just before writing "
590 "CNS (Current Namespace Store), a/k/a PID file, we determined that "
591 "the user/group aspects of our effective credentials [" << own_creds << "] do not match "
592 "the hard-configured values passed to this ctor: "
593 "[" << m_srv_app_ref.m_user_id << ':' << m_srv_app_ref.m_group_id << "]. "
594 "We cannot proceed, as this would violate the security/safety model of ipc::session. "
595 "Emitting error.");
597 }
598 else // if (own_creds match m_srv_app_ref.m_{user|group}_id)
599 {
600 const auto mutex_name = empty_session.base().cur_ns_store_mutex_absolute_name();
602 const auto cns_path = empty_session.base().cur_ns_store_absolute_path();
605
606 const auto logger_ptr = get_logger();
607 if (logger_ptr && logger_ptr->should_log(Sev::S_INFO, get_log_component()))
608 {
609 ios_all_saver saver{*(logger_ptr->this_thread_ostream())}; // Revert std::oct/etc. soon.
610 FLOW_LOG_INFO_WITHOUT_CHECKING
611 ("Session acceptor [" << *this << "]: Created. Writing CNS (Current Namespace Store), a/k/a PID "
612 "file [" << cns_path << "] (perms "
613 "[" << std::setfill('0')
614 << std::setw(4) // Subtlety: This resets right away after the perms are output...
615 << std::oct << cns_perms.get_permissions() << "], "
616 "shared-mutex name [" << mutex_name << "], shared-mutex perms "
617 "[" << std::setw(4) // ...hence gotta do this again.
618 << mutex_perms.get_permissions() << "]); "
619 "then listening for incoming master socket stream "
620 "connects (through Native_socket_stream_acceptor that was just cted) to address "
621 "based partially on the namespace (PID) written to that file.");
622 }
623
624 /* See Client_session_impl where it, too, creates this sh_mutex for notes equally applicable here.
625 * It reads the file we are about to write and locks the same inter-process mutex accordingly. */
626 Named_sh_mutex_ptr sh_mutex;
628 "Session_server_impl::ctor:named-mutex-open-or-create", [&]()
629 {
630 sh_mutex = make_unique<Named_sh_mutex>(util::OPEN_OR_CREATE, mutex_name.native_str(), mutex_perms);
631 /* Set the permissions as discussed in long comment above. --^
632 * Bonus: All bipc OPEN_OR_CREATE guys take care to ensure permissions are set regardless of umask,
633 * so no need for us to set_resource_permissions() here.
634 *
635 * As for ensuring ownership... skipping as discussed in long comment above. */
636 });
637
638 if (!our_err_code)
639 {
640 Sh_lock_guard sh_lock(*sh_mutex);
641
642 /* Only set permissions if we in fact created the CNS (PID) file. Since we use the mutex and trust its other
643 * users, we can atomically-enough check whether we created it by pre-checking its existence. To pre-check
644 * its existence use fs::exists(). fs::exists() can yield an error, but we intentionally eat any error
645 * and treat it as-if file does not exist. Whatever issue it was, if any, should get detected via ofstream
646 * opening. Hence, if there's an error, we pre-assume we_created_cns==true, and let the chips fall where
647 * they may subsequently. (Note this is all a low-probability eventuality.) */
648 Error_code dummy; // Can't just use fs::exists(cns_path), as it might throw an exception (not what we want).
649 const bool we_created_cns = !fs::exists(cns_path, dummy);
650 ofstream cns_file(cns_path);
651 // Make it exist immediately (for the following check). @todo Might be unnecessary. At least it's harmless.
652 cns_file.flush();
653
654 /* Ensure owner is as configured. (Is this redundant, given that we checked UID:GID above? Yes and no:
655 * yes, if we created it; no, if we hadn't. So why not only check if (!we_created_cns)? Answer: paranoia,
656 * sanity checking. We're not gonna do this check down the line for this session (per earlier-explained
657 * decision), so might as well just sanity-check.)
658 *
659 * Ideally we'd:
660 * - Pass in an fstream-or-FD-or-similar (we've opened the file after all, so there is one), not a name.
661 * - Use something in C++/C standard library or Boost, not an OS call.
662 *
663 * It's nice to want things. On the FD front we're somewhat screwed; there is a gcc-oriented hack to get the FD,
664 * but it involves protected access and non-standard stuff. Hence we must work from the `path` cns_path.
665 * We've created it, so it really should work, even if it's a little slower or what-not. As for using
666 * a nice library... pass the buck: */
667 ensure_resource_owner_is_app(get_logger(), cns_path, m_srv_app_ref, &our_err_code);
668 if ((!our_err_code) && we_created_cns)
669 {
670 /* The owner check passed; and we just created it. (At this stage fs::exists() having failed somehow is
671 * more-or-less impossible: ensure_resource_owner_is_app() would've failed if so.) So:
672 * Set mode. Again, no great way to get an FD, nor to use the fstream itself. So just: */
673 util::set_resource_permissions(get_logger(), cns_path, cns_perms, &our_err_code);
674 // If it failed, it logged.
675 } // if (we_created_cns && (!(our_err_code [from set_resource_permissions()])))
676
677 if (!our_err_code)
678 {
679 cns_file << empty_session.base().srv_namespace().str() << '\n';
680
681 if (!cns_file.good())
682 {
683 const auto sys_err_code = our_err_code = Error_code(errno, system_category());
684 FLOW_LOG_WARNING("Session acceptor [" << *this << "]: Could not open or write CNS (PID) "
685 "file [" << cns_path << "]; system error details follow.");
686 FLOW_ERROR_SYS_ERROR_LOG_WARNING(); // Log based on sys_err_code.
687 }
688 // Close file, unlock mutex.
689 } // if (!our_err_code) (from ensure_resource_owner_is_app(), or from set_resource_permissions())
690 // else { It logged. }
691 } // if (!our_err_code) (mutex creation)
692 // else { It logged. }
693
694 sh_mutex.reset(); // Again, see note in Client_session_impl which does the same.
695 } // else if (own_creds match m_srv_app_ref.m_{user|group}_id) // Otherwise our_err_code is truthy.
696
697 if (our_err_code)
698 {
699 if (err_code)
700 {
701 *err_code = our_err_code;
702 return;
703 }
704 // else
705 throw Runtime_error(our_err_code, FLOW_UTIL_WHERE_AM_I_STR());
706 }
707 /* Got here: CNS (PID file) written fine. Now we can listen on the socket stream acceptor derived off that value:
708 * clients will read that file and thus know to connect to that. */
709
710 // This might throw; or emit failure to *err_code; or neither (success).
711 m_state->m_master_sock_acceptor
712 = make_unique<transport::Native_socket_stream_acceptor>
713 (get_logger(),
714 empty_session.base().session_master_socket_stream_acceptor_absolute_name(),
715 err_code);
716
717 // See class doc header. Start this very-idle thread for a bit of corner case work.
718 if (err_code && (!*err_code))
719 {
720 m_state->m_incomplete_session_graveyard->start(reset_this_thread_pinning);
721 // Don't inherit any strange core-affinity! ^-- Worker must float free.
722 }
723 // else { Might as well not start that thread. }
724} // Session_server_impl::Session_server_impl()
725
727CLASS_SESSION_SERVER_IMPL::~Session_server_impl()
728{
729 FLOW_LOG_INFO("Session acceptor [" << *this << "]: Shutting down. The Native_socket_stream_acceptor will now "
730 "shut down, and all outstanding Native_socket_stream_acceptor handlers and Server_session "
731 "handlers shall fire with operation-aborted error codes.");
732 /* We've written our internal async-op handlers in such a way as to get those operation-aborted handler
733 * invocations to automatically occur as our various m_* are destroyed just past this line.
734 * Namely:
735 * - m_master_sock_acceptor dtor runs: fires our handler with its operation-aborted code; we translate it
736 * into the expected operation-aborted code; cool. This occurs with any such pending handlers.
737 * - m_incomplete_sessions dtor runs: Each Server_session_dtl_obj dtor runs: Any pending async_accept_log_in()
738 * emits the expected operation-aborted code; cool.
739 *
740 * Additionally via sub_class_set_deinit_func() we allow for certain final de-init code to be executed, once
741 * all state has been destroyed (including what we just mentioned). Hence force this to occur now: */
742 m_state.reset();
743
744 // Last thing! See sub_class_set_deinit_func() doc header which will eventually lead you to a rationale comment.
745 if (!m_deinit_func_or_empty.empty())
746 {
747 FLOW_LOG_INFO("Session acceptor [" << *this << "]: Continuing shutdown. A sub-class desires final de-init "
748 "work, once everything else is stopped (might be persistent resource cleanup); invoking "
749 "synchronously.");
750 m_deinit_func_or_empty();
751 FLOW_LOG_TRACE("De-init work finished.");
752 }
753} // Session_server_impl::~Session_server_impl()
754
756template<typename Task_err,
757 typename N_init_channels_by_srv_req_func, typename Mdt_load_func>
758void CLASS_SESSION_SERVER_IMPL::async_accept(Server_session_obj* target_session,
759 Channels* init_channels_by_srv_req,
760 Mdt_reader_ptr* mdt_from_cli_or_null,
761 Channels* init_channels_by_cli_req,
762 N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
763 Mdt_load_func&& mdt_load_func,
764 Task_err&& on_done_handler)
765{
766 using util::String_view;
767 using flow::async::Task_asio_err;
768 using boost::make_shared;
769 using std::to_string;
770 using std::string;
771
772 assert(target_session);
773 assert(m_state->m_master_sock_acceptor && "By contract do not invoke async_accept() if ctor failed.");
774
775 /* We are in thread U or Wa or one of the Ws. U is obviously fine/mainstream. Ws means they invoked us
776 * directly from our own completion handler again; it is fine, since a Ws is started per async_accept()
777 * and does not interact with other Ws. Wa means they invoked us directly from our own completion handler
778 * again, though that would only happen on error; anyway Native_socket_stream_acceptor allows it. */
779
780 Task_asio_err on_done_func(std::move(on_done_handler));
781 auto sock_stm = make_shared<transport::sync_io::Native_socket_stream>(); // Empty target socket stream.
782 const auto sock_stm_raw = sock_stm.get();
783
784 FLOW_LOG_INFO("Session acceptor [" << *this << "]: Async-accept request: Immediately issuing socket stream "
785 "acceptor async-accept as step 1. If that async-succeeds, we will complete the login asynchronously; "
786 "if that succeeds we will finally emit the ready-to-go session to user via handler.");
787 m_state->m_master_sock_acceptor->async_accept
788 (sock_stm_raw,
789 [this, target_session, init_channels_by_srv_req, mdt_from_cli_or_null,
790 init_channels_by_cli_req,
791 n_init_channels_by_srv_req_func = std::move(n_init_channels_by_srv_req_func),
792 mdt_load_func = std::move(mdt_load_func),
793 sock_stm = std::move(sock_stm),
794 on_done_func = std::move(on_done_func)]
795 (const Error_code& err_code) mutable
796 {
797 // We are in thread Wa (unspecified; really N_s_s_acceptor worker thread).
798
799 if (err_code)
800 {
801 /* Couple cases to consider. 1, could've failed with a true (probably system) error.
802 * 2, it could've failed with operation-aborted which can only occur if m_master_sock_acceptor is destroyed
803 * during the async_accept(), which in turn can only happen if *this is destroyed during it.
804 *
805 * 1 is straightforward; just report the error through user handler, and that's that. As for 2:
806 *
807 * Since we've decided to (as explained in Implementation section of our class doc header) fully piggy-back
808 * all work, including handlers, on our helper objects' (m_master_sock_acceptor, or in step 2 the
809 * Server_session itself) worker threads, we can just report operation-aborted through user handler as well.
810 * Our dtor doesn't then need to worry about (having tracked un-fired user handlers) firing un-fired handlers:
811 * we will just do so by piggy-backing on m_master_sock_acceptor doing so. Code is much simpler then.
812 *
813 * Subtlety: avoid even logging. Logging is safe for 1, but it's not that high in value. I believe
814 * logging is safe in 2, because m_master_sock_acceptor dtor will only return once it has fired the
815 * handler (with operation-aborted), which happens before Log_context super-class is destroyed... but
816 * generally we avoid extra code on operation-aborted everywhere in Flow-IPC and `flow` (that I (ygoldfel)
817 * control anyway), so let's just keep to that. @todo Maybe re-add TRACE logging when not operation-aborted.
818 *
819 * If you change this, please consider synchronizing with the async_accept_log_in() handler below.
820 *
821 * Subtlety: transport:: and session:: have separate operation-aborted codes, and we promised the latter so: */
824 return;
825 } // if (err_code)
826 // else: No problem at this async-stage.
827
828 FLOW_LOG_INFO("Session acceptor [" << *this << "]: Async-accept request: Socket stream "
829 "acceptor async-accept succeeded resulting in socket stream [" << *sock_stm << "]; that was step "
830 "1. Now we create the server session and have it undergo async-login; if that succeeds we will "
831 "finally emit the ready-to-go session to user via handler.");
832
833 Incomplete_session incomplete_session
834 = make_shared<Server_session_dtl_obj>(get_logger(),
835 m_srv_app_ref, // Not copied!
836 // Gobble up the stream obj into the channel in session:
837 std::move(*sock_stm));
838 /* It's important that we save this in *this, so that if *this is destroyed, then *incomplete_session is
839 * destroyed, so that it triggers the handler below with operation-aborted, so that we pass that on to
840 * the user handler as promised. However it is therefore essential that we don't also capture
841 * incomplete_session (the shared_ptr!) in our handler lambda but rather only the observer (weak_ptr) of it!
842 * Otherwise we create a cycle and a memory leak. */
843 {
844 Lock_guard incomplete_sessions_lock(m_state->m_mutex);
845 m_state->m_incomplete_sessions.insert(incomplete_session);
846 }
847
848 // This little hook is required by Server_session_dtl::async_accept_log_in() for Client_app* lookup and related.
849 auto cli_app_lookup_func = [this](String_view cli_app_name) -> const Client_app*
850 {
851 const auto cli_it = m_cli_app_master_set_ref.find(string(cli_app_name));
852 return (cli_it == m_cli_app_master_set_ref.end()) ? static_cast<const Client_app*>(0) : &cli_it->second;
853 };
854
855 /* And this one is required to issue a unique cli-namespace, if all goes well. The "if all goes well" part
856 * is why it's a hook and not just pre-computed and passed-into async_accept_log_in() as an arg. */
857 auto cli_namespace_func = [this]() -> Shared_name
858 {
859 return Shared_name::ct(to_string(++m_state->m_last_cli_namespace));
860 };
861
862 /* Lastly, we are to execute this upon knowing the Client_app and before the log-in response is sent to
863 * the opposing Client_session. */
864 auto pre_rsp_setup_func = [this,
865 incomplete_session_observer = Incomplete_session_observer(incomplete_session)]
866 () -> Error_code
867 {
868 // We are in thread Ws (unspecified; really Server_session worker thread).
869
870 // (Could have captured incomplete_session itself, but I'd rather assert() than leak.)
871 auto incomplete_session = incomplete_session_observer.lock();
872 assert(incomplete_session
873 && "The Server_session_dtl_obj cannot be dead (dtor ran), if it's invoking its async handlers OK "
874 "(and thus calling us in its async_accept_log_in() success path).");
875 // If it's not dead, we're not dead either (as us dying is the only reason incomplete_session would die).
876
877 const auto cli_app_ptr = incomplete_session->base().cli_app_ptr();
878 assert(cli_app_ptr && "async_accept_log_in() contract is to call pre_rsp_setup_func() once all "
879 "the basic elements of Session_base are known (including Client_app&).");
880
881 return m_per_app_setup_func(*cli_app_ptr);
882 }; // auto pre_rsp_setup_func =
883
884 /* @todo It occurs to me (ygoldfel) now that I look at it: all these args -- even on_done_func --
885 * could instead be passed into Server_session_dtl ctor; that guy could save them into m_* (as of this writing
886 * it passes them around via lambdas). After all, as of now, async_accept_log_in() is non-retriable.
887 * Pros: much less lambda-capture boiler-plate in there.
888 * Cons: (1) added state can leak (they'd need to worry about maybe clearing that stuff on entry to almost-PEER
889 * state, and possibly on failure); (2) arguably less maintainable (maybe Server_session_dtl may want to
890 * make async_accept() re-triable a-la Client_session_impl::async_connect()?).
891 *
892 * I, personally, have a big bias against non-const m_* state (where it can be avoided reasonably), so I
893 * like that this stuff only stays around via captures, until the relevant async-op is finished. So I prefer
894 * it this way. Still: arguable.
895 *
896 * Anyway. Go! */
897
898 incomplete_session->async_accept_log_in
899 (this,
900 init_channels_by_srv_req, // Our async out-arg for them to set on success (before on_done_func(Error_code())).
901 mdt_from_cli_or_null, // Ditto.
902 init_channels_by_cli_req, // Ditto.
903
904 std::move(cli_app_lookup_func), // Look up Client_app by name and give it to them!
905 std::move(cli_namespace_func), // Generate new per-session namespace and give it to them!
906 std::move(pre_rsp_setup_func), // Set up stuff necessary for this Client_app, especially if 1st occurrence!
907
908 std::move(n_init_channels_by_srv_req_func), // How many channels does our caller want to set up?
909 std::move(mdt_load_func), // Let caller fill out srv->cli metadata!
910
911 [this, incomplete_session_observer = Incomplete_session_observer(incomplete_session),
912 target_session, on_done_func = std::move(on_done_func)]
913 (const Error_code& async_err_code)
914 {
915 // We are in thread Ws (unspecified; really Server_session worker thread).
916
917 auto incomplete_session = incomplete_session_observer.lock();
918 if (incomplete_session)
919 {
920 /* No matter what -- the session is no longer incomplete; either it accepted log-in OK or not.
921 * Remove it from *this. We'll either forget it (error) or give it to user (otherwise). Anyway remove it. */
922 {
923 Lock_guard incomplete_sessions_lock(m_state->m_mutex);
924#ifndef NDEBUG
925 const bool erased_ok = 1 ==
926#endif
927 m_state->m_incomplete_sessions.erase(incomplete_session);
928 assert(erased_ok && "Who else would have erased it?!");
929 }
930
931 if (async_err_code)
932 {
933 /* See class doc header. We are in thread Ws; letting incomplete_session (the shared_ptr) be destroyed
934 * here (below, in the `if (async_err_code)` clause) would cause it to try
935 * to join thread Ws which would deadlock; we'd be breaking the contract to
936 * never destroy Server_session from its own handler. So hand it off to this very-idle thread to do it
937 * asynchronously. */
938 m_state->m_incomplete_session_graveyard->post([incomplete_session = std::move(incomplete_session)]
939 () mutable
940 {
941 // That's that. ~Server_session() will run here:
942 incomplete_session.reset(); // Just in case compiler wants to warn about unused-var or optimize it away...?
943 });
944 // incomplete_session (the shared_ptr) is hosed at this point.
945 assert((!incomplete_session) && "It should've been nullified by being moved-from into the captures.");
946 } // if (async_err_code)
947 } // if (incomplete_session) (but it may have been nullified inside <=> async_err_code is truthy)
948 else // if (!incomplete_session)
949 {
950 /* Server_session_dtl disappeared under us, because *this is disappearing under us.
951 * Naturally no need to remove it from any m_incomplete_sessions, since that's not a thing.
952 * However this sanity check is worthwhile: */
954 && "The incomplete-session Server_session_dtl can only disappear under us if *this is destroyed "
955 "which can only occur <=> operation-aborted is emitted due to *this destruction destroying that "
956 "incomplete-session.");
957 } // else if (!incomplete_session)
958
959 if (async_err_code)
960 {
961 /* As in the acceptor failure case above just forward it to handler. All the same comments apply,
962 * except there is no subtlety about operation-aborted coming from outside ipc::session.
963 * If you change this, please consider synchronizing with the async_accept() handler above. */
964 on_done_func(async_err_code);
965 return;
966 }
967 // else if (!async_err_code)
968 assert(incomplete_session);
969
970 // Yay! Give it to the user.
971 FLOW_LOG_INFO("Session acceptor [" << *this << "]: Async-accept request: Successfully resulted in logged-in "
972 "server session [" << *incomplete_session << "]. Feeding to user via callback.");
973
974 /* Server_session_dtl *is* Server_session, just with an added public API; now we reinterpret the pointer
975 * and give the user the Server_session without that public API which accesses its internals. */
976 *target_session = std::move(*(static_cast<Server_session_obj*>(incomplete_session.get())));
977 // *incomplete_session is now as-if default-cted.
978
979 on_done_func(async_err_code); // Pass in Error_code().
980 FLOW_LOG_TRACE("Handler finished.");
981 }); // incomplete_session->async_accept_log_in()
982 }); // m_master_sock_acceptor->async_accept()
983} // Session_server_impl::async_accept()
984
986void CLASS_SESSION_SERVER_IMPL::to_ostream(std::ostream* os) const
987{
988 *os << '[' << m_srv_app_ref << "]@" << static_cast<const void*>(this);
989}
990
992typename CLASS_SESSION_SERVER_IMPL::Session_server_obj* CLASS_SESSION_SERVER_IMPL::this_session_srv()
993{
994 return m_this_session_srv;
995}
996
998template<typename Task>
999void CLASS_SESSION_SERVER_IMPL::sub_class_set_deinit_func(Task&& task)
1000{
1001 m_deinit_func_or_empty = std::move(task);
1002}
1003
1005std::ostream& operator<<(std::ostream& os,
1006 const CLASS_SESSION_SERVER_IMPL& val)
1007{
1008 val.to_ostream(&os);
1009 return os;
1010}
1011
1012#undef CLASS_SESSION_SERVER_IMPL
1013#undef TEMPLATE_SESSION_SERVER_IMPL
1014
1015} // namespace ipc::session