Flow-IPC 1.0.0
Flow-IPC project: Full implementation reference.
session_base.hpp
1/* Flow-IPC: Sessions
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
23#include "ipc/session/schema/detail/session_master_channel.capnp.h"
24#include "ipc/session/app.hpp"
25#include "ipc/session/error.hpp"
29#include <boost/thread/future.hpp>
30
31namespace ipc::session
32{
33
34// Types.
35
36/**
37 * Internal type containing data and types common to internal types Server_session_impl and Client_session_impl
38 * which are the true cores of #Server_session and #Client_session respectively.
39 *
40 * A Server_session_impl and Client_session_impl share many basic properties, some public. For example consider
41 * that a Server_session_impl on this side has 1 counterpart Client_session_impl on the other side (or vice versa); for
42 * both it is salient which Server_app is on the server side and which Client_app is on the client side.
43 * In terms of types/constants, on each side the two Session objects must be identically configured via various
44 * template params which results in non-trivial type aliases like the highly significant #Channel_obj,
45 * exposed in the Session concept, and constants like #S_SOCKET_STREAM_ENABLED.
46 *
47 * Regarding data: an operational (PEER-state) Session (on either end) will need various members to be set
48 * before it is indeed in PEER state. This object stores such data, usually unset at first, and features
49 * `protected` setters to invoke once each, until all are set permanently for PEER state.
50 *
51 * Session_base also has a `protected` inner class, Session_base::Graceful_finisher which certain variations
52 * (for which this is necessary) of Server_session_impl and Client_session_impl use to carry out a
53 * graceful-session-end handshake procedure. This is here in Session_base -- despite being somewhat more
54 * algorithmic/stateful than the other stuff here -- essentially because the Graceful_finisher (if needed at
55 * all) on each side acts completely symmetrically. Other items tend to be (internally) asymmetrical in
56 * behavior between Server_session_impl and Client_session_impl (and variants). More info on
57 * Session_base::Graceful_finisher in its own doc header.
58 *
59 * @tparam Mdt_payload
60 * See #Server_session, #Client_session (or Session concept).
61 * @tparam S_MQ_TYPE_OR_NONE
62 * See #Server_session, #Client_session.
63 * @tparam S_TRANSMIT_NATIVE_HANDLES
64 * See #Server_session, #Client_session.
65 */
66template<schema::MqType S_MQ_TYPE_OR_NONE, bool S_TRANSMIT_NATIVE_HANDLES, typename Mdt_payload>
67class Session_base :
68 private boost::noncopyable
69{
70public:
71 // Constants.
72
73 /// See Session_mv.
74 static constexpr bool S_MQS_ENABLED = S_MQ_TYPE_OR_NONE != schema::MqType::NONE;
75
76 /// See Session_mv.
77 static constexpr bool S_SOCKET_STREAM_ENABLED = (!S_MQS_ENABLED) || S_TRANSMIT_NATIVE_HANDLES;
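 // Illustrative sketch, not part of this header: how the two constants above combine for each knob setting.
 // `Mq_type_sketch` and `Knobs_sketch` are hypothetical stand-ins for schema::MqType and Session_base; only the
 // two boolean expressions are taken from the declarations above. The point it shows: a socket stream is
 // enabled whenever MQs are off, or whenever native handles must be transmitted.

```cpp
// Hypothetical stand-in for the capnp-generated schema::MqType enum (assumed ordering: NONE first, sentinel last).
enum class Mq_type_sketch : unsigned { NONE, POSIX, BIPC, END_SENTINEL };

// Hypothetical holder for the two knobs; mirrors the two constant expressions in the listing.
template<Mq_type_sketch S_MQ_TYPE_OR_NONE, bool S_TRANSMIT_NATIVE_HANDLES>
struct Knobs_sketch
{
  static constexpr bool S_MQS_ENABLED = S_MQ_TYPE_OR_NONE != Mq_type_sketch::NONE;
  static constexpr bool S_SOCKET_STREAM_ENABLED = (!S_MQS_ENABLED) || S_TRANSMIT_NATIVE_HANDLES;
};

// MQs on, no native handles: the only configuration with no socket stream.
static_assert(!Knobs_sketch<Mq_type_sketch::POSIX, false>::S_SOCKET_STREAM_ENABLED,
              "MQ-only configuration should not enable a socket stream.");
// MQs off: a socket stream is always enabled, whether or not handles are transmitted.
static_assert(Knobs_sketch<Mq_type_sketch::NONE, false>::S_SOCKET_STREAM_ENABLED,
              "Without MQs a socket stream is the remaining transport.");
```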
78
79 // Types.
80
81 static_assert(S_MQ_TYPE_OR_NONE != schema::MqType::END_SENTINEL,
82 "Do not use the value END_SENTINEL for S_MQ_TYPE_OR_NONE; it is only a sentinel. Did you mean NONE?");
83
84 // Ensure the definitions immediately following are based on correct assumptions.
85 static_assert(std::is_enum_v<schema::MqType>,
86 "Sanity-checking capnp-generated MqType enum (must be an enum).");
87 static_assert(std::is_unsigned_v<std::underlying_type_t<schema::MqType>>,
88 "Sanity-checking capnp-generated MqType enum (backing type must be unsigned).");
89 static_assert((int(schema::MqType::END_SENTINEL) - 1) == 2,
90 "Code apparently must be updated -- expected exactly 2 MqType enum values plus NONE + sentinel.");
91 static_assert((int(schema::MqType::POSIX) != int(schema::MqType::BIPC))
92 && (int(schema::MqType::POSIX) > int(schema::MqType::NONE))
93 && (int(schema::MqType::BIPC) > int(schema::MqType::NONE))
94 && (int(schema::MqType::POSIX) < int(schema::MqType::END_SENTINEL))
95 && (int(schema::MqType::BIPC) < int(schema::MqType::END_SENTINEL)),
96 "Code apparently must be updated -- "
97 "expected exactly 2 particular MqType enum values plus NONE + sentinel.");
98
99 /**
100 * Relevant only if #S_MQS_ENABLED, this is the Persistent_mq_handle-concept impl type specified by
101 * the user via `S_MQ_TYPE_OR_NONE`.
102 */
104 = std::conditional_t<!S_MQS_ENABLED,
106 std::conditional_t<S_MQ_TYPE_OR_NONE == schema::MqType::POSIX,
109
110 /// See Session_mv (or Session concept).
111 using Channel_obj = std::conditional_t<S_MQS_ENABLED,
112 std::conditional_t
113 <S_TRANSMIT_NATIVE_HANDLES,
116 std::conditional_t
117 <S_TRANSMIT_NATIVE_HANDLES,
120
121 /// See Session_mv. Note: If changed from `vector` please update those doc headers too.
122 using Channels = std::vector<Channel_obj>;
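 // Illustrative sketch, not part of this header: the nested `std::conditional_t` pattern that #Channel_obj uses to
 // pick one of four channel types from two compile-time booleans. The concrete transport types are not visible
 // in this listing, so the four tag structs below are hypothetical placeholders, and `Channel_obj_sketch` is an
 // assumed name used only for this example.

```cpp
#include <type_traits>

// Hypothetical placeholder types, one per leaf of the two-level selection.
struct Mqs_plus_handles_pipe_tag {};
struct Mqs_only_tag {};
struct Socket_stream_with_handles_tag {};
struct Socket_stream_blobs_only_tag {};

// Outer level branches on whether MQs are enabled; each inner level branches on native-handle transmission.
template<bool S_MQS_ENABLED, bool S_TRANSMIT_NATIVE_HANDLES>
using Channel_obj_sketch
  = std::conditional_t<S_MQS_ENABLED,
                       std::conditional_t<S_TRANSMIT_NATIVE_HANDLES,
                                          Mqs_plus_handles_pipe_tag,
                                          Mqs_only_tag>,
                       std::conditional_t<S_TRANSMIT_NATIVE_HANDLES,
                                          Socket_stream_with_handles_tag,
                                          Socket_stream_blobs_only_tag>>;
```

 // Because the selection is a type alias, a mismatch between the two sides' template params shows up as a
 // different #Channel_obj type at compile time rather than as a runtime surprise.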
123
124 /// See Session_mv (or Session concept).
125 using Mdt_payload_obj = Mdt_payload;
126
127 /// See Session_mv (or Session concept).
130
131 /// See Session_mv (or Session concept).
134
135 /// See Session_mv (or Session concept).
136 using Mdt_builder_ptr = boost::shared_ptr<Mdt_builder>;
137
138 /// See Session_mv (or Session concept).
140
141 /// See Session_mv (or Session concept).
143
144 // Methods.
145
146 /**
147 * See Server_session_impl, Client_session_impl. However in Session_base this value may be not-yet-set (empty), or
148 * set (and immutable from then on).
149 *
150 * This value shall be known and immutable from construction for Server_session_impl, since server namespace -- PID
151 * as of this writing -- can be determined from the start on the server side and applies to every server session
152 * that Session_server produces. Client_session_impl, however, determines
153 * it at the latest possible moment which is at Client_session_impl::async_connect() time, at which point it needs
154 * to determine the PID via PID file.
155 *
156 * @return See above.
157 */
158 const Shared_name& srv_namespace() const;
159
160 /**
161 * See Server_session_impl, Client_session_impl. However in Session_base this value may be not-yet-set (empty), or
162 * set (and immutable from then on).
163 *
164 * This value shall be generated uniquely (within some context) for each new `Server_session` produced by
165 * Session_server; and Client_session_impl shall find that out while logging in (last part of entering PEER
166 * state).
167 *
168 * @return See above.
169 */
170 const Shared_name& cli_namespace() const;
171
172 /**
173 * See Server_session_impl, Client_session_impl. However in Session_base this value may be not-yet-set (null), or set
174 * (and immutable from then on). Note the value to which we refer is the actual pointer. (The Client_app that
175 * is pointed-to, itself, is certainly immutable too.)
176 *
177 * This value shall be set from the start in a Client_session_impl but determined during a given Server_session_impl's
178 * log-in (the opposing Client_session_impl will transmit the Client_app::m_name). The log-in shall complete the
179 * `Server_session`'s entry to PEER state.
180 *
181 * @return See above.
182 */
183 const Client_app* cli_app_ptr() const;
184
185 /**
186 * Computes the name of the interprocess named-mutex used to control reading/writing to the file
187 * storing (written by server, read by client) the value for srv_namespace(). The file
188 * is located at cur_ns_store_absolute_path().
189 *
190 * This may be called anytime.
191 *
192 * @return See above.
193 */
195
196 /**
197 * Computes the absolute path to file storing (written by server, read by client) the value for srv_namespace().
198 * The file is located at cur_ns_store_absolute_path().
199 *
200 * This may be called anytime.
201 *
202 * @return See above.
203 */
204 fs::path cur_ns_store_absolute_path() const;
205
206 /**
207 * Computes the absolute name at which the server shall set up a transport::Native_socket_stream_acceptor
208 * to which client shall transport::Native_socket_stream::sync_connect() in order to establish a PEER-state
209 * session.
210 *
211 * This must be called no earlier than set_srv_namespace(); otherwise behavior undefined (assertion
212 * may trip).
213 *
214 * @return See above.
215 */
217
218 /**
219 * See Session_mv::heap_fixed_builder_config() (1-arg).
220 *
221 * @param logger_ptr
222 * See above.
223 * @return See above.
224 */
225 static Structured_msg_builder_config heap_fixed_builder_config(flow::log::Logger* logger_ptr);
226
227 /**
228 * See Session_mv::heap_reader_config() (1-arg).
229 *
230 * @param logger_ptr
231 * See above.
232 * @return See above.
233 */
234 static Structured_msg_reader_config heap_reader_config(flow::log::Logger* logger_ptr);
235
236 // Data.
237
238 /**
239 * Reference to Server_app (referring to local process in Server_session_impl, opposing process in
240 * Client_session_impl).
241 * This is known from construction and immutable (both the reference, of course, and the Server_app itself).
242 */
244
245protected:
246 // Constants.
247
248 /**
249 * Internal timeout for `open_channel()`.
250 *
251 * ### The value ###
252 * We initially tried something much less generous, 500ms. It worked fine, but some people encountered
253 * the timeout due to unrelated reasons, and it was natural to blame it on the timeout. This much longer
254 * timeout should make it obvious, in such a situation, that it's not about some slowness inside Flow-IPC
255 * but a pathological application problem -- particularly around session-open time. For example not
256 * calling Server_session_impl::init_handlers(), or calling it late, as of this writing can cause issues with this.
257 *
258 * The downside is it makes Session::open_channel() potentially blocking formally speaking, whereas 500ms
259 * could still claim to be non-blocking. It's a matter of perspective really. This value just seems to
260 * cause less confusion. We might reconsider the whole thing however.
261 */
262 static constexpr util::Fine_duration S_OPEN_CHANNEL_TIMEOUT = boost::chrono::seconds(60);
263
264 /**
265 * The max sendable MQ message size as decided by Server_session_impl::make_channel_mqs() (and imposed on both sides,
266 * both directions), if #S_MQS_ENABLED *and* Server_session_impl::S_SHM_ENABLED is `false`, when a channel is opened
267 * (regardless of which side did the active-open or requested pre-opening at session start).
268 * If `*this` belongs to Server_session_impl, that's what this is.
269 * If it belongs to Client_session_impl, then this is what the opposing process -- if they're using the same code! --
270 * will have decided.
271 *
272 * Our own heap_fixed_builder_config(), forwarded to Session_mv::heap_fixed_builder_config(), similarly uses
273 * this constant in a matching way.
274 *
275 * @note If Server_session_impl::S_SHM_ENABLED is `true`, then a different (much smaller) MQ message size limit
276 * is configured. In that case, also, heap_fixed_builder_config() is not relevant and should not be used.
277 *
278 * While it looks simple, there are a number of subtleties one must understand if *ever* considering changing it.
279 *
280 * ### bipc versus POSIX MQs ###
281 * As of this writing the same constant value is used for both types of MQ configurable. The actual value is
282 * chosen due to a certain aspect of POSIX MQs (discussed just below); and we reuse it for bipc, though we absolutely
283 * do not have to, for simplicity/for lack of better ideas at the moment. It would be possible to bifurcate those
284 * two cases if really desired.
285 *
286 * ### Why 8Ki? ###
287 * By default in Linux POSIX MQs this happens to be the actual limit on the size of a single message --
288 * visible in /proc/sys/fs/mqueue/msgsize_max -- so we cannot go higher typically. However that file can be modified.
289 * For now we assume a typical environment; or at least that it will not go *below* this typical default.
290 * If we did try a higher number here, opening of MQs by the server would likely fail with an error
291 * (Server_session_impl::make_channel_mqs()).
292 *
293 * ### Things to consider if changing the value away from the above ###
294 * - Contemplate why you're doing it. bipc MQs are seen (in Boost source) to be a simple zero-copy data structure
295 * in an internally maintained kernel-persistent SHM pool; while I (ygoldfel) haven't verified via kernel source,
296 * likely Linux POSIX MQ impl is something very similar (reasoning omitted but trust me). So copy perf should
297 * not be a factor; only RAM use and the functional ability to transmit messages of a certain size.
298 * - If the plan is to use `transport::struc::Heap_fixed_builder`-backed serialization for structured messages
299 * (via transport::struc::Channel), then the max size is quite important: if a *leaf* in your message exceeds
300 * this size when serialized, it is a fatal error.
301 * - If, however, the plan is to use SHM-backing (e.g., via `shm::classic::*_session`), then this constant
302 * does not get used (a much smaller value does -- so small it would be ~meaningless to decrease it).
303 * - Suppose you have changed this value, and suppose the `Heap_fixed_builder`-based use case *does* matter.
304 * If ::ipc has not yet been released in production, ever, then it's fine (assuming, that is, it'll work
305 * in the first place given the aforementioned `/proc/sys/...` limit for POSIX MQs). If it *has* been
306 * released then there is an annoying subtlety to consider:
307 * - If you can guarantee (via release process) that the client and server will always use the same ::ipc software,
308 * then you're still fine. Just change this value; done. Otherwise though:
309 * - There's the unfortunate caveat that is Session_base::heap_fixed_builder_config(). There you will note
310 * it says that the *server* decides (for both sides) what this value is. In that case that method
311 * will be correct in the server process; but if the client process is speaking to a different version
312 * of the server, with a different value for #S_MQS_MAX_MSG_SZ, then that is a potential bug.
313 * - Therefore it would be advisable to not mess with it (again... once a production version is out there).
314 * If you *do* mess with it, there are ways to ensure it all works out anyway: logic could be added
315 * wherein the client specifies its own #S_MQS_MAX_MSG_SZ when issuing an open-channel request,
316 * and the server must honor it in its Server_session_impl::make_channel_mqs(). So it can be done --
317 * just know that in that case you'll have to actually add such logic; or somewhat break
318 * heap_fixed_builder_config(). The reason I (ygoldfel) have not done this already is it seems unlikely
319 * (for various reasons listed above) that tweaking this value is of much practical value.
320 */
321 static constexpr size_t S_MQS_MAX_MSG_SZ = 8 * 1024;
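 // Illustrative sketch, not part of this header: checking a desired MQ message size against the system limit
 // that the comment above refers to. All function names here are hypothetical. The parsing assumes the file
 // holds a single decimal byte count, as is typical for /proc/sys/fs/mqueue/msgsize_max on Linux.

```cpp
#include <cassert>
#include <cstddef>
#include <fstream>
#include <sstream>
#include <string>

// Same value as S_MQS_MAX_MSG_SZ above.
constexpr std::size_t k_wanted_max_msg_sz = 8 * 1024;

// Parses the decimal byte count from the limit file's text; returns 0 if the text is not a number.
inline std::size_t parse_msgsize_max(const std::string& text)
{
  std::istringstream is(text);
  std::size_t limit = 0;
  return (is >> limit) ? limit : 0;
}

// True if messages of `wanted` bytes fit under the limit described by `limit_text`.
inline bool mq_msg_size_allowed(std::size_t wanted, const std::string& limit_text)
{
  assert(wanted > 0);
  return wanted <= parse_msgsize_max(limit_text);
}

// Hypothetical helper: reads the live limit text; yields an empty string if the file cannot be opened
// (for example on a non-Linux system), which the check above then treats as "not allowed".
inline std::string read_msgsize_max_text()
{
  std::ifstream file("/proc/sys/fs/mqueue/msgsize_max");
  std::string text;
  std::getline(file, text);
  return text;
}
```

 // A caller contemplating a larger constant could run `mq_msg_size_allowed(new_value, read_msgsize_max_text())`
 // at start-up and log a clear diagnostic instead of waiting for queue creation to fail.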
322
323 // Types.
324
325 /**
326 * The (internally used) session master channel is a transport::struc::Channel of this concrete type.
327 *
328 * Rationale for the chosen knob values:
329 * - To be able to open_channel() (and hence passive-open) a #Channel_obj that can transmit native handles,
330 * certainly a handles pipe is required to transmit half of each socket-pair. That said, an `Mqs_channel`
331 * would work fine if #S_SOCKET_STREAM_ENABLED is `false`. The real reason at least a handles pipe is
332 * required is that to establish this channel -- unlike subsequent channels in the session -- a client-server
333 * flow is required. This connection establishment is not possible with MQs, so of the available options
334 * only a socket stream would work.
335 * - There are no bulky data transfers over this channel, so adding
336 * a parallel MQ-based blobs pipe is overkill even if it could help performance; more so since the
337 * master channel is unlikely to be used frequently (open_channel() on either side shouldn't be that frequent).
338 * Since, again, the minute differences in perf versus a Unix-domain-socket-based transport are unlikely to
339 * be significant in this use case, it is easier to use a socket stream which lacks any kernel-persistent
340 * cleanup considerations -- or even just distinct naming considerations for that matter.
341 * - So: use a transport::Socket_stream_channel.
342 * - Use the specially tailored (internal) session master channel capnp schema, whose key messages cover (at least):
343 * - session log-in at the start;
344 * - open-channel request/response.
345 * - As of this writing, for serializing/deserializing, it's either the heap-allocating engine on either side,
346 * or it's something SHM-based (which could be faster). However at least some SHM-based builders/readers
347 * themselves (internally) require a transport::Channel to function, which would require a Session typically,
348 * creating a cyclical feature dependency. Plus, SHM has cleanup considerations. So a heap-based engine
349 * on either side is the natural choice. The cost is some perf: one copies from heap into the session master
350 * transport; and from there into heap on the other side. In this context that's not a significant perf loss.
351 */
354 schema::detail::SessionMasterChannelMessageBody
356
357 /**
358 * Handle to #Master_structured_channel.
359 *
360 * ### Rationale for type chosen ###
361 * It's a handle at all, because at first and possibly later there may be no session master channel, so a null
362 * value is useful. (Though, `std::optional` could be used instead.) It's a ref-counted pointer as opposed
363 * to `unique_ptr` so that it can be observed via #Master_structured_channel_observer (`weak_ptr`) which is not
364 * (and cannot be) available for `unique_ptr`. The observing is needed tactically for certain async lambda needs.
365 */
366 using Master_structured_channel_ptr = boost::shared_ptr<Master_structured_channel>;
367
368 /// Observer of #Master_structured_channel_ptr. See its doc header.
369 using Master_structured_channel_observer = boost::weak_ptr<Master_structured_channel>;
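 // Illustrative sketch, not part of this header: why a ref-counted handle plus a weak observer suits async lambdas.
 // The lambda captures only the observer, so it neither extends the channel's lifetime nor touches it after the
 // owner has reset the handle. `Channel_sketch` and `make_send_task` are hypothetical, and `std::` smart pointers
 // are used here instead of the `boost::` ones above purely to keep the example self-contained.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical stand-in for the master channel; counts sends so the effect is observable.
struct Channel_sketch
{
  int m_sends = 0;
  void send() { ++m_sends; }
};

// Builds a deferred task holding only a weak observer; returns false when run after the channel is gone.
inline std::function<bool ()> make_send_task(const std::shared_ptr<Channel_sketch>& channel)
{
  assert(channel);
  std::weak_ptr<Channel_sketch> observer(channel);
  return [observer]() -> bool
  {
    const auto locked = observer.lock(); // Null if the owner has already dropped the channel.
    if (!locked)
    {
      return false;
    }
    locked->send();
    return true;
  };
}
```

 // With `unique_ptr` there is no equivalent of `lock()`: a deferred lambda would hold a raw pointer and could
 // not tell whether the channel still exists.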
370
371 /// Concrete function type for the on-passive-open handler (if any), used for storage.
373 Mdt_reader_ptr&& new_channel_mdt)>;
374
375 /**
376 * Optional to use by subclasses, this operates a simple state machine that carries out a graceful-session-end
377 * handshake procedure. A particular Client_session_impl and symmetrically its opposing Server_session_impl
378 * shall instantiate a `*this` internally and invoke its methods on certain events, as described in their
379 * contracts.
380 *
381 * Now we explain what we're solving (Rationale); then how we solve it (Design). The latter is much simpler,
382 * we think, to understand than the former. Hence apologies in advance for the length of the "Rationale" section.
383 *
384 * ### Rationale ###
385 * Consider a particular Client_session_impl + Server_session_impl class/object pair, A and B. In this case --
386 * which is somewhat unusual (relative to most internal `Session` algorithms) but in a good, simplifying way --
387 * it does not matter which is A and which is B. So let's say A is us and B is the opposing guy.
388 *
389 * Also, for now, assume these are just the vanilla ones: session::Client_session_impl and
390 * session::Server_session_impl; not SHM-enabled -- no SHM arenas in play.
391 *
392 * Consider what happens when A dtor is invoked, in PEER state. The session ends, and the session-end trigger is us.
393 * Without Graceful_finisher what happens is as follows. The #Master_structured_channel in `*this` is closed;
394 * and then `*this` goes away altogether right away/synchronously. Soon B's #Master_structured_channel
395 * throws the graceful-close error; B itself correctly interprets this as a session-end trigger; and B throws
396 * an error via the `Session` error handler. Now, the user is told by docs they informally *should* destroy
397 * the B (invoke its dtor) ASAP, but nothing is formally forcing them to; it can sit around. It will never
398 * accept channel passive-opens; `open_channel()` will no-op / return null; etc. It is useless, but it can stay
399 * around. Eventually B dtor will be invoked by user; it'll basically clean up data structures, and that's that.
400 *
401 * There is no problem there: A dtor and B dtor can run at quite different times, or more precisely B dtor
402 * can run much later than A dtor which triggered session-end. Nothing bad happens.
403 *
404 * Now we'll discuss this in specific terms around actual `Client/Server_session_impl` variants that do exist
405 * at this time. We could try to be formal and general, but it's easier to just be specific for exposition
406 * purposes.
407 *
408 * Take, now, the shm::classic::Client_session_impl and shm::classic::Server_session_impl variants. The above
409 * generally holds, but there are now SHM arenas in play. We can forget about the app-scope arenas right off
410 * the bat: That guy stays around as long as the `Session_server` does, and that guy is formally required
411 * to be around until all child `Session`s have been destroyed. So only session-scope arenas -- the ones
412 * inside A and B -- are relevant. In fact, for SHM-classic there really is only *one* arena: A holds a
413 * handle to it (`a.session_shm()` points to it), and B does so symmetrically.
414 *
415 * Now reconsider the scenario above: A dtor runs. (Insert vanilla `*_session_impl` text from above here.)
416 * Also, though, A dtor will remove the underlying SHM-pool by its name. (This is possibly a lie. This is
417 * technically so, only if A is a `Server_session_impl`; we've assigned the task to the server side as of this
418 * writing. But this technicality is irrelevant: unlinking the SHM-pool doesn't blow up the other side's
419 * access to the arena; it remains in RAM until that guy closes. So just bear with us here.) Then indeed
420 * B dtor can run sometime later. Is there a problem? For SHM-classic, which is what we're discussing, no:
421 * The arena -- 1 shared SHM-pool in this case -- stays alive, until B's handle gets closed by B dtor.
422 * What about allocations/deallocations? No problem there either: By contract, the A user *must* drop all
423 * cross-process `shared_ptr` handles to objects that it owns, before invoking A dtor; so A is fine, and the
424 * internal cross-process ref-counts for each A-held object get decremented by 1, due to the `shared_ptr` handles
425 * being nullified by the user by contract. (For context: The guided manual `session_app_org` page describes
426 * to the reader how to structure their program, so that this is guaranteed. In short, declare the object
427 * handle data members in their code after the `Session` data member, so they get deinitialized in the reverse
428 * order, in the user's hypothetical recommended `App_session` class.) Isn't there a problem in B, though? No:
429 * the B user shall drop *its* `shared_ptr` handles, before invoking B dtor; this will drop their internal
430 * cross-process ref-counts to 0; at which point shm::Pool_arena logic will deallocate those objects. Deallocation
431 * is entirely distributed between A and B; in this scenario B's handles just happen to live longer and therefore
432 * happen to be the ones triggering the deallocations -- by the B process itself, synchronously.
433 * A is not "in charge" of deallocations of anything in any special way; it doesn't matter which process originally
434 * allocated a given object either. That's the beauty/simplicity of SHM-classic. (There are of course trade-offs
435 * in play; this is worse for segregation/safety... see Manual and such.)
436 *
437 * Now we come to the point. Consider session::shm::arena_lend::jemalloc::Client_session_impl and
438 * `Server_session_impl`. There *is* a problem. Interestingly this time it really does *not* matter whether
439 * A is server or client-side. In SHM-jemalloc the session-scope arenas are not 1 but 2:
440 * A has an arena, from which process A can allocate, and in which process A shall deallocate.
441 * B has an arena, from which process B can allocate, and in which process B shall deallocate.
442 * If B borrows object X from A, and is later done with it -- the `shared_ptr` local ref-count reached 0 --
443 * SHM-jemalloc-arranged `shared_ptr` deleter will send an internal message over a special channel to A.
444 * A will deallocate X once that has occurred, *and* any A-held `shared_ptr` group for X has also reached
445 * ref-count 0 (which may have happened before, or it may happen later). Exactly the same is true of A borrowing
446 * from B, conversely/symmetrically.
447 *
448 * The problem is this: Suppose B holds a borrowed handle X (to an A-allocated object). Now A dtor runs;
449 * its `.session_shm()` arena -- its constituent SHM-pools! -- is destroyed. That in itself shouldn't be
450 * a problem; again B presumably holds a SHM-pool handle, so it won't just disappear from RAM under it.
451 * I (ygoldfel) am somewhat fuzzy on what exactly happens (not being the direct author of SHM-jemalloc), but
452 * basically as I understand it, in destroying the arena, a bunch of jemalloc deinit steps execute; perhaps
453 * heap-marker data structures are modified all over the place... it's chaos. The bottom line is:
454 * A arena gets mangled, so the B-held borrowed handle X has a high chance of pointing, now, to garbage:
455 * Not unmapped garbage; not into a SHM-pool that's magically gone -- it's still there -- so there's not necessarily
456 * a SEGV as a result; just garbage in the ~sense of accessing `free()`d (not un-`mmap()`ped) memory.
457 *
458 * (Update: I've spoken to the SHM-jemalloc author echan, and the above is basically right with a couple
459 * clarifications. It's not so much that the arena object being destroyed is the issue; in fact that in itself would
460 * be handled gracefully by SHM-jemalloc innards; they will keep the arena around, even if the outer object goes away
461 * (or more precisely as of this writing the arena-handle -- a `shared_ptr` -- does), while it knows things are
462 * pointing into it, even borrowed things in another process. Really it's the fact that we kill the *channel*
463 * it uses to communicate with the borrower process: the arena is told there's nothing out there pointing into
464 * it: locally the user has to have dropped all constructed-object handles; and the borrower process no
465 * longer has a way of contacting it; so it assumes that guy's done/gone, as are all the borrowed handles in it.
466 * That clarification doesn't change the situation though: Whether it's the SHM-jemalloc-session, or the
467 * SHM-jemalloc-session-used-channel, or the SHM-jemalloc-arena going away that makes borrowed objects unusable --
468 * they are unusable, is the bottom line.)
469 *
470 * All that is to say: A dtor runs; and user handle to object X in B instantly becomes unusable.
471 * For example, I (ygoldfel) have observed a simple thing: A SHM-jemalloc-backed transport::struc::Msg_in received
472 * from A, in B, was absolutely fine. Then it was running a long tight-loop verification of its contents -- verifying
473 * hashes of millions of X-pointed objects in SHM, preventing B dtor from running. Meanwhile, A in a test program
474 * had nothing more to do and ran A dtor (closed session). Suddenly, the hash-verifier code in the tight loop
475 * was hitting hash mismatches, throwing capnp exceptions in trying to traverse the message, and so on.
476 *
477 * What this means, ultimately, is straightforward: A dtor must not destroy its SHM-jemalloc arena
478 * (as accessible normally through `.session_shm()`), until not just *A* user has dropped all its object
479 * `shared_ptr` handles; but *B* user -- that's in the other process, B -- has as well! How can we enforce
480 * this, though? One approach is to just put it on the user: Tell them that they must design their protocol
481 * in such a way as to arrange some kind of handshake, as appropriate, beyond which each side knows for a fact
482 * that the session objects, and therefore arenas, are safe to destroy.
483 *
484 * We don't want to do that for obvious reasons. What we already tell them should be sufficient: If you get
485 * a session-hosing error handler execution in your `Session`, then don't use that `Session`, and you should
486 * destroy it ASAP, for it is useless, and the other side's `Session` is not long for this life. If they
487 * listen to this, then A dtor will soon be followed by B dtor anyway. It might be delayed by some long
488 * tight-loop operation such as the test-program hash-verifying scenario above, but generally users avoid such
489 * things; and when they don't they can expect other things to get blocked, possibly in the opposing process.
490 * Subjectively, weird blocking -- caused by the other side acting pathologically -- is a lot more acceptable
491 * at session-start or session-end than something like that occurring mid-session.
492 *
493 * So what's the exact proposal? It's this: If A dtor begins executing, it must first block until it knows
494 * B dtor has begun executing; but once it does know that, it knows that all B-held object handles have been
495 * dropped -- which is what we want. (It would be possible for B user to indicate it some other way -- another
496 * API call, short of destruction -- but that seems unnecessarily complex/precious. If the session is finished,
497 * then there's no reason to keep the `Session` object around = the overall policy we've pursued so far, so why
498 * make exceptions for this?)
499 *
500 * To be clear, this means certain behavior by B user can cause A dtor to block. We can explain this in the docs,
501 * of course, but perhaps much more effective is to make it clear in the logs: say the wait begins here and what
502 * it'll take for it to end; and say when it has ended and why.
503 *
504 * Also to be clear, Graceful_finisher should be used only in `*_session_impl` pairs that actually have this
505 * problem; so of the 3 pairs we've discussed above -- only SHM-jemalloc's ones. The others can completely
506 * forego all this.
507 *
508 * @todo Consider how to avoid having SHM-jemalloc ipc::session mechanism require one side's `Session` dtor
509 * await (and possibly block) the start of the opposing side's `Session` dtor, before it can proceed. The
510 * reason it does this today is above this to-do in the code. The blocking is not too bad -- if the user's
511 * applications are properly written, it will not occur, and the existing logging should make clear why it
512 * happens, if it happens; but it's still not ideal. There are a couple known mitigating approaches,
513 * and at least 1 ticket covering them is likely filed. (Either way the Graceful_finisher mechanism could
514 * then be removed probably.) To summarize them: 1, there can be a mechanism deep inside SHM-jemalloc code that gives
515 * (arbitrarily long, configurable) grace period for known borrowed objects whose session channels
516 * have closed before those objects were returned; this was echan's idea on this topic. Once the grace period
517 * is reached, it would act as-if they got returned then (so in the above scenario the arena could get
518 * jemalloc-deinitialized and all). If program needs to exit, it could either block until the end of the
519 * grace period or... who knows? 2, something similar could be done about the SHM-internal-use channel:
520 * do not close it but add it to some list of such channels; they would be finally closed upon detection of
521 * the other side's `Session` dtor being reached, in some background thread somewhere. (On 2nd thought
522 * this sounds a lot like Graceful_finisher -- but for this particular internal-use channel only. The code
523 * here would only get more complex, though maybe not too much more. However, it would resolve the observed
524 * `Session`-dtor-blocking user-visible issue, at least until the delayed-channel-death process had to exit
525 * entirely.) Beyond these basic ideas (which could even perhaps be combined) this requires some thinking;
526 * it is an interesting problem. In the meantime the dtor-cross-process-barrier-of-sorts existing solution is
527 * pretty decent.
528 *
529 * ### Design / how to use ###
530 * How to use: Subclass shall create a `*this` if and only if the feature is required; it shall call its methods
531 * and ctor on certain events, as prescribed in their doc headers.
532 *
533 * Design: All this assumes PEER state. Now:
534 *
535 * Again let's say we're A, and they're B -- and just like we are. Our goal is to ensure A dtor, at its
536 * start, blocks until B dtor has started executing; and to give their side the information necessary to do the
537 * same. That's on_dtor_start(): the method that'll do the blocking; when it returns the dtor of subclass can
538 * proceed. So what's needed for on_dtor_start() to `return`? One, A dtor has to have started -- but that's true
539 * by the contract of when the method is called; two, B dtor has to have started. How do we get informed of the
540 * latter? Answer: B sends a special message along the #Master_structured_channel, from its own on_dtor_start().
541 * Naturally, we do the same -- in full-duplex fashion in a sense -- so that they can use the same logic. So then
542 * the steps in on_dtor_start() are:
543 * -# Send `GracefulSessionEnd` message to B, if possible (#Master_structured_channel is up, and the send
544 * op doesn't return failure).
545 * -# Perform `m_opposing_session_done.get_future().wait()`, where `m_opposing_session_done` is a `promise<void>`
546 * that shall be fulfilled on these events (and only if it hasn't already been fulfilled before):
547 * - The #Master_structured_channel has received `GracefulSessionEnd` *from* B.
548 * - The #Master_structured_channel has emitted a channel-hosing error.
549 *
550 * Notes:
551 * - The mainstream case is receiving `GracefulSessionEnd`: In normal operation, with no acts of god, we will no
552 * longer destroy the session-master-channel, until returning from on_dtor_start().
553 * - This could happen after the `.wait()` starts -- meaning A dtor indeed ran before B dtor (our side is
554 * the one triggering session-end).
555 * - It could just as easily happen *before* the `.wait()` starts -- meaning the reverse. Then `.wait()`
556 * will return immediately.
557 * - Of course who knows what's going on -- the #Master_structured_channel could still go down without
558 * the `GracefulSessionEnd`. Is it really right to treat this the same, and let on_dtor_start() return?
559 * Answer: Yes. Firstly, there's hardly any other choice: the channel's dead; there's no other way of knowing
560 * what to wait for; and we can't just sit there cluelessly. But more to the point, this means it's a
561 * *not*-graceful scenario: Graceful_finisher can't hope to deal with it and can only proceed heuristically.
562 * This is not networking; so we should have a pretty good level of predictability at any rate.
563 *
564 * Last but not least consider that by delaying the destruction of the #Master_structured_channel until after
565 * the dtor has started (assuming no channel-hosing acts of god), we've changed something: The error handler shall
566 * no longer inform the user of a session-end trigger from the other side! Left alone this way, we've broken
567 * the whole system by introducing a chicken-egg paradox: We don't let our user be informed of the session-end
568 * trigger when we normally would, so they won't call our dtor; but because they won't call our dtor, we'll never
569 * reach the point where the channel would get hosed and we'd know to inform the user. How to fix this?
570 * The answer is pretty obvious: Receiving the `GracefulSessionEnd` shall now trigger a special graceful-session-end
571 * error. Naturally this would only be usefully emitted if we're not in the dtor yet.
572 * So the latter situation should kick-off the user invoking our dtor sometime soon, hopefully; we'll send our
573 * `GracefulSessionEnd` to them; and so on.
574 */
575 class Graceful_finisher :
576 public flow::log::Log_context,
577 private boost::noncopyable
578 {
579 public:
580 // Constructors/destructor.
581
582 /**
583 * You must invoke this ctor (instantiate us) if and only if synchronized dtor execution is indeed required;
584 * and `*this_session` has just reached PEER state. Invoke from thread W only.
585 *
586 * ### How `master_channel` shall be touched by `*this` ###
587 * We're being very explicit about this, since there's some inter-class private-state sharing going on...
588 * generally not the most stable or stylistically-amazing situation....
589 * - This ctor will `.expect_msg(GRACEFUL_SESSION_END)`, so we can detect its arrival and mark it down as needed
590 * and possibly invoke `this_session->hose()` to report it to the user.
591 * Hence there's no method you'll need to call nor any setup like this needed on your part.
592 * - on_dtor_start() will attempt to send `GracefulSessionEnd` to the opposing Graceful_finisher.
593 *
594 * @param logger_ptr
595 * Logger to use for logging subsequently. (Maybe `Session_base` should just subclass `Log_context`
596 * for all of us? Add to-do?)
597 * @param this_session
598 * The containing Session_base.
599 * @param async_worker
600 * The thread W of the containing `*_session_impl`. `GracefulSessionEnd` handler is at least partially
601 * invoked there (`hose(...)`); and on_dtor_start() posts onto it.
602 * @param master_channel
603 * The containing `Session` master channel. The pointee must exist until `*this` Graceful_finisher
604 * is gone.
605 */
606 explicit Graceful_finisher(flow::log::Logger* logger_ptr, Session_base* this_session,
607 flow::async::Single_thread_task_loop* async_worker,
608 Master_structured_channel* master_channel);
609
610 // Methods.
611
612 /**
613 * Must be invoked if the `*_session_impl` detects that the master channel has emitted channel-hosing error.
614 *
615 * Invoke from thread W.
616 */
617 void on_master_channel_hosed();
618
619 /**
620 * The reason Graceful_finisher exists: this method must be called at the start of `*_session_impl` dtor; and
621 * will block until it is appropriate to let that dtor proceed to shut down the `*_session_impl`.
622 *
623 * Invoke from thread U, not thread W.
624 */
625 void on_dtor_start();
626
627 private:
628 // Data.
629
630 /// The containing Session_base. It shall exist until `*this` is gone.
631 Session_base* const m_this_session;
632
633 /// The containing `Session` thread W loop. It shall exist until `*this` is gone.
634 flow::async::Single_thread_task_loop* m_async_worker;
635
636 /// The containing `Session` master channel. It shall exist until `*this` is gone.
637 Master_structured_channel* m_master_channel;
638
639 /**
640 * A promise whose fulfillment is a necessary and sufficient condition for on_dtor_start() returning
641 * (letting `Session` dtor complete).
642 *
643 * It is fulfilled once the #Master_structured_channel receives `GracefulSessionEnd` from opposing side
644 * (indicating the opposing on_dtor_start() has started) or got hosed (indicating we'll now never know this
645 * and must act as-if opposing on_dtor_start() has started). See "Design" in our doc header.
646 */
647 boost::promise<void> m_opposing_session_done;
648 }; // class Graceful_finisher
649
650 // Constructors.
651
652 /**
653 * Constructs: Client_session_impl form (the user is the one constructing the object, though in NULL state).
654 * The values taken as args are set permanently (undefined behavior/assertion may trip if an attempt is made to
655 * modify one via mutator). The remaining values must be set via mutator before PEER state.
656 *
657 * @param cli_app_ref
658 * See cli_app_ptr().
659 * @param srv_app_ref
660 * See #m_srv_app_ref.
661 * @param on_err_func
662 * On-error handler from user.
663 * @param on_passive_open_channel_func_or_empty_arg
664 * On-passive-open handler from user (empty if user wishes to disable passive-opens on this side).
665 */
666 explicit Session_base(const Client_app& cli_app_ref, const Server_app& srv_app_ref,
667 flow::async::Task_asio_err&& on_err_func,
668 On_passive_open_channel_func&& on_passive_open_channel_func_or_empty_arg);
669
670 /**
671 * Constructs: Server_session_impl form (Session_server is the one constructing the object, though in NULL state,
672 * before log-in has completed, but after the socket-stream connection has been established).
673 * The values taken as args are set permanently (undefined behavior/assertion may trip if an attempt is made to
674 * modify one via mutator). The remaining values must be set via mutator before PEER state.
675 *
676 * @param srv_app_ref
677 * See #m_srv_app_ref.
678 */
679 explicit Session_base(const Server_app& srv_app_ref);
680
681 // Methods.
682
683 /**
684 * Sets srv_namespace() (do not call if already set).
685 * @param srv_namespace_new
686 * Value.
687 */
688 void set_srv_namespace(Shared_name&& srv_namespace_new);
689
690 /**
691 * Sets cli_namespace() (do not call if already set).
692 * @param cli_namespace_new
693 * Value.
694 */
695 void set_cli_namespace(Shared_name&& cli_namespace_new);
696
697 /**
698 * Sets cli_app_ptr() (do not call if already set).
699 * @param cli_app_ptr_new
700 * Value.
701 */
702 void set_cli_app_ptr(const Client_app* cli_app_ptr_new);
703
704 /**
705 * Sets on_passive_open_channel_func_or_empty() (do not call if already set; do not call if user intends for
706 * passive-opens to be disabled on this side).
707 *
708 * @param on_passive_open_channel_func
709 * Value.
710 */
711 void set_on_passive_open_channel_func(On_passive_open_channel_func&& on_passive_open_channel_func);
712
713 /**
714 * Sets on_err_func() (do not call if already set).
715 *
716 * @param on_err_func_arg
717 * Value.
718 */
719 void set_on_err_func(flow::async::Task_asio_err&& on_err_func_arg);
720
721 /**
722 * Returns `true` if and only if set_on_err_func() has been called.
723 * @return See above.
724 */
725 bool on_err_func_set() const;
726
727 /**
728 * The on-passive-open handler (may be empty even in final state, meaning user wants passive-opens disabled
729 * on this side).
730 * @return See above.
731 */
732 const On_passive_open_channel_func& on_passive_open_channel_func_or_empty() const;
733
734 /**
735 * Marks this session as hosed for (truthy) reason `err_code`; and *synchronously* invokes on-error handler;
736 * only invoke if not already hosed().
737 *
738 * This utility is important and has certain pre-conditions (behavior undefined if not met; assertion may trip):
739 * - `*this` must be in PEER state. In particular on_err_func_set() must return `true`.
740 * - hosed() must return `false`.
741 * - `err_code` must be truthy (non-success).
742 * - This must be invoked from a thread such that it is OK to *synchronously* invoke on-error handler.
743 * As of this writing this is Server_session_impl's or Client_session_impl's thread W in practice, which is how
744 * thread safety of hose() versus hosed() is also guaranteed by those users.
745 *
746 * @param err_code
747 * Truthy error.
748 */
749 void hose(const Error_code& err_code);
750
751 /**
752 * Returns `true` if and only if hose() has been called. If so then #m_on_err_func has been executed already.
753 * `*this` must be in PEER state. In particular on_err_func_set() must return `true`.
754 *
755 * @return Ditto.
756 */
757 bool hosed() const;
758
759private:
760 // Data.
761
762 /**
763 * See cli_app_ptr().
764 *
765 * ### Rationale for it being `atomic<>` ###
766 * It is for the following specific reason. Consider `ostream<<` of `*this`. `*this` is really either
767 * in Client_session_impl or Server_session_impl. In both cases these `ostream<<`s access only #m_srv_app_ref, which
768 * is immutable throughout (so no problem there) and #m_cli_app_ptr. For Client_session_impl #m_cli_app_ptr is
769 * also immutable throughout (so no problem there). For Server_session_impl however it does change in one spot:
770 * Server_session_impl::async_accept_log_in() internal async handler for the log-in request shall, on success,
771 * call set_cli_app_ptr() and change it from null to non-null. This could cause concurrent access to the
772 * data member, even though it's a mere pointer (but we don't count on the alleged "atomicity" of this; it is
773 * generally considered not safe).
774 *
775 * Now, as of this writing, there's exactly one spot where `ostream<<` could be invoked from thread U while
776 * it is being assigned in thread W: `async_accept_log_in()` is called by our internal code in
777 * Session_server and only once; the only thing that can occur in thread U until the log-in response
778 * handler is executed is that Server_session_impl dtor is called. Before that dtor stops that thread W,
779 * it does print the Server_session_impl once as of this writing. Therefore, out of sheer caution, this guy
780 * is `atomic<>`. That said there could be other such invocations, as code might change during maintenance
781 * in the future, in which case this `atomic<>`ness will quietly come in handy.
782 */
783 std::atomic<const Client_app*> m_cli_app_ptr;
784
785 /// See srv_namespace().
786 Shared_name m_srv_namespace;
787
788 /// See cli_namespace().
789 Shared_name m_cli_namespace;
790
791 /// See set_on_err_func().
792 flow::async::Task_asio_err m_on_err_func;
793
794 /// See on_passive_open_channel_func_or_empty().
795 On_passive_open_channel_func m_on_passive_open_channel_func_or_empty;
796
797 /**
798 * Starts falsy; becomes forever truthy (with a specific #Error_code that will not change thereafter)
799 * once hose() is called (with that truthy value). Note hose() may not be called before PEER state, which implies
800 * #m_on_err_func is non-empty.
801 *
802 * ### Concurrency ###
803 * See hose() and hosed() doc headers. TL;DR: It is up to the caller to only call those, basically, from
804 * thread W only.
805 */
806 Error_code m_peer_state_err_code_or_ok;
807}; // class Session_base
808
809/// Internally used macro; public API users should disregard (same deal as in struc/channel.hpp).
810#define TEMPLATE_SESSION_BASE \
811 template<schema::MqType S_MQ_TYPE_OR_NONE, bool S_TRANSMIT_NATIVE_HANDLES, typename Mdt_payload>
812/// Internally used macro; public API users should disregard (same deal as in struc/channel.hpp).
813#define CLASS_SESSION_BASE \
814 Session_base<S_MQ_TYPE_OR_NONE, S_TRANSMIT_NATIVE_HANDLES, Mdt_payload>
815
816// Template implementations.
817
818TEMPLATE_SESSION_BASE
819CLASS_SESSION_BASE::Session_base(const Client_app& cli_app_ref, const Server_app& srv_app_ref,
820 flow::async::Task_asio_err&& on_err_func,
821 On_passive_open_channel_func&& on_passive_open_channel_func_or_empty_arg) :
822 /* This is, basically, the (protected) ctor for Client_session, followed by Client_session_impl::async_connect().
823 * By the time we're constructed, by definition Client_app and, therefore-ish, m_on_*_func are known.
824 * However: m_srv_namespace is looked-up (in file system, as of this writing) at the forthcoming async_connect();
825 * empty for now; set_srv_namespace() invoked at that time. m_cli_namespace is returned by server in
826 * log-in response; set_cli_namespace() invoked at that time. */
827
828 m_srv_app_ref(srv_app_ref), // Not copied!
829 m_cli_app_ptr(&cli_app_ref), // Ditto!
830 m_on_err_func(std::move(on_err_func)),
831 m_on_passive_open_channel_func_or_empty(std::move(on_passive_open_channel_func_or_empty_arg))
832 // m_srv_namespace and m_cli_namespace remain .empty() for now.
833{
834 // Yep.
835}
836
837TEMPLATE_SESSION_BASE
838CLASS_SESSION_BASE::Session_base(const Server_app& srv_app_ref) :
839 /* This is, basically, the (protected) ctor for Server_session, followed by
840 * Server_session_impl::async_accept_log_in(), itself `protected` as of this writing. Various items are not
841 * known until that async-succeeds; Client_app is not known (until it is identified in the log-in exchange), and,
842 * therefore-ish, m_on_*_func are also unknown.
843 * Server_app is still known from the very start. Moreover, due to the way m_srv_namespace is determined (our own
844 * PID), we can confidently initialize that from the start (ourselves being the server).
845 * m_cli_namespace is generated on server side during log-in proceedings; set_cli_namespace() invoked at that time. */
846
847 m_srv_app_ref(srv_app_ref), // Not copied!
848 m_cli_app_ptr(nullptr), // null for now.
849 // As promised, we know our own srv-namespace:
850 m_srv_namespace(Shared_name::ct(std::to_string(util::Process_credentials::own_process_id())))
851 // m_cli_namespace + m_on_err_func and m_on_passive_open_channel_func_or_empty remain .empty() for now.
852{
853 // Yep.
854}
855
856TEMPLATE_SESSION_BASE
857void CLASS_SESSION_BASE::set_cli_app_ptr(const Client_app* cli_app_ptr_new)
858{
859 assert(cli_app_ptr_new && "As of this writing cli_app_ptr should be set at most once, from null to non-null.");
860
861#ifndef NDEBUG
862 const auto prev = m_cli_app_ptr.exchange(cli_app_ptr_new);
863 assert((!prev) && "As of this writing cli_app_ptr should be set at most once, from null to non-null.");
864#else
865 m_cli_app_ptr = cli_app_ptr_new;
866#endif
867}
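The set-once rule enforced by set_cli_app_ptr() above can be illustrated in isolation. This is a minimal sketch, not Flow-IPC code: `App_sketch` and `Holder_sketch` are hypothetical names, and it swaps the `exchange()`-plus-`assert()` used above for `compare_exchange_strong()`, so that a second attempt is rejected (and reported to the caller) even in `NDEBUG` builds.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for Client_app.
struct App_sketch
{
  int m_id;
};

// Holds a pointer that may change exactly once: from null to non-null.
class Holder_sketch
{
public:
  // Publishes `app`; returns false, leaving the stored value untouched, if a pointer was already set.
  bool set_once(const App_sketch* app)
  {
    assert(app && "Only a null -> non-null transition is allowed.");
    const App_sketch* expected = nullptr;
    // Succeeds only if the stored value is still null; otherwise `expected` receives the current value.
    return m_app.compare_exchange_strong(expected, app);
  }

  // Safe to call concurrently with set_once(), e.g., from a logging path in another thread.
  const App_sketch* get() const
  {
    return m_app.load();
  }

private:
  std::atomic<const App_sketch*> m_app{nullptr};
};
```

With this variant a caller can treat a `false` return as a logic error instead of relying on an assertion that disappears in release builds.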
868
869TEMPLATE_SESSION_BASE
870void CLASS_SESSION_BASE::set_srv_namespace(Shared_name&& srv_namespace_new)
871{
872 assert(!srv_namespace_new.empty());
873 assert((srv_namespace_new != Shared_name::S_SENTINEL) && "That is a reserved sentinel value.");
874 assert(m_srv_namespace.empty() && "As of this writing srv_namespace should be set at most once, from empty.");
875
876 m_srv_namespace = std::move(srv_namespace_new);
877}
878
879TEMPLATE_SESSION_BASE
880void CLASS_SESSION_BASE::set_cli_namespace(Shared_name&& cli_namespace_new)
881{
882 assert(!cli_namespace_new.empty());
883 assert((cli_namespace_new != Shared_name::S_SENTINEL) && "That is a reserved sentinel value.");
884 assert(m_cli_namespace.empty() && "As of this writing cli_namespace should be set at most once, from empty.");
885
886 m_cli_namespace = std::move(cli_namespace_new);
887}
888
889TEMPLATE_SESSION_BASE
890void CLASS_SESSION_BASE::set_on_passive_open_channel_func(On_passive_open_channel_func&& on_passive_open_channel_func)
891{
892 assert(m_on_passive_open_channel_func_or_empty.empty());
893 m_on_passive_open_channel_func_or_empty = std::move(on_passive_open_channel_func);
894}
895
896TEMPLATE_SESSION_BASE
897void CLASS_SESSION_BASE::set_on_err_func(flow::async::Task_asio_err&& on_err_func_arg)
898{
899 assert(!on_err_func_arg.empty());
900 assert(m_on_err_func.empty() && "Call set_on_err_func() once at most and only if not already set through ctor.");
901
902 m_on_err_func = std::move(on_err_func_arg);
903}
904
905TEMPLATE_SESSION_BASE
906const Client_app* CLASS_SESSION_BASE::cli_app_ptr() const
907{
908 return m_cli_app_ptr.load(); // Fetch atomic<> payload and return it. (Could write `return m_cli_app_ptr;` too.)
909}
910
911TEMPLATE_SESSION_BASE
912const Shared_name& CLASS_SESSION_BASE::srv_namespace() const
913{
914 return m_srv_namespace;
915}
916
917TEMPLATE_SESSION_BASE
918const Shared_name& CLASS_SESSION_BASE::cli_namespace() const
919{
920 return m_cli_namespace;
921}
922
923TEMPLATE_SESSION_BASE
924const typename CLASS_SESSION_BASE::On_passive_open_channel_func&
925 CLASS_SESSION_BASE::on_passive_open_channel_func_or_empty() const
926{
927 return m_on_passive_open_channel_func_or_empty;
928}
929
930TEMPLATE_SESSION_BASE
931bool CLASS_SESSION_BASE::on_err_func_set() const
932{
933 return !m_on_err_func.empty();
934}
935
936TEMPLATE_SESSION_BASE
937void CLASS_SESSION_BASE::hose(const Error_code& err_code)
938{
939 assert((!hosed()) && "By contract do not call unless hosed() is false.");
940
941 m_peer_state_err_code_or_ok = err_code;
942 m_on_err_func(err_code);
943} // Session_base::hose()
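The hose()/hosed() contract above (sticky error state, handler invoked synchronously and exactly once) can be shown with standard-library types only. The following is a sketch under stated assumptions: `Hosable_sketch` is a hypothetical class, `std::error_code` stands in for #Error_code, and `std::function` stands in for `flow::async::Task_asio_err`. Like the original, it is not thread-safe by itself; the caller is expected to use it from a single thread.

```cpp
#include <cassert>
#include <functional>
#include <system_error>
#include <utility>

// Hypothetical stand-in for the error-state portion of Session_base.
class Hosable_sketch
{
public:
  using On_err_func = std::function<void (const std::error_code&)>;

  explicit Hosable_sketch(On_err_func on_err_func) :
    m_on_err_func(std::move(on_err_func))
  {
    assert(m_on_err_func && "Handler must be known before errors can be reported.");
  }

  // Records the (truthy) error permanently and synchronously reports it to the handler.
  void hose(const std::error_code& err_code)
  {
    assert(err_code && "By contract the error must be truthy.");
    assert((!hosed()) && "By contract do not call unless hosed() is false.");

    m_err_code_or_ok = err_code;
    m_on_err_func(err_code);
  }

  // True if and only if hose() has been called.
  bool hosed() const
  {
    return bool(m_err_code_or_ok);
  }

private:
  On_err_func m_on_err_func;
  std::error_code m_err_code_or_ok; // Default-constructed value is falsy (success).
};
```

Callers that may race to report an error would check `hosed()` first, as the Graceful_finisher constructor's handler does before calling `hose()`.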
944
945TEMPLATE_SESSION_BASE
946bool CLASS_SESSION_BASE::hosed() const
947{
948 assert(on_err_func_set() && "By contract do not call until PEER state -- when on-error handler must be known.");
949 return bool(m_peer_state_err_code_or_ok);
950}
951
952TEMPLATE_SESSION_BASE
953Shared_name CLASS_SESSION_BASE::cur_ns_store_mutex_absolute_name() const
954{
955 using std::to_string;
956
957 // Global scope. Pertains to Server_app::m_name, *all* processes thereof, *all* Client_apps, *all* sessions.
959 Shared_name::ct(m_srv_app_ref.m_name),
961 /* That was the standard stuff. Now the us-specific stuff:
962 *
963 * Above we specified it's a mutex; now for what's this mutex? It's for the CNS (Current Namespace Store),
964 * a/k/a the PID file. */
965 mutex_name /= "cur_ns_store";
966
967 /* Further differentiate the names by owner/permissions that will be used when creating the file.
968 * This is in response to seeing, during testing, the situation where (in that particular case)
969 * m_permissions_level_for_client_apps changed from an accidental S_NO_ACCESS to S_(something sane).
970 * The file was created with 0000 permissions, then later no one could open it (or create it, since it existed)
971 * due to it being untouchable, essentially forever without manual intervention (such as `rm`ing it).
972 * The following, which should be otherwise harmless, should at least help avoid such conflicts.
973 * (Is it harmless? Almost, yes: more files can potentially be created and sit around until reboot;
974 * but they are tiny.) */
975 mutex_name /= 'u';
976 mutex_name += to_string(m_srv_app_ref.m_user_id);
977 mutex_name += 'g';
978 mutex_name += to_string(m_srv_app_ref.m_group_id);
979 mutex_name += 'p';
980 mutex_name += to_string(int(m_srv_app_ref.m_permissions_level_for_client_apps));
981
982 return mutex_name;
983} // Session_base::cur_ns_store_mutex_absolute_name()
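The naming scheme above (a resource-specific component, then a suffix encoding owner and permissions) can be sketched with plain strings. This is an illustration only: `mutex_name_sketch()` is a hypothetical helper, `std::string` stands in for Shared_name, and the `_` separator is an assumption about how components are joined.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: builds "<app>_cur_ns_store_u<uid>g<gid>p<perms>".
inline std::string mutex_name_sketch(const std::string& app_name, unsigned int user_id,
                                     unsigned int group_id, int permissions_level)
{
  assert(!app_name.empty());

  const char SEP = '_'; // Assumed separator between name components.

  std::string name = app_name;
  name += SEP;
  name += "cur_ns_store";

  // Differentiate by owner and permissions, so that a file created under one
  // (possibly mistaken) configuration cannot block a later, corrected one.
  name += SEP;
  name += 'u';
  name += std::to_string(user_id);
  name += 'g';
  name += std::to_string(group_id);
  name += 'p';
  name += std::to_string(permissions_level);

  return name;
}
```

Two configurations that differ only in permissions level produce different names, which is the property the comment above relies on.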
984
985TEMPLATE_SESSION_BASE
986fs::path CLASS_SESSION_BASE::cur_ns_store_absolute_path() const
987{
988 using Path = fs::path;
989
990 Path path(m_srv_app_ref.m_kernel_persistent_run_dir_override.empty()
992 : m_srv_app_ref.m_kernel_persistent_run_dir_override);
993 path /= m_srv_app_ref.m_name;
994 path += ".libipc-cns.pid";
995 return path;
996}
997
998TEMPLATE_SESSION_BASE
999Shared_name CLASS_SESSION_BASE::session_master_socket_stream_acceptor_absolute_name() const
1000{
1001 assert((!m_srv_namespace.empty()) && "Serv-namespace must have been set by set_srv_namespace() by now.");
1002
1003 // Global scope. Pertains to Server_app::m_name, the particular process, *all* Client_apps, *all* sessions.
1005 Shared_name::ct(m_srv_app_ref.m_name),
1006 m_srv_namespace);
1007 /* That was the standard stuff. Now the us-specific stuff:
1008 *
1009 * Above we specified it's a socket stream acceptor; but which one/for what? It's the required one for
1010 * any further IPC; hence S_SENTINEL, `0`. (As of this writing, in ipc::session anyway, we don't have any other use
1011 * for Native_socket_stream_acceptor (other socket streams are opened through the ones birthed through this one).
1012 * If we did, we could use `1`, `2`, etc., or something. Anyway Shared_name::S_SENTINEL here is a solid choice.) */
1013 acc_name /= Shared_name::S_SENTINEL;
1014
1015 return acc_name;
1016} // Session_base::session_master_socket_stream_acceptor_absolute_name()
1017
1018TEMPLATE_SESSION_BASE
1019typename CLASS_SESSION_BASE::Structured_msg_builder_config
1020 CLASS_SESSION_BASE::heap_fixed_builder_config(flow::log::Logger* logger_ptr) // Static.
1021{
1024
1025 /* This is arguably easiest when one has an actual Channel_obj; see struc::Channel::heap_fixed_builder_config()
1026 * which works with that. Then one can query the Channel_obj for its send-blob/meta-blob-max-size(s), pick the
1027 * smaller, and return that. We are static -- the whole point is to still work when one doesn't have an actual
1028 * channel but might still want to construct a transport::struc::Msg_out -- in fact they might not even have a Session
1029 * in our case. Not a big deal: we use similar logic but have to use some specific knowledge about how
1030 * we use our own class template params (S_MQ_TYPE_OR_NONE, S_TRANSMIT_NATIVE_HANDLES) to configure any channels
1031 * we open; this tells us how big the messages the various pipes (we ourselves set up) can send-out.
1032 *
1033 * So follow along that other method's (rather simple) procedure and apply our "insider" knowledge. Namely:
1034 * We will *specifically* either use an MQ (of one of 2 specific types), or a socket stream, or both.
1035 * - If socket stream *only*: Then Native_socket_stream advertises its blob/meta-blob max-size in a
1036 * non-concept-implementing public constant. Use that; simple.
1037 * - If MQ *only*: Not quite as simple; the message max size is configurable at run time, and we don't have
1038 * the actual MQ object (by definition). Not to worry:
1039 * - Suppose we're really Server_session_impl. Yay: we are the ones, in Server_session_impl::make_channel_mqs(),
1040 * who determine the MQ size, for any channel that is opened. It's a constant... so use that constant!
1041 * - Suppose we're really Client_session_impl. That's slightly annoying, as the opposing Server_session_impl is
1042 * technically determining the MQ size (prev bullet). As of this writing, though, it's always been a constant
1043 * which has never changed. So we can just use that same constant. In the future, *technically*, the
1044 * procedure could change -- but we punt on dealing with it until such time as that's an actual worry.
1045 * The constant's doc header has notes about it, so it won't be overlooked. (Spoiler alert: It's unlikely
1046 * there will be a pressing need to mess with this; SHM-based transmission obviates the need for such
1047 * perf tweaking.)
1048 * - If both: Use the lower of the two (as in struc::Channel::heap_fixed_builder_config()). */
1049
1050 size_t sz;
1051 if constexpr(S_MQS_ENABLED && (!S_SOCKET_STREAM_ENABLED))
1052 {
1053 sz = S_MQS_MAX_MSG_SZ;
1054 }
1055 else if constexpr((!S_MQS_ENABLED) && S_SOCKET_STREAM_ENABLED)
1056 {
1057 sz = Native_socket_stream::S_MAX_META_BLOB_LENGTH;
1058 }
1059 else
1060 {
1061 static_assert(S_MQS_ENABLED && S_SOCKET_STREAM_ENABLED, "There must be *some* transport mechanism.");
1062
1063 sz = std::min(S_MQS_MAX_MSG_SZ, Native_socket_stream::S_MAX_META_BLOB_LENGTH);
1064 }
1065
1066 return Heap_fixed_builder::Config{ logger_ptr, sz, 0, 0 };
1067} // Session_base::heap_fixed_builder_config()
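The compile-time selection in heap_fixed_builder_config() can be reproduced in a few lines of standalone C++17. This is a sketch, not the library's code: `max_segment_size()` is a hypothetical function, and the two size constants are made-up placeholder values rather than Flow-IPC's actual limits.

```cpp
#include <algorithm>
#include <cstddef>

// Placeholder limits for illustration; the real values live in Flow-IPC and are not reproduced here.
constexpr std::size_t MQ_MAX_MSG_SZ_SKETCH = 8 * 1024;
constexpr std::size_t SOCK_MAX_BLOB_SZ_SKETCH = 64 * 1024;

// Picks the largest serialization segment size that fits through every enabled pipe.
template<bool MQS_ENABLED, bool SOCKET_STREAM_ENABLED>
constexpr std::size_t max_segment_size()
{
  static_assert(MQS_ENABLED || SOCKET_STREAM_ENABLED, "There must be *some* transport mechanism.");

  if constexpr (MQS_ENABLED && (!SOCKET_STREAM_ENABLED))
  {
    return MQ_MAX_MSG_SZ_SKETCH; // MQ only.
  }
  else if constexpr ((!MQS_ENABLED) && SOCKET_STREAM_ENABLED)
  {
    return SOCK_MAX_BLOB_SZ_SKETCH; // Socket stream only.
  }
  else
  {
    // Both enabled: a message must fit through either pipe, so take the smaller limit.
    return std::min(MQ_MAX_MSG_SZ_SKETCH, SOCK_MAX_BLOB_SZ_SKETCH);
  }
}
```

Because every branch is resolved at compile time, a result such as `max_segment_size<true, true>()` can itself be used in constant expressions.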
1068
1069TEMPLATE_SESSION_BASE
1070CLASS_SESSION_BASE::Graceful_finisher::Graceful_finisher(flow::log::Logger* logger_ptr, Session_base* this_session,
1071 flow::async::Single_thread_task_loop* async_worker,
1072 Master_structured_channel* master_channel) :
1073 flow::log::Log_context(logger_ptr, Log_component::S_SESSION),
1074 m_this_session(this_session),
1075 m_async_worker(async_worker),
1076 m_master_channel(master_channel)
1077{
1078 // We are in thread W.
1079
1080 m_master_channel->expect_msg(Master_structured_channel::Msg_which_in::GRACEFUL_SESSION_END,
1081 [this](auto&&)
1082 // (The message doesn't matter and contains no fields; only that we received it matters.)
1083 {
1084 // We are in thread Wc (unspecified, really struc::Channel async callback thread).
1085 m_async_worker->post([this]()
1086 {
1087 // We are in thread W. (We need to be to safely trigger hosed() and hose().)
1088
1089 FLOW_LOG_INFO("Received GracefulSessionEnd from opposing Session object along session master channel "
1090 "[" << *m_master_channel << "]. Will emit to local user as graceful-end "
1091 "error; and mark it down. If our Session dtor is running, it shall return soon. If it has "
1092 "not yet run, it shall return immediately upon starting to execute.");
1093
1094 m_opposing_session_done.set_value();
1095 /* Can m_opposing_session_done already be set? (That would throw promise exception, and we don't catch it
1096 * above.) Answer: no, in the current protocol at least: Only once can it be sent (hence .expect_msg(), not
1097 * .expect_msgs()); and if *m_master_channel were to be hosed, then that would occur either after
1098 * expect_msg() firing handler, or it would occur instead of it. (We *do* handle the "after" case in
1099 * on_master_channel_hosed(), where we do indeed catch the exception.) */
1100
1101 if (!m_this_session->hosed())
1102 {
1104 }
1105 }); // m_async_worker->post()
1106 }); // m_master_channel->expect_msg(GRACEFUL_SESSION_END)
1107} // Session_base::Graceful_finisher::Graceful_finisher()
1108
1109TEMPLATE_SESSION_BASE
1110void CLASS_SESSION_BASE::Graceful_finisher::on_master_channel_hosed()
1111{
1112 using boost::promise_already_satisfied;
1113
1114 // We are in thread W.
1115 try
1116 {
1117 m_opposing_session_done.set_value();
1118 }
1119 catch (const promise_already_satisfied&)
1120 {
1121 // Interesting. @todo Maybe log?
1122 }
1123} // Session_base::Graceful_finisher::on_master_channel_hosed()
1124
1125TEMPLATE_SESSION_BASE
1126void CLASS_SESSION_BASE::Graceful_finisher::on_dtor_start()
1127{
1128 using flow::async::Synchronicity;
1129
1130 // We are in thread U. In fact we're in a Client/Server_session_impl dtor... but at its very start.
1131
1132 /* Thread W must be fine and running; per algorithm (see Graceful_finisher class doc header).
1133 * It's only safe to access m_master_channel from there. */
1134 m_async_worker->post([&]()
1135 {
1136 FLOW_LOG_INFO("In Session object dtor sending GracefulSessionEnd to opposing Session object along "
1137 "session master channel [" << *m_master_channel << "] -- if at all possible.");
1138
1139 auto msg = m_master_channel->create_msg();
1140 msg.body_root()->initGracefulSessionEnd();
1141
1142 Error_code err_code_ignored;
1143 m_master_channel->send(msg, nullptr, &err_code_ignored);
1144 /* Whatever happened, it logged. We make a best effort; if channel is hosed or send fails, then the other
1145 * side shall detect it via its on_master_channel_hosed() presumably and set its m_opposing_session_done, hence
1146 * its on_dtor_start() will proceed. */
1147 }, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION); // m_async_worker->post()
1148
1149 // That would've been non-blocking. So now lastly:
1150
1151 FLOW_LOG_INFO("In Session object dtor we now await sign that the opposing Session object dtor has also started; "
1152 "only then will we proceed with destroying our Session object: e.g., maybe they hold handles "
1153 "to objects in a SHM-arena we would deinitialize. If their dtor has already been called, we will "
1154 "proceed immediately. If not, we will now wait for that. This would only block if the opposing "
1155 "side's user code neglects to destroy Session object on error; or has some kind of blocking "
1156 "operation in progress before destroying Session object. "
1157 "Session master channel: [" << *m_master_channel << "].");
1158 m_opposing_session_done.get_future().wait();
1159 FLOW_LOG_INFO("In Session object dtor: Done awaiting sign that the opposing Session object dtor has also started. "
1160 "Will now proceed with our Session object destruction.");
1161} // Session_base::Graceful_finisher::on_dtor_start()
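The blocking handshake implemented by Graceful_finisher can be illustrated outside Flow-IPC with standard-library primitives. The following is a minimal sketch, not the library's implementation: `Finisher_sketch` and `run_sketch()` are hypothetical names, `std::promise` stands in for `boost::promise`, and the master channel and thread W are left out entirely (a plain thread plays the opposing side). Thread support must be enabled when building (e.g., `-pthread` with GCC or Clang).

```cpp
#include <cassert>
#include <future>
#include <thread>

// Hypothetical stand-in for Graceful_finisher: one promise, fulfilled at most once.
class Finisher_sketch
{
public:
  // Call when the opposing side's graceful-end message arrives OR when the channel is hosed.
  // Returns true if this call fulfilled the promise; false if it had already been fulfilled.
  bool mark_opposing_done()
  {
    try
    {
      m_opposing_done.set_value();
      return true;
    }
    catch (const std::future_error& exc)
    {
      if (exc.code() != std::future_errc::promise_already_satisfied)
      {
        throw; // Some other promise misuse; not expected in this sketch.
      }
      return false;
    }
  }

  // Call once, at the start of the owner's dtor: blocks until mark_opposing_done() has succeeded.
  void on_dtor_start()
  {
    m_opposing_done.get_future().wait();
  }

private:
  std::promise<void> m_opposing_done;
};

// Simulates side B signaling from another thread, while side A waits at the start of its dtor.
inline bool run_sketch()
{
  Finisher_sketch a;
  std::thread peer([&a]() { a.mark_opposing_done(); });

  a.on_dtor_start(); // Returns once the peer has signaled (immediately, if it already has).

  assert(peer.joinable());
  peer.join();

  // A later "channel hosed" event must be tolerated but must not fulfill the promise again.
  return !a.mark_opposing_done();
}
```

Whether the signal arrives before or after the wait begins, `on_dtor_start()` returns exactly when the promise has been fulfilled once, which mirrors the two orderings discussed in the class doc header.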
1162
1163TEMPLATE_SESSION_BASE
1164typename CLASS_SESSION_BASE::Structured_msg_reader_config
1165 CLASS_SESSION_BASE::heap_reader_config(flow::log::Logger* logger_ptr) // Static.
1166{
1167 return transport::struc::Heap_reader::Config{ logger_ptr, 0 };
1168} // Session_base::heap_reader_config()
1169
1170#undef CLASS_SESSION_BASE
1171#undef TEMPLATE_SESSION_BASE
1172
1173} // namespace ipc::session
See Session_mv::heap_reader_config() (1-arg).
boost::shared_ptr< Mdt_builder > Mdt_builder_ptr
See Session_mv (or Session concept).
const Shared_name & srv_namespace() const
See Server_session_impl, Client_session_impl.
void set_cli_app_ptr(const Client_app *cli_app_ptr_new)
Sets cli_app_ptr() (do not call if already set).
Shared_name m_srv_namespace
See srv_namespace().
std::vector< Channel_obj > Channels
See Session_mv. Note: If changed from vector please update those doc headers too.
std::conditional_t< S_MQS_ENABLED, std::conditional_t< S_TRANSMIT_NATIVE_HANDLES, transport::Mqs_socket_stream_channel< true, Persistent_mq_handle_from_cfg >, transport::Mqs_channel< true, Persistent_mq_handle_from_cfg > >, std::conditional_t< S_TRANSMIT_NATIVE_HANDLES, transport::Socket_stream_channel< true >, transport::Socket_stream_channel_of_blobs< true > > > Channel_obj
See Session_mv (or Session concept).
flow::async::Task_asio_err m_on_err_func
See set_on_err_func().
typename transport::struc::schema::Metadata< Mdt_payload_obj >::Builder Mdt_builder
See Session_mv (or Session concept).
void hose(const Error_code &err_code)
Marks this session as hosed for (truthy) reason err_code; and synchronously invokes on-error handler;...
std::conditional_t<!S_MQS_ENABLED, transport::Null_peer, std::conditional_t< S_MQ_TYPE_OR_NONE==schema::MqType::POSIX, transport::Posix_mq_handle, transport::Bipc_mq_handle > > Persistent_mq_handle_from_cfg
Relevant only if S_MQS_ENABLED, this is the Persistent_mq_handle-concept impl type specified by the u...
void set_cli_namespace(Shared_name &&cli_namespace_new)
Sets cli_namespace() (do not call if already set).
std::atomic< const Client_app * > m_cli_app_ptr
See cli_app_ptr().
Session_base(const Client_app &cli_app_ref, const Server_app &srv_app_ref, flow::async::Task_asio_err &&on_err_func, On_passive_open_channel_func &&on_passive_open_channel_func_or_empty_arg)
Constructs: Client_session_impl form (the user is the one constructing the object,...
const Shared_name & cli_namespace() const
See Server_session_impl, Client_session_impl.
bool on_err_func_set() const
Returns true if and only if set_on_err_func() has been called.
boost::shared_ptr< typename transport::struc::schema::Metadata< Mdt_payload_obj >::Reader > Mdt_reader_ptr
See Session_mv (or Session concept).
On_passive_open_channel_func m_on_passive_open_channel_func_or_empty
See on_passive_open_channel_func_or_empty().
boost::weak_ptr< Master_structured_channel > Master_structured_channel_observer
Observer of Master_structured_channel_ptr. See its doc header.
static constexpr util::Fine_duration S_OPEN_CHANNEL_TIMEOUT
Internal timeout for open_channel().
void set_on_err_func(flow::async::Task_asio_err &&on_err_func_arg)
Sets on_err_func() (do not call if already set).
fs::path cur_ns_store_absolute_path() const
Computes the absolute path to file storing (written by server, read by client) the value for srv_name...
static constexpr size_t S_MQS_MAX_MSG_SZ
The max sendable MQ message size as decided by Server_session_impl::make_channel_mqs() (and imposed o...
boost::shared_ptr< Master_structured_channel > Master_structured_channel_ptr
Handle to Master_structured_channel.
const Server_app & m_srv_app_ref
Reference to Server_app (referring to local process in Server_session_impl, opposing process in Clien...
Implements the Persistent_mq_handle concept by thinly wrapping bipc::message_queue,...
A Channel with at least a blobs pipe consisting of two MQs of type Persistent_mq_handle (template arg...
Definition: channel.hpp:1180
A Channel with a blobs pipe consisting of 2 MQs of type Persistent_mq_handle (template arg); and a ha...
Definition: channel.hpp:1278
static const Shared_name & S_RESOURCE_TYPE_ID
Shared_name relative-folder fragment (no separators) identifying this resource type.
Dummy type for use as a template param to Channel when either the blobs pipe or handles pipe is disab...
Definition: channel.hpp:1000
Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see man mq_overv...
A Channel with a blobs pipe only (no handles pipe) that uses a Unix domain socket connection as the u...
Definition: channel.hpp:1097
A Channel with a handles pipe only (no blobs pipe) that uses a Unix domain socket connection as the u...
Definition: channel.hpp:1029
Owning and wrapping a pre-connected transport::Channel peer (an endpoint of an established channel ov...
Definition: channel.hpp:589
bool expect_msg(Msg_which_in which, On_msg_handler &&on_msg_func)
Registers the expectation of up to 1 notification in-message whose Msg_which equals which.
Definition: channel.hpp:2041
Implements Struct_builder concept by straightforwardly allocating fixed-size segments on-demand in th...
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static const Shared_name S_SENTINEL
A Shared_name fragment, with no S_SEPARATOR characters inside, that represents a path component that ...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
static const Shared_name S_RESOURCE_TYPE_ID_MUTEX
Relative-folder fragment (no separators) identifying the resource type for: boost....
@ S_SESSION_FINISHED
The opposing end of the session in question has been closed gracefully by the user invoking the end-s...
Flow-IPC module providing the broad lifecycle and shared-resource organization – via the session conc...
Definition: app.cpp:27
Shared_name build_conventional_shared_name(const Shared_name &resource_type, const Shared_name &srv_app_name, const Shared_name &srv_namespace, const Shared_name &cli_app_name, const Shared_name &cli_namespace_or_sentinel)
Builds an absolute name according to the path convention explained in Shared_name class doc header; t...
Builder< ipc::shm::classic::Pool_arena > Builder
Convenience alias: transport::struc::shm::Builder that works with boost.ipc.shm pools from ipc::shm::...
Definition: classic_fwd.hpp:36
Reader< ipc::shm::classic::Pool_arena > Reader
Convenience alias: transport::struc::shm::Reader that works with boost.ipc.shm pools from ipc::shm::c...
Definition: classic_fwd.hpp:39
const fs::path IPC_KERNEL_PERSISTENT_RUN_DIR
Absolute path to the directory (without trailing separator) in the file system where kernel-persisten...
Definition: util.cpp:63
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:111
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
Definition: common.hpp:322
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:297
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.
Definition: common.hpp:301
#define TEMPLATE_SESSION_BASE
Internally used macro; public API users should disregard (same deal as in struc/channel....
An App that is used as a client in at least one client-server IPC split.
Definition: app.hpp:185
An App that is used as a server in at least one client-server IPC split.
Definition: app.hpp:206
Implements Struct_builder::Config sub-concept.
Implements Struct_reader::Config sub-concept.