Flow-IPC 1.0.2
Flow-IPC project: Full implementation reference.
persistent_mq_handle.hpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19
20// Not compiled: for documentation only. Contains concept docs as of this writing.
21#ifndef IPC_DOXYGEN_ONLY
22static_assert(false, "As of this writing this is a documentation-only \"header\" "
23 "(the \"source\" is for humans and Doxygen only).");
24#else // ifdef IPC_DOXYGEN_ONLY
25
26namespace ipc::transport
27{
28
29// Types.
30
31/**
32 * A documentation-only *concept* defining the behavior of an object representing a light-weight handle
33 * to a message queue, capable of receiving/sending discrete messages in non-blocking/blocking/timed-blocking
34 * fashion, as well as some support for polling/waiting and interruptions thereof. The message queue (MQ)
35 * is assumed to be of at least kernel persistence, meaning a handle to an MQ
36 * closing does not delete the MQ, and another handle may reattach to it; in order to delete the underlying MQ
37 * a separate remove op is needed (and is provided as well as a `static` method).
38 *
39 * ### How to use ###
40 * The following are relevant operations to do with MQs. (Read Rationale for background.)
41 * - Create (named) MQ: Persistent_mq_handle() ctor, with either util::Create_only or util::Open_or_create tag arg.
42 * A Shared_name specifies the name.
43 * If one does not need a handle simply destroy the resulting object: the underlying created MQ continues existing.
44 * - Create handle to the (named) MQ: Persistent_mq_handle() ctor, with either util::Open_only or
45 * util::Open_or_create tag arg.
46 * A Shared_name specifies the name.
47 * - Send message into the MQ, via handle: try_send() (non-blocking), send() (blocking), timed_send() (blocking with
48 * timeout).
49 * - Await writability without writing: is_sendable() (instant poll), wait_sendable() (blocking),
50 * timed_wait_sendable() (blocking with timeout).
51 * - Interrupt and/or preempt all above blocking operations including concurrently/preempt is_sendable():
52 * interrupt_sends(). Undo: allow_sends().
53 * - Receive message out of the MQ, via handle: try_receive() (non-blocking), receive() (blocking),
54 * timed_receive() (blocking with timeout).
55 * - Await readability without reading: is_receivable() (instant poll), wait_receivable() (blocking),
56 * timed_wait_receivable() (blocking with timeout).
57 * - Interrupt and/or preempt all above blocking operations including concurrently/preempt is_receivable():
58 * interrupt_receives(). Undo: allow_receives().
59 * - Destroy handle to the MQ: ~Persistent_mq_handle() destructor. The underlying MQ continues existing.
60 * - Destroy (named) MQ: remove_persistent(). Underlying MQ name disappears, but MQ continues existing until
61 * all handles machine-wide are closed. A Shared_name specifies the name.
62 *
63 * ### Thread safety ###
64 * Concurrent ops safe on different objects (including when the same-named queue is being accessed).
65 * Concurrent non-`const` ops (where at least 1 is non-`const`) not safe on the same `*this`.
66 *
67 * The notable exception to the latter: each of `interrupt_*()` and `allow_*()` is safe to call on the same
68 * `*this` concurrently to any method except `interrupt_*()` and `allow_*()` themselves. In fact
69 * `interrupt_*()` ability to interrupt a concurrent timed-waiting of infinitely-waiting read or write is
70 * most of those methods' utility.
71 *
72 * ### Rationale ###
73 * Here's why this concept exists and how it was designed to be the way it is. It's actually quite simple;
74 * the only question is why it is this way and not some other way. Update: It's somewhat less simple now but not bad.
75 *
76 * ipc::transport features the Blob_sender and Blob_receiver concepts, each of which is a peer endpoint of a
77 * connected one-way pipe capable of transmitting discrete messages, each consisting of a binary blob.
78 * They are first-class citizens of ipc::transport, or at least of its core-layer portion, meaning they're likely
79 * to be used by the user directly or at least in highly visible fashion by another aspect of ipc::transport impl.
80 * They have a vaguely socket-like API, with an asynchronous receive method and a convenient non-blocking/synchronous
81 * sender method (that cannot yield would-block). Lastly the destructor of either ensures the underlying transport
82 * cannot be used again by somehow opening another handle to the same thing.
83 *
84 * One implementation of Blob_sender and Blob_receiver simultaneously might be Native_socket_stream which
85 * uses Unix domain stream connection as underlying transport. That one is a natural fit for the concept,
86 * including the fact that closing a Unix domain socket hoses the connection entirely (one cannot somehow "reattach"
87 * to it). It's good. However, benchmark tests (details omitted) show that, when need not also send native handles
88 * over such a connection, *message queue* low-level transports might be more performant or at least competitive.
89 *
90 * In our ecosystem, two message queue (MQ) transports are candidates: the POSIX message queue (provided by Linux)
91 * (see `man mq_overview`) and the boost.interprocess (bipc) message queue (web-search for
92 * `boost::interprocess::message_queue`). We wanted to support both and then provide the choice of which one to use.
93 * So the goal is to write Blob_sender and Blob_receiver concept impls for each type of MQ (and possibly others
94 * over time). So how to do that nicely?
95 *
96 * The 2 low-level MQ APIs (POSIX MQ and bipc MQ) are extremely different, conceptually and in practice, from
97 * stream socket APIs of any kind. For one thing, the MQs have kernel persistence -- they're like files in memory that
98 * disappear at reboot but otherwise stay around, so one can reattach a handle to one even after closing another one;
99 * and deleting it is a separate operation not involving any handle. For another, they don't use socket-descriptors
100 * and hence do not participate in `epoll/poll/select()` mechanisms and feature only blocking, non-blocking, and
101 * blocking-with-timeout send and receive APIs. (POSIX MQ does have `mq_notify()`, but this async-notify mechanism
102 * is quite limited, either creating a new thread for each notification, or using a signal. POSIX MQ in Linux
103 * *does* happen to have a non-portable property wherein the descriptor is really an FD, so it *can* participate
104 * in `epoll/poll/select()` mechanisms; and boost.asio *does* have a `posix::descriptor` that can wrap such a
105 * native descriptor. However: bipc MQ lacks anything like this at all.)
106 *
107 * The 2 APIs are, however, extremely similar to each other. In fact, one gets the impression that the authors
108 * of bipc MQ considered wrapping POSIX MQs but reconsidered for whatever reason (maybe the obscure limit semantics,
109 * lack of availability in all Linux versions and other Unixes and Windows) -- but fairly closely mirrored the API
110 * therein.
111 *
112 * Knowing this, it looked pretty natural to write some Blob_sender/Blob_receiver impls holding
113 * some `bipc::message_queue` handles internally. However, while conceptually similar, the POSIX MQ API is a
114 * C API, not a Boost-style C++ thing. It would be entirely possible to still write Blob_sender/Blob_receiver impls
115 * around this API, but since conceptually it's so similar to bipc MQ, why waste the effort? Code reuse for the win.
116 *
117 * That brings us to this concept. What it is is: A thing I (ygoldfel) came up with by simply taking
118 * `bipc::message_queue` and working backwards, making a concept for an MQ handle, Persistent_mq_handle, to which
119 * `bipc::message_queue` directly conforms already in most key ways (to wit: the send/receive methods).
120 * Then one could write a very thin non-polymorphic HAS-A wrapper class (with no added data stored) and a couple other
121 * utility functions with very thin adaptations to match ipc::transport style (like using Shared_name instead of
122 * strings when creating or removing kernel-persistent queue instances). This class is Bipc_mq_handle; it
123 * implements this Persistent_mq_handle.
124 *
125 * Next, a class can be written to mirror this API but for POSIX MQs. This is a little harder, since it can't just
126 * expose some existing super-class's send/receive functions, but it would just wrap the Linux C API and store
127 * an MQ handle; done. This class is Posix_mq_handle; it also implements Persistent_mq_handle.
128 *
129 * Now that Persistent_mq_handle provides a uniform API, a Blob_sender/Blob_receiver can be written around a
130 * `typename Persistent_mq_handle` which can be either Bipc_mq_handle or Posix_mq_handle (or a future wrapper around
131 * something else).
132 *
133 * ### Poll/wait and interrupt facilities ###
134 * Somewhat later we added a major feature to the concept (and both known impls): For each type of transmission
135 * operation in a given direction (instant poll, blocking, blocking with timeout), there is a poll/wait counterpart
136 * which is almost the same except it does not actually transmit but merely returns the fact transmission
137 * is possible. E.g., try_receive() <=> is_receivable(), receive() <=> wait_receivable(),
138 * timed_receive() <=> timed_wait_receivable(). In fact -- in the absence of a competing `*this` --
139 * receive() = wait_receivable() + try_receive() (succeeds), timed_receive() = timed_wait_receivable() + try_receive()
140 * (succeeds).
141 *
142 * This is useful at least in that it allows one to defer deciding on a target buffer for receiving, until
143 * receiving would (probably) actually work. In the other direction it can still provide useful flexibility in
144 * a multi-threaded setup (one thread blockingly-waits, the other is signaled about readiness and tries to
145 * transmit but without blocking -- useful in a thread U/thread W setup).
146 *
147 * Lastly, any blocking (with or without timeout) can be interrupted via the `interrupt_*()` methods.
148 * For example a receive() or wait_receivable() may be ongoing in thread W, and thread U can interrupt_receives()
149 * to immediately make the blocking op exit with error::Code::S_INTERRUPTED.
150 *
151 * ### Discussion: The road to boost.asio-like async I/O ###
152 * What would be cool, which isn't here, is if this concept described a boost.asio I/O object, capable of plugging
153 * into a boost.asio loop; or ~equivalently something a-la Blob_sender and/or Blob_receiver. However that was not
154 * our aim -- we only want a thin wrapper as noted in "Rationale" above -- and Blob_stream_mq_receiver and
155 * Blob_stream_mq_sender achieve it (and arguably more -- albeit with the limit of a single writer and single reader).
156 * That said: it may be useful to contemplate what parts the concept does have that are conducive to that type of work.
157 *
158 * Without the poll/wait facilities combined with interrupt facilities, it was possible but somewhat clunky,
159 * having to at least start a background thread in which to perform blocking transmit calls; they'd have to
160 * be broken up into subject-to-timeout shorter calls, so that the thread could be stopped and joined during
161 * deinit. Even then full deinit could only occur with a potential pause until the current short call in the
162 * background thread W could return. By having thread W do (indefinite) waits only, and allowing thread U to
163 * do non-blocking transmits only *and* `interrupt_*()`, we achieve pretty good flexibility and responsiveness.
164 * That is what Blob_stream_mq_sender and Blob_stream_mq_receiver can do given the present concept.
165 *
166 * However -- it *can* be more advanced still. Consider the specific impl Posix_mq_handle, which has the
167 * additional-to-concept (edit: see below update) method Posix_mq_handle::native_handle(); in Linux a #Native_handle.
168 * This can be waited-on, natively with `[e]poll*()`; with boost.asio via util::sync_io::Asio_waitable_native_handle
169 * (more or less a `boost::asio::posix::descriptor`). *Now* no background thread W is necessary: thread U
170 * can ask the *kernel* to report readability/writability -- when active it can do non-blocking stuff.
171 *
172 * The wrinkle: Posix_mq_handle has it; but Bipc_mq_handle does not (and there is no FD inside its impl either;
173 * it is fully SHM-based internally). Can it be generalized nevertheless? Yes and no. Yes: in that it can be
174 * simulated by "secretly" having a thread W and having it use a pipe (or something) to translate readable/writable
175 * events into a live FD that could be detected via `[e]poll*()` or boost.asio. No: in that it turns a
176 * hypothetical Persistent_mq_handle impl, namely Bipc_mq_handle, into something complex as opposed to any kind
177 * of thin wrapper around an MQ API. Therefore we did not do it.
178 *
179 * However -- we did ~such a thing with sync_io::Blob_stream_mq_sender and sync_io::Blob_stream_mq_receiver which
180 * are templated on Persistent_mq_handle as a template-parameter; and, being
181 * `sync_io`-pattern-impls (see util::sync_io), they each expose a waitable-on #Native_handle.
182 * Indeed as of this writing each of these templates keeps a "secret" thread W that performs
183 * blocking waits, while the user-accessible API makes it look like a nice, kernel-reported-via-FDs
184 * reactor/proactor-supporting I/O object. By operating on what is directly available via the present concept,
185 * this setup of Blob_stream_mq_sender and Blob_stream_mq_receiver internals is agnostic to the type of MQ.
186 *
187 * However, if one wanted to take advantage of the non-concept (edit: see below update) ability to be watched
188 * (via FD) with the kernel's help and without an added thread, they could specialize `Blob_stream_mq_*er`
189 * for Posix_mq_handle which does offer a kernel-FD accessor `.native_handle()`. Update: This is now done
190 * (in fact it is not specialized as of this writing but rather uses a few simple `if constexpr
191 * ()`s). Accordingly the concept now allows for `native_handle()` optionally: see
192 * Persistent_mq_handle::S_HAS_NATIVE_HANDLE.
193 */
194class Persistent_mq_handle // Note: movable but not copyable.
195{
196public:
197 // Constants.
198
199 /// Shared_name relative-folder fragment (no separators) identifying this resource type.
201
202 /// `true` if and only if native_handle() method exists, and the returned value may be waited-on by `poll()`/etc.
203 static constexpr bool S_HAS_NATIVE_HANDLE = unspecified;
204
205 // Constructors/destructor.
206
207 /**
208 * Construct null handle, suitable only for being subsequently moved-to or destroyed.
209 * If you do anything on `*this`, other than invoking dtor or move-assignment, behavior is undefined.
210 */
212
213 /**
214 * Construct handle to non-existing named MQ, creating it first. If it already exists, it is an error.
215 * If an error is emitted via `*err_code`, and you do anything on `*this` other than invoking dtor or
216 * move-assignment, behavior is undefined.
217 *
218 * @param logger_ptr
219 * Logger to use for subsequently logging.
220 * @param absolute_name
221 * Absolute name at which the persistent MQ lives.
222 * @param mode_tag
223 * API-choosing tag util::CREATE_ONLY.
224 * @param perms
225 * Permissions to use for creation. Suggest the use of util::shared_resource_permissions() to translate
226 * from one of a small handful of levels of access; these apply almost always in practice.
227 * The applied permissions shall *ignore* the process umask and shall thus exactly match `perms`,
228 * unless an error occurs.
229 * @param max_n_msg
230 * Max # of unpopped messages in created queue.
231 * @param max_msg_sz
232 * Max # of bytes in any one message in created queue.
233 * @param err_code
234 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
235 * various. Most likely creation failed due to permissions, or it already existed.
236 */
237 explicit Persistent_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
238 util::Create_only mode_tag, size_t max_n_msg, size_t max_msg_sz,
239 const util::Permissions& perms = util::Permissions(),
240 Error_code* err_code = 0);
241
242 /**
243 * Construct handle to existing named MQ, or else if it does not exist creates it first and opens it (atomically).
244 * If an error is emitted via `*err_code`, and you do anything on `*this` other than invoking dtor or
245 * move-assignment, behavior is undefined.
246 *
247 * @param logger_ptr
248 * Logger to use for subsequently logging.
249 * @param absolute_name
250 * Absolute name at which the persistent MQ lives.
251 * @param mode_tag
252 * API-choosing tag util::OPEN_OR_CREATE.
253 * @param perms_on_create
254 * Permissions to use if creation is required. Suggest the use of util::shared_resource_permissions() to
255 * translate from one of a small handful of levels of access; these apply almost always in practice.
256 * The applied permissions shall *ignore* the process umask and shall thus exactly match `perms_on_create`,
257 * unless an error occurs.
258 * @param max_n_msg_on_create
259 * Max # of unpopped messages in created queue if creation is required.
260 * @param max_msg_sz_on_create
261 * Max # of bytes in any one message in created queue if creation is required.
262 * @param err_code
263 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
264 * various. Most likely creation failed due to permissions.
265 */
266 explicit Persistent_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
267 util::Open_or_create mode_tag, size_t max_n_msg_on_create, size_t max_msg_sz_on_create,
268 const util::Permissions& perms_on_create = util::Permissions(),
269 Error_code* err_code = 0);
270
271 /**
272 * Construct handle to existing named MQ. If it does not exist, it is an error.
273 * If an error is emitted via `*err_code`, and you do anything on `*this` other than invoking dtor or
274 * move-assignment, behavior is undefined.
275 *
276 * @param logger_ptr
277 * Logger to use for subsequently logging.
278 * @param absolute_name
279 * Absolute name at which the persistent MQ lives.
280 * @param mode_tag
281 * API-choosing tag util::OPEN_ONLY.
282 * @param err_code
283 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
284 * various. Most likely it already existed.
285 */
286 explicit Persistent_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
287 util::Open_only mode_tag, Error_code* err_code = 0);
288
289 /**
290 * Constructs handle from the source handle while making the latter invalid.
291 * If you do anything on `src` after this, other than invoking dtor or move-assignment, behavior is undefined.
292 *
293 * Informally: this is a light-weight op.
294 *
295 * @param src
296 * Source object which is nullified.
297 */
299
300 /// Copying of handles is prohibited.
302
303 /**
304 * Destroys this handle (or no-op if no handle was successfully constructed, or if it's a moved-from or default-cted
305 * handle). The underlying MQ (if any) is *not* destroyed and can be attached-to by another handle.
306 */
308
309 // Methods.
310
311 /**
312 * Replaces handle with the source handle while making the latter invalid.
313 * If you do anything on `src` after this, other than invoking dtor or move-assignment, behavior is undefined.
314 *
315 * Informally: this is a light-weight op.
316 *
317 * @param src
318 * Source object which is nullified.
319 * @return `*this`.
320 */
322
323 /// Copying of handles is prohibited.
325
326 /**
327 * Removes the named persistent MQ. The name `name` is removed from the system immediately; and
328 * the function is non-blocking. However the underlying MQ if any continues to exist until all handles to it are
329 * closed; their presence in this or other process is *not* an error.
330 *
331 * @see `util::remove_each_persistent_*`() for a convenient way to remove more than one item. E.g.,
332 * `util::remove_each_persistent_with_name_prefix<Pool_arena>()` combines remove_persistent() and
333 * for_each_persistent() in a common-sense way to remove only those `name`s starting with a given prefix;
334 * or simply all of them.
335 *
336 * Trying to remove a non-existent name *is* an error.
337 *
338 * Logs INFO message.
339 *
340 * @warning The impl should be carefully checked to conform to this. As of this writing the 2 relevant
341 * low-level MQ APIs (bipc and POSIX) do, but there could be more.
342 *
343 * @param logger_ptr
344 * Logger to use for subsequently logging.
345 * @param name
346 * Absolute name at which the persistent MQ lives.
347 * @param err_code
348 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
349 * various.
350 */
351 static void remove_persistent(flow::log::Logger* logger_ptr, const Shared_name& name,
352 Error_code* err_code = 0);
353
354 /**
355 * Lists all named persistent MQs currently persisting, invoking the given handler synchronously on each one.
356 *
357 * Note that, in a sanely set-up OS install, all existing pools will be listed by this function;
358 * but permissions/ownership may forbid certain operations the user may typically want to invoke on
359 * a given listed name -- for example remove_persistent(). This function does *not* filter-out any
360 * potentially inaccessible items.
361 *
362 * @tparam Handle_name_func
363 * Function object matching signature `void F(const Shared_name&)`.
364 * @param handle_name_func
365 * `handle_name_func()` shall be invoked for each (matching, if applicable) item. See `Handle_name_func`.
366 */
367 template<typename Handle_name_func>
368 static void for_each_persistent(const Handle_name_func& handle_name_func);
369
370 /**
371 * Non-blocking send: pushes copy of message to queue and returns `true`; if queue is full then no-op and returns
372 * `false`. A null blob (`blob.size() == 0`) is allowed.
373 *
374 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
375 * Exception to this: If `blob.size()` exceeds message size limit (if any), a particular error, which shall be
376 * documented below, is emitted; this is not fatal to `*this`.
377 *
378 * @param blob
379 * Buffer to copy into MQ; if empty then an empty message is pushed.
380 * @param err_code
381 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
382 * various. Would-block shall not be emitted.
383 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (excessive-size error: buffer exceeds
384 * max_msg_size()).
385 * @return `true` on success; `false` on failure; in the latter case `*err_code` distinguishes
386 * between would-block and fatal error.
387 */
388 bool try_send(const util::Blob_const& blob, Error_code* err_code = 0);
389
390 /**
391 * Blocking send: pushes copy of message to queue; if queue is full blocks until it is not.
392 * A null blob (`blob.size() == 0`) is allowed.
393 *
394 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
395 * Exception to this: If `blob.size()` exceeds message size limit (if any), a particular error, which shall be
396 * documented below, is emitted; this is not fatal to `*this`. Exception to this: interrupt_sends()
397 * leads to the emission of a particular error which shall be documented below; this is not fatal to `*this.
398 *
399 * @param blob
400 * See try_send().
401 * @param err_code
402 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
403 * various. Would-block shall not be emitted.
404 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (excessive-size error: buffer exceeds
405 * max_msg_size()).
406 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_sends()).
407 */
408 void send(const util::Blob_const& blob, Error_code* err_code = 0);
409
410 /**
411 * Blocking timed send: pushes copy of message to queue; if queue is full blocks until it is not, or the
412 * specified time passes, whichever happens first. Returns `true` on success; `false` on timeout or error.
413 * A null blob (`blob.size() == 0`) is allowed.
414 *
415 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
416 * Exception to this: If `blob.size()` exceeds message size limit (if any), a particular error, which shall be
417 * documented below, is emitted; this is not fatal to `*this`. Exception to this: interrupt_sends()
418 * leads to the emission of a particular error which shall be documented below; this is not fatal to `*this.
419 *
420 * @warning The user must not count on precision/stability -- unlike with, say, boost.asio timers -- here.
421 * If timing precision is required, the user will have to add an async layer with more precise timing
422 * and not rely on this. For example, informally, suggest: Use timed_send() with 250msec increments
423 * to model an interruptible indefinitely-blocking send(). In a parallel thread, if it's time to
424 * interrupt the modeled endless send(), set some flag that'll cause the 250msec-long attempts to cease.
425 * The last one (that straddles the interruption point) can just be ignored.
426 *
427 * @param blob
428 * See try_send().
429 * @param timeout_from_now
430 * Now + (this long) = the timeout time point.
431 * @param err_code
432 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
433 * various. Would-block shall not be emitted. Timeout shall not be emitted.
434 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (excessive-size error: buffer exceeds
435 * max_msg_size()).
436 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_sends()).
437 * @return `true` on success; `false` on failure; in the latter case `*err_code` distinguishes
438 * between timeout and fatal error.
439 */
440 bool timed_send(const util::Blob_const& blob, util::Fine_duration timeout_from_now, Error_code* err_code = 0);
441
442 /**
443 * Equivalent to try_send() except stops short of writing anything, with `true` result indicating that
444 * try_send() *would* work at that moment.
445 *
446 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
447 * Exception to this: interrupt_sends() leads to the emission of a particular error which shall be documented
448 * below; this is not fatal to `*this.
449 *
450 * @param err_code
451 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
452 * various. Would-block shall not be emitted.
453 * error::Code::S_INTERRUPTED (preempted by interrupt_sends()).
454 * @return `true` if transmissible; `false` if not, or on error.
455 */
456 bool is_sendable(Error_code* err_code = 0);
457
458 /**
459 * Equivalent to send() except stops short of writing anything, with non-error return indicating that
460 * try_send() *would* work at that moment. It shall block indefinitely until fatal error, or
461 * error::Code::S_INTERRUPTED, or transmissibility.
462 *
463 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
464 * Exception to this: interrupt_sends() leads to the emission of a particular error which shall be documented
465 * below; this is not fatal to `*this.
466 *
467 * @param err_code
468 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
469 * various. Would-block shall not be emitted.
470 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_sends()).
471 */
472 void wait_sendable(Error_code* err_code = 0);
473
474 /**
475 * Equivalent to timed_send() except stops short of writing anything, with `true` result indicating that
476 * try_send() *would* work at that moment. It shall block until fatal error, or
477 * error::Code::S_INTERRUPTED, or timeout, or transmissibility.
478 *
479 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
480 * Exception to this: interrupt_sends() leads to the emission of a particular error which shall be documented
481 * below; this is not fatal to `*this.
482 *
483 * @param timeout_from_now
484 * See timed_send().
485 * @param err_code
486 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
487 * various. Would-block shall not be emitted.
488 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_sends()).
489 * @return `true` if transmissible; `false` if not, or on error or timeout; in the latter case `*err_code`
490 * distinguishes between timeout and fatal error.
491 */
492 bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code* err_code = 0);
493
494 /**
495 * Non-blocking receive: pops copy of message from queue into buffer and returns `true`; if queue is empty then no-op
496 * and returns `false`. On `true` `blob->size()` is modified to store the # of bytes received.
497 * A null blob (`blob->size() == 0` post-condition) is possible.
498 *
499 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
500 * Exception to this: If `blob->size()` is less than the required message size (or upper limit on this, if any),
501 * a particular error, which shall be documented below, is emitted; this is not fatal to `*this`.
502 *
503 * @param blob
504 * Buffer into which to copy into MQ and whose `->size()` to update to the # of bytes received;
505 * if message empty it is set to zero. Original `->size()` value indicates capacity of buffer;
506 * if this is insufficient based on either the popped message size or an upper limit (if any), it is an error.
507 * @param err_code
508 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
509 * various. Would-block shall not be emitted.
510 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (size-underflow error: buffer is smaller than
511 * max_msg_size()).
512 * @return `true` on success; `false` on failure; in the latter case `*err_code` distinguishes
513 * between would-block and fatal error.
514 */
515 bool try_receive(util::Blob_mutable* blob, Error_code* err_code = 0);
516
517 /**
518 * Blocking receive: pops copy of message from queue into buffer; if queue is empty blocks until it is not.
519 * On `true` `blob->size()` is modified to store the # of bytes received.
520 * A null blob (`blob->size() == 0` post-condition) is possible.
521 *
522 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
523 * Exception to this: If `blob->size()` is less than the required message size (or upper limit on this, if any),
524 * a particular error, which shall be documented below, is emitted; this is not fatal to `*this`.
525 * Exception to this: interrupt_receives()
526 * leads to the emission of a particular error which shall be documented below; this is not fatal to `*this.
527 *
528 * @param blob
529 * See try_receive().
530 * @param err_code
531 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
532 * various. Would-block shall not be emitted.
533 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (size-underflow error: buffer is smaller than
534 * max_msg_size()).
535 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_receives()).
536 */
537 void receive(util::Blob_mutable* blob, Error_code* err_code = 0);
538
539 /**
540 * Blocking timed receive: pops copy of message from queue into buffer; if queue is empty blocks until it is not, or
541 * the specified time passes, whichever happens first. Returns `true` on success; `false` on timeout or error.
542 * On `true` `blob->size()` is modified to store the # of bytes received.
543 * A null blob (`blob->size() == 0` post-condition) is possible.
544 *
545 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
546 * Exception to this: If `blob->size()` is less than the required message size (or upper limit on this, if any),
547 * a particular error, which shall be documented below, is emitted; this is not fatal to `*this`.
548 *
549 * @warning Read the warning on timed_send() about the accuracy of the timeout. Same applies here.
550 *
551 * @param blob
552 * See try_receive().
553 * @param timeout_from_now
554 * Now + (this long) = the timeout time point.
555 * @param err_code
556 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
557 * various. Would-block shall not be emitted. Timeout shall not be emitted.
558 * error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW (size-underflow error: buffer is smaller than
559 * max_msg_size()).
560 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_receives()).
561 * @return `true` on success; `false` on failure; in the latter case `*err_code` distinguishes
562 * between timeout and fatal error.
563 */
564 bool timed_receive(util::Blob_mutable* blob, util::Fine_duration timeout_from_now, Error_code* err_code = 0);
565
566 /**
567 * Equivalent to try_receive() except stops short of reading anything, with `true` result indicating that
568 * try_receive() *would* work at that moment.
569 *
570 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
571 * Exception to this: interrupt_receives() leads to the emission of a particular error which shall be documented
572 * below; this is not fatal to `*this.
573 *
574 * @param err_code
575 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
576 * various. Would-block shall not be emitted.
577 * error::Code::S_INTERRUPTED (preempted by interrupt_receives()).
578 * @return `true` if transmissible; `false` if not, or on error.
579 */
580 bool is_receivable(Error_code* err_code = 0);
581
582 /**
583 * Equivalent to receive() except stops short of reading anything, with non-error return indicating that
584 * try_receive() *would* work at that moment. It shall block indefinitely until fatal error, or
585 * error::Code::S_INTERRUPTED, or transmissibility.
586 *
587 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
588 * Exception to this: interrupt_receives() leads to the emission of a particular error which shall be documented
589 * below; this is not fatal to `*this.
590 *
591 * @param err_code
592 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
593 * various. Would-block shall not be emitted.
594 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_receives()).
595 */
596 void wait_receivable(Error_code* err_code = 0);
597
598 /**
599 * Equivalent to timed_receive() except stops short of reading anything, with `true` result indicating that
600 * try_receive() *would* work at that moment. It shall block until fatal error, or
601 * error::Code::S_INTERRUPTED, or timeout, or transmissibility.
602 *
603 * If error is emitted, `*this` shall be considered hosed: Behavior is undefined except dtor or move-assignment.
604 * Exception to this: interrupt_receives() leads to the emission of a particular error which shall be documented
605 * below; this is not fatal to `*this.
606 *
607 * @param timeout_from_now
608 * See timed_receive().
609 * @param err_code
610 * See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
611 * various. Would-block shall not be emitted.
612 * error::Code::S_INTERRUPTED (preempted or interrupted by interrupt_receives()).
613 * @return `true` if transmissible; `false` if not, or on error or timeout; in the latter case `*err_code`
614 * distinguishes between timeout and fatal error.
615 */
616 bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code* err_code = 0);
617
618 /**
619 * Enables sends-interrupted mode: is_sendable() (future calls), send() (future or concurrent calls),
620 * timed_send() (ditto), wait_sendable() (ditto), timed_wait_sendable() (ditto) shall emit
621 * error::Code::S_INTERRUPTED as soon as possible.
622 *
623 * @return `true` on success; `false` is already enabled.
624 */
626
627 /**
628 * Disables mode enabled by interrupt_sends().
629 *
630 * @return `true` on success; `false` if already disabled.
631 */
633
634 /**
635 * Enables receives-interrupted mode: is_receivable() (future calls), receive() (future or concurrent calls),
636 * timed_receive() (ditto), wait_receivable() (ditto), timed_wait_receivable() (ditto) shall emit
637 * error::Code::S_INTERRUPTED as soon as possible.
638 *
639 * @return `true` on success; `false` is already enabled.
640 */
642
643 /**
644 * Disables mode enabled by interrupt_receives().
645 *
646 * @return `true` on success; `false` if already disabled.
647 */
649
650 /**
651 * Returns name equal to `absolute_name` passed to ctor.
652 * @return See above.
653 */
655
656 /**
657 * Returns the max message size of the underlying queue. This should at least approximately equal
658 * what was passed to ctor when creating the MQ; however the underlying transport may tweak it, such as rounding
659 * to page size.
660 *
661 * @return See above.
662 */
663 size_t max_msg_size() const;
664
665 /**
666 * Returns the max message count of the underlying queue. This should at least approximately equal
667 * what was passed to ctor when creating the MQ; however the underlying transport may tweak it if it desires.
668 *
669 * @return See above.
670 */
671 size_t max_n_msgs() const;
672
673 /**
674 * Available if and only if #S_HAS_NATIVE_HANDLE is `true`: Returns the stored native MQ handle; null means a
675 * constructor failed, or `*this` has been moved-from, hence the handle is to not-a-queue.
676 * See class doc header for important background discussion w/r/t the rationale behind this method.
677 *
678 * Medium-length story short: this is (in Linux impl of POSIX MQ) really an FD and as such can
679 * be wrapped via `boost::asio::posix::descriptor` and hence participate
680 * in a `Task_engine` event loop which internally will use `epoll()` together with all the other waited-on
681 * resources; or simply be `poll()`ed (etc.). E.g., boost.asio `async_wait()` for readability;
682 * then invoke `this->try_receive()` once it is readable.
683 *
684 * Behavior is undefined if you operate on this value, such as sending or receiving, concurrently to any
685 * transmission APIs acting on `*this`.
686 *
687 * @note It is possible to get away with not using this by using the `*_sendable()` and `*_receivable()`
688 * methods. However the blocking and timed variations cannot be straightforwardly interrupted.
689 *
690 * @return See above.
691 */
693}; // class Persistent_mq_handle
694
695// Free functions.
696
697/**
698 * Swaps two objects. Constant-time. Suitable for standard ADL-swap pattern `using std::swap; swap(val1, val2);`.
699 *
700 * @relatesalso Persistent_mq_handle
701 *
702 * @param val1
703 * Object.
704 * @param val2
705 * Object.
706 */
708
709} // namespace ipc::transport
A documentation-only concept defining the behavior of an object representing a light-weight handle to...
~Persistent_mq_handle()
Destroys this handle (or no-op if no handle was successfully constructed, or if it's a moved-from or ...
bool interrupt_receives()
Enables receives-interrupted mode: is_receivable() (future calls), receive() (future or concurrent ca...
size_t max_n_msgs() const
Returns the max message count of the underlying queue.
Persistent_mq_handle()
Construct null handle, suitable only for being subsequently moved-to or destroyed.
void wait_sendable(Error_code *err_code=0)
Equivalent to send() except stops short of writing anything, with non-error return indicating that tr...
Native_handle native_handle() const
Available if and only if S_HAS_NATIVE_HANDLE is true: Returns the stored native MQ handle; null means...
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &name, Error_code *err_code=0)
Removes the named persistent MQ.
Persistent_mq_handle & operator=(Persistent_mq_handle &&src)
Replaces handle with the source handle while making the latter invalid.
Persistent_mq_handle(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, util::Create_only mode_tag, size_t max_n_msg, size_t max_msg_sz, const util::Permissions &perms=util::Permissions(), Error_code *err_code=0)
Construct handle to non-existing named MQ, creating it first.
const Shared_name & absolute_name() const
Returns name equal to absolute_name passed to ctor.
Persistent_mq_handle(Persistent_mq_handle &&src)
Constructs handle from the source handle while making the latter invalid.
void swap(Persistent_mq_handle &val1, Persistent_mq_handle &val2)
Swaps two objects.
void wait_receivable(Error_code *err_code=0)
Equivalent to receive() except stops short of reading anything, with non-error return indicating that...
bool try_receive(util::Blob_mutable *blob, Error_code *err_code=0)
Non-blocking receive: pops copy of message from queue into buffer and returns true; if queue is empty...
void send(const util::Blob_const &blob, Error_code *err_code=0)
Blocking send: pushes copy of message to queue; if queue is full blocks until it is not.
bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Equivalent to timed_send() except stops short of writing anything, with true result indicating that t...
bool allow_receives()
Disables mode enabled by interrupt_receives().
bool timed_send(const util::Blob_const &blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Blocking timed send: pushes copy of message to queue; if queue is full blocks until it is not,...
bool is_sendable(Error_code *err_code=0)
Equivalent to try_send() except stops short of writing anything, with true result indicating that try...
static void for_each_persistent(const Handle_name_func &handle_name_func)
Lists all named persistent MQs currently persisting, invoking the given handler synchronously on each...
static constexpr bool S_HAS_NATIVE_HANDLE
true if and only if native_handle() method exists, and the returned value may be waited-on by poll()/...
bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Equivalent to timed_receive() except stops short of reading anything, with true result indicating tha...
bool is_receivable(Error_code *err_code=0)
Equivalent to try_receive() except stops short of reading anything, with true result indicating that ...
static const Shared_name S_RESOURCE_TYPE_ID
Shared_name relative-folder fragment (no separators) identifying this resource type.
bool try_send(const util::Blob_const &blob, Error_code *err_code=0)
Non-blocking send: pushes copy of message to queue and returns true; if queue is full then no-op and ...
void receive(util::Blob_mutable *blob, Error_code *err_code=0)
Blocking receive: pops copy of message from queue into buffer; if queue is empty blocks until it is n...
size_t max_msg_size() const
Returns the max message size of the underlying queue.
Persistent_mq_handle(const Persistent_mq_handle &)=delete
Copying of handles is prohibited.
Persistent_mq_handle & operator=(const Persistent_mq_handle &)=delete
Copying of handles is prohibited.
Persistent_mq_handle(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, util::Open_or_create mode_tag, size_t max_n_msg_on_create, size_t max_msg_sz_on_create, const util::Permissions &perms_on_create=util::Permissions(), Error_code *err_code=0)
Construct handle to existing named MQ, or else if it does not exist creates it first and opens it (at...
bool allow_sends()
Disables mode enabled by interrupt_sends().
bool timed_receive(util::Blob_mutable *blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Blocking timed receive: pops copy of message from queue into buffer; if queue is empty blocks until i...
Persistent_mq_handle(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, util::Open_only mode_tag, Error_code *err_code=0)
Construct handle to existing named MQ.
bool interrupt_sends()
Enables sends-interrupted mode: is_sendable() (future calls), send() (future or concurrent calls),...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
bipc::permissions Permissions
Short-hand for Unix (POSIX) permissions class.
Definition: util_fwd.hpp:161
bipc::open_only_t Open_only
Tag type indicating an ideally-atomic open-if-exists-else-fail operation.
Definition: util_fwd.hpp:155
boost::asio::mutable_buffer Blob_mutable
Short-hand for an mutable blob somewhere in memory, stored as exactly a void* and a size_t.
Definition: util_fwd.hpp:140
bipc::open_or_create_t Open_or_create
Tag type indicating an atomic open-if-exists-else-create operation.
Definition: util_fwd.hpp:152
bipc::create_only_t Create_only
Tag type indicating a create-unless-exists-else-fail operation.
Definition: util_fwd.hpp:158
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:117
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:134
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:298
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.