Flow-IPC 1.0.0
Flow-IPC project: Full implementation reference.
persistent_mq_handle.hpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19
20// Not compiled: for documentation only. Contains concept docs as of this writing.
21#ifndef IPC_DOXYGEN_ONLY
22# error "As of this writing this is a documentation-only "header" (the "source" is for humans and Doxygen only)."
23#else // ifdef IPC_DOXYGEN_ONLY
24
25namespace ipc::transport
26{
27
28// Types.
29
30/**
31 * A documentation-only *concept* defining the behavior of an object representing a light-weight handle
32 * to a message queue, capable of receiving/sending discrete messages in non-blocking/blocking/timed-blocking
33 * fashion, as well as some support for polling/waiting and interruptions thereof. The message queue (MQ)
34 * is assumed to be of at least kernel persistence, meaning a handle to an MQ
35 * closing does not delete the MQ, and another handle may reattach to it; in order to delete the underlying MQ
36 * a separate remove op is needed (and is provided as well as a `static` method).
37 *
38 * ### How to use ###
39 * The following are relevant operations to do with MQs. (Read Rationale for background.)
40 * - Create (named) MQ: Persistent_mq_handle() ctor, with either util::Create_only or util::Open_or_create tag arg.
41 * A Shared_name specifies the name.
42 * If one does not need a handle simply destroy the resulting object: the underlying created MQ continues existing.
43 * - Create handle to the (named) MQ: Persistent_mq_handle() ctor, with either util::Open_only or
44 * util::Open_or_create tag arg.
45 * A Shared_name specifies the name.
46 * - Send message into the MQ, via handle: try_send() (non-blocking), send() (blocking), timed_send() (blocking with
47 * timeout).
48 * - Await writability without writing: is_sendable() (instant poll), wait_sendable() (blocking),
49 * timed_wait_sendable() (blocking with timeout).
50 * - Interrupt and/or preempt all above blocking operations including concurrently/preempt is_sendable():
51 * interrupt_sends(). Undo: allow_sends().
52 * - Receive message out of the MQ, via handle: try_receive() (non-blocking), receive() (blocking),
53 * timed_receive() (blocking with timeout).
54 * - Await readability without reading: is_receivable() (instant poll), wait_receivable() (blocking),
55 * timed_wait_receivable() (blocking with timeout).
56 * - Interrupt and/or preempt all above blocking operations including concurrently/preempt is_receivable():
57 * interrupt_receives(). Undo: allow_receives().
58 * - Destroy handle to the MQ: ~Persistent_mq_handle() destructor. The underlying MQ continues existing.
59 * - Destroy (named) MQ: remove_persistent(). Underlying MQ name disappears, but MQ continues existing until
60 * all handles machine-wide are closed. A Shared_name specifies the name.
61 *
62 * ### Thread safety ###
63 * Concurrent ops safe on different objects (including when the same-named queue is being accessed).
64 * Concurrent non-`const` ops (where at least 1 is non-`const`) not safe on the same `*this`.
65 *
66 * The notable exception to the latter: each of `interrupt_*()` and `allow_*()` is safe to call on the same
67 * `*this` concurrently to any method except `interrupt_*()` and `allow_*()` themselves. In fact
68 * `interrupt_*()` ability to interrupt a concurrent timed-waiting of infinitely-waiting read or write is
69 * most of those methods' utility.
70 *
71 * ### Rationale ###
72 * Here's why this concept exists and how it was designed to be the way it is. It's actually quite simple;
73 * the only question is why it is this way and not some other way. Update: It's somewhat less simple now but not bad.
74 *
75 * ipc::transport features the Blob_sender and Blob_receiver concepts, each of which is a peer endpoint of a
76 * connected one-way pipe capable of transmitting discrete messages, each consisting of a binary blob.
77 * They are first-class citizens of ipc::transport, or at least of its core-layer portion, meaning they're likely
78 * to be used by the user directly or at least in highly visible fashion by another aspect of ipc::transport impl.
79 * They have a vaguely socket-like API, with an asynchronous receive method and a convenient non-blocking/synchronous
80 * sender method (that cannot yield would-block). Lastly the destructor of either ensures the underlying transport
81 * cannot be used again by somehow opening another handle to the same thing.
82 *
83 * One implementation of Blob_sender and Blob_receiver simultaneously might be Native_socket_stream which
84 * uses Unix domain stream connection as underlying transport. That one is a natural fit for the concept,
85 * including the fact that closing a Unix domain socket hoses the connection entirely (one cannot somehow "reattach"
86 * to it). It's good. However, benchmark tests (details omitted) show that, when need not also send native handles
87 * over such a connection, *message queue* low-level transports might be more performant or at least competitive.
88 *
89 * In our ecosystem, two message queue (MQ) transports are candidates: the POSIX message queue (provided by Linux)
90 * (see `man mq_overview`) and the boost.interprocess (bipc) message queue (web-search for
91 * `boost::interprocess::message_queue`). We wanted to support both and then provide the choice of which one to use.
92 * So the goal is to write Blob_sender and Blob_receiver concept impls for each type of MQ (and possibly others
93 * over time). So how to do that nicely?
94 *
95 * The 2 low-level MQ APIs (POSIX MQ and bipc MQ) are extremely different, conceptually and in practice, from
96 * stream socket APIs of any kind. For one thing, the MQs have kernel persistence -- they're like files in memory that
97 * disappear at reboot but otherwise stay around, so one can reattach a handle to one even after closing another one;
98 * and deleting it is a separate operation not involving any handle. For another, they don't use socket-descriptors
99 * and hence do not participate in `epoll/poll/select()` mechanisms and feature only blocking, non-blocking, and
100 * blocking-with-timeout send and receive APIs. (POSIX MQ does have `mq_notify()`, but this async-notify mechanism
101 * is quite limited, either creating a new thread for each notification, or using a signal. POSIX MQ in Linux
102 * *does* happen to have a non-portable property wherein the descriptor is really an FD, so it *can* participate
103 * in `epoll/poll/select()` mechanisms; and boost.asio *does* have a `posix::descriptor` that can wrap such a
104 * native descriptor. However: bipc MQ lacks anything like this at all.)
105 *
106 * The 2 APIs are, however, extremely similar to each other. In fact, one gets the impression that the authors
107 * of bipc MQ considered wrapping POSIX MQs but reconsidered for whatever reason (maybe the obscure limit semantics,
108 * lack of availability in all Linux versions and other Unixes and Windows) -- but fairly closely mirrored the API
109 * therein.
110 *
111 * Knowing this, it looked pretty natural to write some Blob_sender/Blob_receiver impls holding
112 * some `bipc::message_queue` handles internally. However, while conceptually similar, the POSIX MQ API is a
113 * C API, not a Boost-style C++ thing. It would be entirely possible to still write Blob_sender/Blob_receiver impls
114 * around this API, but since conceptually it's so similar to bipc MQ, why waste the effort? Code reuse for the win.
115 *
116 * That brings us to this concept. What it is is: A thing I (ygoldfel) came up with by simply taking
117 * `bipc::message_queue` and working backwards, making a concept for an MQ handle, Persistent_mq_handle, to which
118 * `bipc::message_queue` directly conforms already in most key ways (to wit: the send/receive methods).
119 * Then one could write a very thin non-polymorphic HAS-A wrapper class (with no added data stored) and a couple other
120 * utility functions with very thin adaptations to match ipc::transport style (like using Shared_name instead of
121 * strings when creating or removing kernel-persistent queue instances). This class is Bipc_mq_handle; it
122 * implements this Persistent_mq_handle.
123 *
124 * Next, a class can be written to mirror this API but for POSIX MQs. This is a little harder, since it can't just
125 * expose some existing super-class's send/receive functions, but it would just wrap the Linux C API and store
126 * an MQ handle; done. This class is Posix_mq_handle; it also implements Persistent_mq_handle.
127 *
128 * Now that Persistent_mq_handle provides a uniform API, a Blob_sender/Blob_receiver can be written around a
129 * `typename Persistent_mq_handle` which can be either Bipc_mq_handle or Posix_mq_handle (or a future wrapper around
130 * something else).
131 *
132 * ### Poll/wait and interrupt facilities ###
133 * Somewhat later we added a major feature to the concept (and both known impls): For each type of transmission
134 * operation in a given direction (instant poll, blocking, blocking with timeout), there is a poll/wait counterpart
135 * which is almost the same except it does not actually transmit but merely returns the fact transmission
136 * is possible. E.g., try_receive() <=> is_receivable(), receive() <=> wait_receivable(),
137 * timed_receive() <=> timed_wait_receivable(). In fact -- in the absence of a competing `*this` --
138 * receive() = wait_receivable() + try_receive() (succeeds), timed_receive() = timed_wait_receivable() + try_receive()
139 * (succeeds).
140 *
141 * This is useful at least in that it allows one to defer deciding on a target buffer for receiving, until
142 * receiving would (probably) actually work. In the other direction it can still provide useful flexibility in
143 * a multi-threaded setup (one thread blockingly-waits, the other is signaled about readiness and tries to
144 * transmit but without blocking -- useful in a thread U/thread W setup).
145 *
146 * Lastly, any blocking (with or without timeout) can be interrupted via the `interrupt_*()` methods.
147 * For example a receive() or wait_receivable() may be ongoing in thread W, and thread U can interrupt_receives()
148 * to immediately make the blocking op exit with error::Code::S_INTERRUPTED.
149 *
150 * ### Discussion: The road to boost.asio-like async I/O ###
151 * What would be cool, which isn't here, is if this concept described a boost.asio I/O object, capable of plugging
152 * into a boost.asio loop; or ~equivalently something a-la Blob_sender and/or Blob_receiver. However that was not
153 * our aim -- we only want a thin wrapper as noted in "Rationale" above -- and Blob_stream_mq_receiver and
154 * Blob_stream_mq_sender achieve it (and arguably more -- albeit with the limit of a single writer and single reader).
155 * That said: it may be useful to contemplate what parts the concept does have that are conducive to that type of work.
156 *
157 * Without the poll/wait facilities combined with interrupt facilities, it was possible but somewhat clunky,
158 * having to at least start a background thread in which to perform blocking transmit calls; they'd have to
159 * be broken up into subject-to-timeout shorter calls, so that the thread could be stopped and joined during
160 * deinit. Even then full deinit could only occur with a potential pause until the current short call in the
161 * background thread W could return. By having thread W do (indefinite) waits only, and allowing thread U to
162 * do non-blocking transmits only *and* `interrupt_*()`, we achieve pretty good flexibility and responsiveness.
163 * That is what Blob_stream_mq_sender and Blob_stream_mq_receiver can do given the present concept.
164 *
165 * However -- it *can* be more advanced still. Consider the specific impl Posix_mq_handle, which has the
166 * additional-to-concept (edit: see below update) method Posix_mq_handle::native_handle(); in Linux a #Native_handle.
167 * This can be waited-on, natively with `[e]poll*()`; with boost.asio via util::sync_io::Asio_waitable_native_handle
168 * (more or less a `boost::asio::posix::descriptor`). *Now* no background thread W is necessary: thread U
169 * can ask the *kernel* to report readability/writability -- when active it can do non-blocking stuff.
170 *
171 * The wrinkle: Posix_mq_handle has it; but Bipc_mq_handle does not (and there is no FD inside its impl either;
172 * it is fully SHM-based internally). Can it be generalized nevertheless? Yes and no. Yes: in that it can be
173 * simulated by "secretly" having a thread W and having it use a pipe (or something) to translate readable/writable
174 * events into a live FD that could be detected via `[e]poll*()` or boost.asio. No: in that it turns a
175 * hypothetical Persistent_mq_handle impl, namely Bipc_mq_handle, into something complex as opposed to any kind
176 * of thin wrapper around an MQ API. Therefore we did not do it.
177 *
178 * However -- we did ~such a thing with sync_io::Blob_stream_mq_sender and sync_io::Blob_stream_mq_receiver which
179 * are templated on Persistent_mq_handle as a template-parameter; and, being
180 * `sync_io`-pattern-impls (see util::sync_io), they each expose a waitable-on #Native_handle.
181 * Indeed as of this writing each of these templates keeps a "secret" thread W that performs
182 * blocking waits, while the user-accessible API makes it look like a nice, kernel-reported-via-FDs
183 * reactor/proactor-supporting I/O object. By operating on what is directly available via the present concept,
184 * this setup of Blob_stream_mq_sender and Blob_stream_mq_receiver internals is agnostic to the type of MQ.
185 *
186 * However, if one wanted to take advantage of the non-concept (edit: see below update) ability to be watched
187 * (via FD) with the kernel's help and without an added thread, they could specialize `Blob_stream_mq_*er`
188 * for Posix_mq_handle which does offer a kernel-FD accessor `.native_handle()`. Update: This is now done
189 * (in fact it is not specialized as of this writing but rather uses a few simple `if constexpr
190 * ()`s). Accordingly the concept now allows for `native_handle()` optionally: see
191 * Persistent_mq_handle::S_HAS_NATIVE_HANDLE.
192 */
193class Persistent_mq_handle // Note: movable but not copyable.
194{
195public:
196 // Constants.
197
198 /// Shared_name relative-folder fragment (no separators) identifying this resource type.
200
201 /// `true` if and only if native_handle() method exists, and the returned value may be waited-on by `poll()`/etc.
202 static constexpr bool S_HAS_NATIVE_HANDLE = unspecified;
203
204 // Constructors/destructor.
205
206 /**
207 * Construct null handle, suitable only for being subsequently moved-to or destroyed.
208 * If you do anything on `*this`, other than invoking dtor or move-assignment, behavior is undefined.
209 */
211
212 /**
213 * Construct handle to non-existing named MQ, creating it first. If it already exists, it is an error.
214 * If an error is emitted via `*err_code`, and you do anything on `*this` other than invoking dtor or
215 * move-assignment, behavior is undefined.
216 *
217 * @param logger_ptr
218 * Logger to use for subsequently logging.
219 * @param absolute_name
220 * Absolute name at which the persistent MQ lives.
221 * @param mode_tag
222 * API-choosing tag util::CREATE_ONLY.
223 * @param perms
224 * Permissions to use for creation. Suggest the use of util::shared_resource_permissions() to translate
225 * from one of a small handful of levels of access; these apply almost always in practice.
226 * The applied permissions shall *ignore* the process umask and shall thus exactly match `perms`,
227 * unless an error occurs.
228 * @param max_n_msg
229 * Max # of unpopped messages in created queue.
230 * @param max_msg_sz
231 * Max # of bytes in any one message in created queue.
232 * @param err_code
233 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
234 * various. Most likely creation failed due to permissions, or it already existed.
235 */
236 explicit Persistent_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
237 util::Create_only mode_tag, size_t max_n_msg, size_t max_msg_sz,
238 const util::Permissions& perms = util::Permissions(),
239 Error_code* err_code = 0);
240
241 /**
242 * Construct handle to existing named MQ, or else if it does not exist creates it first and opens it (atomically).
243 * If an error is emitted via `*err_code`, and you do anything on `*this` other than invoking dtor or
244 * move-assignment, behavior is undefined.
245 *
246 * @param logger_ptr
247 * Logger to use for subsequently logging.
248 * @param absolute_name
249 * Absolute name at which the persistent MQ lives.
250 * @param mode_tag
251 * API-choosing tag util::OPEN_OR_CREATE.
252 * @param perms_on_create
253 * Permissions to use if creation is required. Suggest the use of util::shared_resource_permissions() to
254 * translate from one of a small handful of levels of access; these apply almost always in practice.
255 * The applied permissions shall *ignore* the process umask and shall thus exactly match `perms_on_create`,
256 * unless an error occurs.
257 * @param max_n_msg_on_create
258 * Max # of unpopped messages in created queue if creation is required.
259 * @param max_msg_sz_on_create
260 * Max # of bytes in any one message in created queue if creation is required.
261 * @param err_code
262 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
263 * various. Most likely creation failed due to permissions.
264 */
265 explicit Persistent_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
266 util::Open_or_create mode_tag, size_t max_n_msg_on_create, size_t max_msg_sz_on_create,
267 const util::Permissions& perms_on_create = util::Permissions(),
268 Error_code* err_code = 0);
269
270 /**
271 * Construct handle to existing named MQ. If it does not exist, it is an error.
272 * If an error is emitted via `*err_code`, and you do anything on `*this` other than invoking dtor or
273 * move-assignment, behavior is undefined.
274 *
275 * @param logger_ptr
276 * Logger to use for subsequently logging.
277 * @param absolute_name
278 * Absolute name at which the persistent MQ lives.
279 * @param mode_tag
280 * API-choosing tag util::OPEN_ONLY.
281 * @param err_code
282 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
283 * various. Most likely it already existed.
284 */
285 explicit Persistent_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
286 util::Open_only mode_tag, Error_code* err_code = 0);
287
288 /**
289 * Constructs handle from the source handle while making the latter invalid.
290 * If you do anything on `src` after this, other than invoking dtor or move-assignment, behavior is undefined.
291 *
292 * Informally: this is a light-weight op.
293 *
294 * @param src
295 * Source object which is nullified.
296 */
298
299 /// Copying of handles is prohibited.
301
302 /**
303 * Destroys this handle (or no-op if no handle was successfully constructed, or if it's a moved-from or default-cted
304 * handle). The underlying MQ (if any) is *not* destroyed and can be attached-to by another handle.
305 */
307
308 // Methods.
309
310 /**
311 * Replaces handle with the source handle while making the latter invalid.
312 * If you do anything on `src` after this, other than invoking dtor or move-assignment, behavior is undefined.
313 *
314 * Informally: this is a light-weight op.
315 *
316 * @param src
317 * Source object which is nullified.
318 * @return `*this`.
319 */
321
322 /// Copying of handles is prohibited.
324
325 /**
326 * Removes the named persistent MQ. The name `name` is removed from the system immediately; and
327 * the function is non-blocking. However the underlying MQ if any continues to exist until all handles to it are
328 * closed; their presence in this or other process is *not* an error.
329 *
330 * @see `util::remove_each_persistent_*`() for a convenient way to remove more than one item. E.g.,
331 * `util::remove_each_persistent_with_name_prefix<Pool_arena>()` combines remove_persistent() and
332 * for_each_persistent() in a common-sense way to remove only those `name`s starting with a given prefix;
333 * or simply all of them.
334 *
335 * Trying to remove a non-existent name *is* an error.
336 *
337 * Logs INFO message.
338 *
339 * @warning The impl should be carefully checked to conform to this. As of this writing the 2 relevant
340 * low-level MQ APIs (bipc and POSIX) do, but there could be more.
341 *
342 * @param logger_ptr
343 * Logger to use for subsequently logging.
344 * @param name
345 * Absolute name at which the persistent MQ lives.
346 * @param err_code
347 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
348 * various.
349 */
350 static void remove_persistent(flow::log::Logger* logger_ptr, const Shared_name& name,
351 Error_code* err_code = 0);
352
353 /**
354 * Lists all named persistent MQs currently persisting, invoking the given handler synchronously on each one.
355 *
356 * Note that, in a sanely set-up OS install, all existing pools will be listed by this function;
357 * but permissions/ownership may forbid certain operations the user may typically want to invoke on
358 * a given listed name -- for example remove_persistent(). This function does *not* filter-out any
359 * potentially inaccessible items.
360 *
361 * @tparam Handle_name_func
362 * Function object matching signature `void F(const Shared_name&)`.
363 * @param handle_name_func
364 * `handle_name_func()` shall be invoked for each (matching, if applicable) item. See `Handle_name_func`.
365 */
366 template<typename Handle_name_func>
367 static void for_each_persistent(const Handle_name_func& handle_name_func);
368
369 /**
370 * Non-blocking send: pushes copy of message to queue and returns `true`; if queue is full then no-op and returns
371 * `false`. A null blob (`blob.size() == 0`) is allowed.
372 *
373 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
374 * Exception to this: If `blob.size()` exceeds message size limit (if any), a particular error, which shall be
375 * documented below, is emitted; this is not fatal to `*this`.
376 *
377 * @param blob
378 * Buffer to copy into MQ; if empty then an empty message is pushed.
379 * @param err_code
380 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
381 * various. Would-block shall not be emitted.
382 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (excessive-size error: buffer exceeds
383 * max_msg_size()).
384 * @return `true` on success; `false` on failure; in the latter case `*err_code` distinguishes
385 * between would-block and fatal error.
386 */
387 bool try_send(const util::Blob_const& blob, Error_code* err_code = 0);
388
389 /**
390 * Blocking send: pushes copy of message to queue; if queue is full blocks until it is not.
391 * A null blob (`blob.size() == 0`) is allowed.
392 *
393 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
394 * Exception to this: If `blob.size()` exceeds message size limit (if any), a particular error, which shall be
395 * documented below, is emitted; this is not fatal to `*this`. Exception to this: interrupt_sends()
396 * leads to the emission of a particular error which shall be documented below; this is not fatal to `*this.
397 *
398 * @param blob
399 * See try_send().
400 * @param err_code
401 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
402 * various. Would-block shall not be emitted.
403 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (excessive-size error: buffer exceeds
404 * max_msg_size()).
405 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_sends()).
406 */
407 void send(const util::Blob_const& blob, Error_code* err_code = 0);
408
409 /**
410 * Blocking timed send: pushes copy of message to queue; if queue is full blocks until it is not, or the
411 * specified time passes, whichever happens first. Returns `true` on success; `false` on timeout or error.
412 * A null blob (`blob.size() == 0`) is allowed.
413 *
414 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
415 * Exception to this: If `blob.size()` exceeds message size limit (if any), a particular error, which shall be
416 * documented below, is emitted; this is not fatal to `*this`. Exception to this: interrupt_sends()
417 * leads to the emission of a particular error which shall be documented below; this is not fatal to `*this.
418 *
419 * @warning The user must not count on precision/stability -- unlike with, say, boost.asio timers -- here.
420 * If timing precision is required, the user will have to add an async layer with more precise timing
421 * and not rely on this. For example, informally, suggest: Use timed_send() with 250msec increments
422 * to model an interruptible indefinitely-blocking send(). In a parallel thread, if it's time to
423 * interrupt the modeled endless send(), set some flag that'll cause the 250msec-long attempts to cease.
424 * The last one (that straddles the interruption point) can just be ignored.
425 *
426 * @param blob
427 * See try_send().
428 * @param timeout_from_now
429 * Now + (this long) = the timeout time point.
430 * @param err_code
431 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
432 * various. Would-block shall not be emitted. Timeout shall not be emitted.
433 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (excessive-size error: buffer exceeds
434 * max_msg_size()).
435 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_sends()).
436 * @return `true` on success; `false` on failure; in the latter case `*err_code` distinguishes
437 * between timeout and fatal error.
438 */
439 bool timed_send(const util::Blob_const& blob, util::Fine_duration timeout_from_now, Error_code* err_code = 0);
440
441 /**
442 * Equivalent to try_send() except stops short of writing anything, with `true` result indicating that
443 * try_send() *would* work at that moment.
444 *
445 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
446 * Exception to this: interrupt_sends() leads to the emission of a particular error which shall be documented
447 * below; this is not fatal to `*this.
448 *
449 * @param err_code
450 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
451 * various. Would-block shall not be emitted.
452 * error::Code::S_INTERRUPTED (preempted by interrupt_sends()).
453 * @return `true` if transmissible; `false` if not, or on error.
454 */
455 bool is_sendable(Error_code* err_code = 0);
456
457 /**
458 * Equivalent to send() except stops short of writing anything, with non-error return indicating that
459 * try_send() *would* work at that moment. It shall block indefinitely until fatal error, or
460 * error::Code::S_INTERRUPTED, or transmissibility.
461 *
462 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
463 * Exception to this: interrupt_sends() leads to the emission of a particular error which shall be documented
464 * below; this is not fatal to `*this.
465 *
466 * @param err_code
467 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
468 * various. Would-block shall not be emitted.
469 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_sends()).
470 */
471 void wait_sendable(Error_code* err_code = 0);
472
473 /**
474 * Equivalent to timed_send() except stops short of writing anything, with `true` result indicating that
475 * try_send() *would* work at that moment. It shall block until fatal error, or
476 * error::Code::S_INTERRUPTED, or timeout, or transmissibility.
477 *
478 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
479 * Exception to this: interrupt_sends() leads to the emission of a particular error which shall be documented
480 * below; this is not fatal to `*this.
481 *
482 * @param timeout_from_now
483 * See timed_send().
484 * @param err_code
485 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
486 * various. Would-block shall not be emitted.
487 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_sends()).
488 * @return `true` if transmissible; `false` if not, or on error or timeout; in the latter case `*err_code`
489 * distinguishes between timeout and fatal error.
490 */
491 bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code* err_code = 0);
492
493 /**
494 * Non-blocking receive: pops copy of message from queue into buffer and returns `true`; if queue is empty then no-op
495 * and returns `false`. On `true` `blob->size()` is modified to store the # of bytes received.
496 * A null blob (`blob->size() == 0` post-condition) is possible.
497 *
498 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
499 * Exception to this: If `blob->size()` is less than the required message size (or upper limit on this, if any),
500 * a particular error, which shall be documented below, is emitted; this is not fatal to `*this`.
501 *
502 * @param blob
503 * Buffer into which to copy into MQ and whose `->size()` to update to the # of bytes received;
504 * if message empty it is set to zero. Original `->size()` value indicates capacity of buffer;
505 * if this is insufficient based on either the popped message size or an upper limit (if any), it is an error.
506 * @param err_code
507 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
508 * various. Would-block shall not be emitted.
509 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (size-underflow error: buffer is smaller than
510 * max_msg_size()).
511 * @return `true` on success; `false` on failure; in the latter case `*err_code` distinguishes
512 * between would-block and fatal error.
513 */
514 bool try_receive(util::Blob_mutable* blob, Error_code* err_code = 0);
515
516 /**
517 * Blocking receive: pops copy of message from queue into buffer; if queue is empty blocks until it is not.
518 * On `true` `blob->size()` is modified to store the # of bytes received.
519 * A null blob (`blob->size() == 0` post-condition) is possible.
520 *
521 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
522 * Exception to this: If `blob->size()` is less than the required message size (or upper limit on this, if any),
523 * a particular error, which shall be documented below, is emitted; this is not fatal to `*this`.
524 * Exception to this: interrupt_receives()
525 * leads to the emission of a particular error which shall be documented below; this is not fatal to `*this.
526 *
527 * @param blob
528 * See try_receive().
529 * @param err_code
530 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
531 * various. Would-block shall not be emitted.
532 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (size-underflow error: buffer is smaller than
533 * max_msg_size()).
534 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_receives()).
535 */
536 void receive(util::Blob_mutable* blob, Error_code* err_code = 0);
537
538 /**
539 * Blocking timed receive: pops copy of message from queue into buffer; if queue is empty blocks until it is not, or
540 * the specified time passes, whichever happens first. Returns `true` on success; `false` on timeout or error.
541 * On `true` `blob->size()` is modified to store the # of bytes received.
542 * A null blob (`blob->size() == 0` post-condition) is possible.
543 *
544 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
545 * Exception to this: If `blob->size()` is less than the required message size (or upper limit on this, if any),
546 * a particular error, which shall be documented below, is emitted; this is not fatal to `*this`.
547 *
548 * @warning Read the warning on timed_send() about the accuracy of the timeout. Same applies here.
549 *
550 * @param blob
551 * See try_receive().
552 * @param timeout_from_now
553 * Now + (this long) = the timeout time point.
554 * @param err_code
555 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
556 * various. Would-block shall not be emitted. Timeout shall not be emitted.
557 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (size-underflow error: buffer is smaller than
558 * max_msg_size()).
559 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_receives()).
560 * @return `true` on success; `false` on failure; in the latter case `*err_code` distinguishes
561 * between timeout and fatal error.
562 */
563 bool timed_receive(util::Blob_mutable* blob, util::Fine_duration timeout_from_now, Error_code* err_code = 0);
564
565 /**
566 * Equivalent to try_receive() except stops short of reading anything, with `true` result indicating that
567 * try_receive() *would* work at that moment.
568 *
569 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
570 * Exception to this: interrupt_receives() leads to the emission of a particular error which shall be documented
571 * below; this is not fatal to `*this.
572 *
573 * @param err_code
574 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
575 * various. Would-block shall not be emitted.
576 * error::Code::S_INTERRUPTED (preempted by interrupt_receives()).
577 * @return `true` if transmissible; `false` if not, or on error.
578 */
579 bool is_receivable(Error_code* err_code = 0);
580
581 /**
582 * Equivalent to receive() except stops short of reading anything, with non-error return indicating that
583 * try_receive() *would* work at that moment. It shall block indefinitely until fatal error, or
584 * error::Code::S_INTERRUPTED, or transmissibility.
585 *
586 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
587 * Exception to this: interrupt_receives() leads to the emission of a particular error which shall be documented
588 * below; this is not fatal to `*this.
589 *
590 * @param err_code
591 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
592 * various. Would-block shall not be emitted.
593 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_receives()).
594 */
595 void wait_receivable(Error_code* err_code = 0);
596
597 /**
598 * Equivalent to timed_receive() except stops short of reading anything, with `true` result indicating that
599 * try_receive() *would* work at that moment. It shall block until fatal error, or
600 * error::Code::S_INTERRUPTED, or timeout, or transmissibility.
601 *
602 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
603 * Exception to this: interrupt_receives() leads to the emission of a particular error which shall be documented
604 * below; this is not fatal to `*this.
605 *
606 * @param timeout_from_now
607 * See timed_receive().
608 * @param err_code
609 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
610 * various. Would-block shall not be emitted.
611 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_receives()).
612 * @return `true` if transmissible; `false` if not, or on error or timeout; in the latter case `*err_code`
613 * distinguishes between timeout and fatal error.
614 */
615 bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code* err_code = 0);
616
617 /**
618 * Enables sends-interrupted mode: is_sendable() (future calls), send() (future or concurrent calls),
619 * timed_send() (ditto), wait_sendable() (ditto), timed_wait_sendable() (ditto) shall emit
620 * error::Code::S_INTERRUPTED as soon as possible.
621 *
622 * @return `true` on success; `false` is already enabled.
623 */
625
626 /**
627 * Disables mode enabled by interrupt_sends().
628 *
629 * @return `true` on success; `false` if already disabled.
630 */
632
633 /**
634 * Enables receives-interrupted mode: is_receivable() (future calls), receive() (future or concurrent calls),
635 * timed_receive() (ditto), wait_receivable() (ditto), timed_wait_receivable() (ditto) shall emit
636 * error::Code::S_INTERRUPTED as soon as possible.
637 *
638 * @return `true` on success; `false` is already enabled.
639 */
641
642 /**
643 * Disables mode enabled by interrupt_receives().
644 *
645 * @return `true` on success; `false` if already disabled.
646 */
648
649 /**
650 * Returns name equal to `absolute_name` passed to ctor.
651 * @return See above.
652 */
654
655 /**
656 * Returns the max message size of the underlying queue. This should at least approximately equal
657 * what was passed to ctor when creating the MQ; however the underlying transport may tweak it, such as rounding
658 * to page size.
659 *
660 * @return See above.
661 */
662 size_t max_msg_size() const;
663
664 /**
665 * Returns the max message count of the underlying queue. This should at least approximately equal
666 * what was passed to ctor when creating the MQ; however the underlying transport may tweak it if it desires.
667 *
668 * @return See above.
669 */
670 size_t max_n_msgs() const;
671
672 /**
673 * Available if and only if #S_HAS_NATIVE_HANDLE is `true`: Returns the stored native MQ handle; null means a
674 * constructor failed, or `*this` has been moved-from, hence the handle is to not-a-queue.
675 * See class doc header for important background discussion w/r/t the rationale behind this method.
676 *
677 * Medium-length story short: this is (in Linux impl of POSIX MQ) really an FD and as such can
678 * be wrapped via `boost::asio::posix::descriptor` and hence participate
679 * in a `Task_engine` event loop which internally will use `epoll()` together with all the other waited-on
680 * resources; or simply be `poll()`ed (etc.). E.g., boost.asio `async_wait()` for readability;
681 * then invoke `this->try_receive()` once it is readable.
682 *
683 * Behavior is undefined if you operate on this value, such as sending or receiving, concurrently to any
684 * transmission APIs acting on `*this`.
685 *
686 * @note It is possible to get away with not using this by using the `*_sendable()` and `*_receivable()`
687 * methods. However the blocking and timed variations cannot be straightforwardly interrupted.
688 *
689 * @return See above.
690 */
692}; // class Persistent_mq_handle
693
694// Free functions.
695
696/**
697 * Swaps two objects. Constant-time. Suitable for standard ADL-swap pattern `using std::swap; swap(val1, val2);`.
698 *
699 * @relatesalso Persistent_mq_handle
700 *
701 * @param val1
702 * Object.
703 * @param val2
704 * Object.
705 */
707
708} // namespace ipc::transport
A documentation-only concept defining the behavior of an object representing a light-weight handle to...
~Persistent_mq_handle()
Destroys this handle (or no-op if no handle was successfully constructed, or if it's a moved-from or ...
bool interrupt_receives()
Enables receives-interrupted mode: is_receivable() (future calls), receive() (future or concurrent ca...
size_t max_n_msgs() const
Returns the max message count of the underlying queue.
Persistent_mq_handle()
Construct null handle, suitable only for being subsequently moved-to or destroyed.
void wait_sendable(Error_code *err_code=0)
Equivalent to send() except stops short of writing anything, with non-error return indicating that tr...
Native_handle native_handle() const
Available if and only if S_HAS_NATIVE_HANDLE is true: Returns the stored native MQ handle; null means...
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &name, Error_code *err_code=0)
Removes the named persistent MQ.
Persistent_mq_handle & operator=(Persistent_mq_handle &&src)
Replaces handle with the source handle while making the latter invalid.
Persistent_mq_handle(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, util::Create_only mode_tag, size_t max_n_msg, size_t max_msg_sz, const util::Permissions &perms=util::Permissions(), Error_code *err_code=0)
Construct handle to non-existing named MQ, creating it first.
const Shared_name & absolute_name() const
Returns name equal to absolute_name passed to ctor.
Persistent_mq_handle(Persistent_mq_handle &&src)
Constructs handle from the source handle while making the latter invalid.
void swap(Persistent_mq_handle &val1, Persistent_mq_handle &val2)
Swaps two objects.
void wait_receivable(Error_code *err_code=0)
Equivalent to receive() except stops short of reading anything, with non-error return indicating that...
bool try_receive(util::Blob_mutable *blob, Error_code *err_code=0)
Non-blocking receive: pops copy of message from queue into buffer and returns true; if queue is empty...
void send(const util::Blob_const &blob, Error_code *err_code=0)
Blocking send: pushes copy of message to queue; if queue is full blocks until it is not.
bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Equivalent to timed_send() except stops short of writing anything, with true result indicating that t...
bool allow_receives()
Disables mode enabled by interrupt_receives().
bool timed_send(const util::Blob_const &blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Blocking timed send: pushes copy of message to queue; if queue is full blocks until it is not,...
bool is_sendable(Error_code *err_code=0)
Equivalent to try_send() except stops short of writing anything, with true result indicating that try...
static void for_each_persistent(const Handle_name_func &handle_name_func)
Lists all named persistent MQs currently persisting, invoking the given handler synchronously on each...
static constexpr bool S_HAS_NATIVE_HANDLE
true if and only if native_handle() method exists, and the returned value may be waited-on by poll()/...
bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Equivalent to timed_receive() except stops short of reading anything, with true result indicating tha...
bool is_receivable(Error_code *err_code=0)
Equivalent to try_receive() except stops short of reading anything, with true result indicating that ...
static const Shared_name S_RESOURCE_TYPE_ID
Shared_name relative-folder fragment (no separators) identifying this resource type.
bool try_send(const util::Blob_const &blob, Error_code *err_code=0)
Non-blocking send: pushes copy of message to queue and returns true; if queue is full then no-op and ...
void receive(util::Blob_mutable *blob, Error_code *err_code=0)
Blocking receive: pops copy of message from queue into buffer; if queue is empty blocks until it is n...
size_t max_msg_size() const
Returns the max message size of the underlying queue.
Persistent_mq_handle(const Persistent_mq_handle &)=delete
Copying of handles is prohibited.
Persistent_mq_handle & operator=(const Persistent_mq_handle &)=delete
Copying of handles is prohibited.
Persistent_mq_handle(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, util::Open_or_create mode_tag, size_t max_n_msg_on_create, size_t max_msg_sz_on_create, const util::Permissions &perms_on_create=util::Permissions(), Error_code *err_code=0)
Construct handle to existing named MQ, or else if it does not exist creates it first and opens it (at...
bool allow_sends()
Disables mode enabled by interrupt_sends().
bool timed_receive(util::Blob_mutable *blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Blocking timed receive: pops copy of message from queue into buffer; if queue is empty blocks until i...
Persistent_mq_handle(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, util::Open_only mode_tag, Error_code *err_code=0)
Construct handle to existing named MQ.
bool interrupt_sends()
Enables sends-interrupted mode: is_sendable() (future calls), send() (future or concurrent calls),...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
bipc::permissions Permissions
Short-hand for Unix (POSIX) permissions class.
Definition: util_fwd.hpp:155
bipc::open_only_t Open_only
Tag type indicating an ideally-atomic open-if-exists-else-fail operation.
Definition: util_fwd.hpp:149
boost::asio::mutable_buffer Blob_mutable
Short-hand for an mutable blob somewhere in memory, stored as exactly a void* and a size_t.
Definition: util_fwd.hpp:134
bipc::open_or_create_t Open_or_create
Tag type indicating an atomic open-if-exists-else-create operation.
Definition: util_fwd.hpp:146
bipc::create_only_t Create_only
Tag type indicating a create-unless-exists-else-fail operation.
Definition: util_fwd.hpp:152
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:111
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:128
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:297
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.