Flow-IPC 1.0.2
Flow-IPC project: Full implementation reference.
posix_mq_handle.hpp
/* Flow-IPC: Core
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include "ipc/util/util_fwd.hpp"
#include <flow/log/log.hpp>
#include <flow/common.hpp>
#include <mqueue.h>

namespace ipc::transport
{

// Types.

#ifndef FLOW_OS_LINUX
static_assert(false, "Posix_mq_handle relies on Linux semantics and has not been tested in other POSIX OS; "
                       "likely does not exist in Windows.");
#endif

/**
 * Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see `man mq_overview`).
 *
 * @see Persistent_mq_handle: implemented concept.
 *
 * Reminder: This is available publicly in case it is useful; but it is more likely one would use a
 * Blob_stream_mq_sender or Blob_stream_mq_receiver which provides a far more convenient boost.asio-like async-capable
 * API. It uses class(es) like this one in its impl.
 *
 * native_handle() returns the underlying MQ descriptor; in Linux (the only OS supported as of this writing)
 * this happens to be an FD; and as an FD it can (according to `man mq_overview`) participate in `epoll/poll/select()`.
 * It is simple to wrap this descriptor in a boost.asio `posix::descriptor`. Having done that, one can
 * `async_wait()` on it, awaiting writability and readability in async fashion not supported by the
 * Persistent_mq_handle concept. Accordingly Posix_mq_handle::S_HAS_NATIVE_HANDLE is `true`.
 *
 * @internal
 * ### Implementation ###
 * I (ygoldfel) wrote this immediately after Bipc_mq_handle. Notably the thing that class thinly wraps,
 * `bipc::message_queue`, appears to be heavily influenced by the POSIX MQ API in terms of its API and functionality.
 * So the implementation here is conceptually similar.
 */
class Posix_mq_handle : // Note: movable but not copyable.
  public flow::log::Log_context
{
public:
  // Constants.

  /// Implements concept API.
  static const Shared_name S_RESOURCE_TYPE_ID;

  /// Implements concept API. Contrast this value with Bipc_mq_handle::S_HAS_NATIVE_HANDLE.
  static constexpr bool S_HAS_NATIVE_HANDLE = true;

  // Constructors/destructor.

  /// Implements Persistent_mq_handle API: Construct null handle.
  Posix_mq_handle();

  /**
   * Implements Persistent_mq_handle API: Construct handle to non-existing named MQ, creating it first. If it already
   * exists, it is an error.
   *
   * @see Persistent_mq_handle::Persistent_mq_handle(): implemented concept.
   *
   * `max_n_msg` and `max_msg_sz` are subject to certain OS limits, according to `man mq_overview`. Watch out for
   * those: we have no control over them here. The `man` page should give you the necessary information.
   *
   * @param logger_ptr
   *        See above.
   * @param absolute_name
   *        See above.
   * @param mode_tag
   *        See above.
   * @param perms
   *        See above.
   *        Reminder: Suggest the use of util::shared_resource_permissions() to translate
   *        from one of a small handful of levels of access; these apply almost always in practice.
   * @param max_n_msg
   *        See above.
   * @param max_msg_sz
   *        See above.
   * @param err_code
   *        See above.
   */
  explicit Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
                           util::Create_only mode_tag, size_t max_n_msg, size_t max_msg_sz,
                           const util::Permissions& perms = util::Permissions(),
                           Error_code* err_code = 0);
  /**
   * Implements Persistent_mq_handle API: Construct handle to existing named MQ, or else if it does not exist creates
   * it first and opens it (atomically).
   *
   * @see Persistent_mq_handle::Persistent_mq_handle(): implemented concept.
   *
   * @param logger_ptr
   *        See above.
   * @param absolute_name
   *        See above.
   * @param mode_tag
   *        See above.
   * @param perms_on_create
   *        See above.
   *        Reminder: Suggest the use of util::shared_resource_permissions() to translate
   *        from one of a small handful of levels of access; these apply almost always in practice.
   * @param max_n_msg_on_create
   *        See above.
   * @param max_msg_sz_on_create
   *        See above.
   * @param err_code
   *        See above.
   */
  explicit Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
                           util::Open_or_create mode_tag, size_t max_n_msg_on_create, size_t max_msg_sz_on_create,
                           const util::Permissions& perms_on_create = util::Permissions(),
                           Error_code* err_code = 0);
  /**
   * Implements Persistent_mq_handle API: Construct handle to existing named MQ. If it does not exist, it is an error.
   *
   * @param logger_ptr
   *        See above.
   * @param absolute_name
   *        See above.
   * @param mode_tag
   *        See above.
   * @param err_code
   *        See above.
   */
  explicit Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
                           util::Open_only mode_tag, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Constructs handle from the source handle while making the latter as-if
   * default-cted. Reminder, informally: This is a light-weight op.
   *
   * @see Persistent_mq_handle::Persistent_mq_handle(): implemented concept.
   *
   * @param src
   *        See above.
   */
  Posix_mq_handle(Posix_mq_handle&& src);

  /// Copying of handles is prohibited, per Persistent_mq_handle concept.
  Posix_mq_handle(const Posix_mq_handle&) = delete;

  /**
   * Implements Persistent_mq_handle API: Destroys this handle (or no-op if no handle was successfully constructed, or
   * if it's a moved-from or default-cted handle). Reminder: The underlying MQ (if any) is *not* destroyed and can
   * be attached-to by another handle.
   *
   * @see Persistent_mq_handle::~Persistent_mq_handle(): implemented concept.
   */
  ~Posix_mq_handle();

  // Methods.

  /**
   * Implements Persistent_mq_handle API: Replaces handle with the source handle while making the latter invalid as-if
   * default-cted. Reminder, informally: this is a light-weight op.
   *
   * @param src
   *        See above.
   * @return `*this`.
   */
  Posix_mq_handle& operator=(Posix_mq_handle&& src);

  /// Copying of handles is prohibited, per Persistent_mq_handle concept.
  Posix_mq_handle& operator=(const Posix_mq_handle&) = delete;

  /**
   * Implements Persistent_mq_handle API: Removes the named persistent MQ. Reminder: name is removed immediately
   * (if present -- otherwise error), but underlying MQ continues to exist until all system-wide handles to it
   * are closed.
   *
   * @see Persistent_mq_handle::remove_persistent(): implemented concept.
   *
   * @see Reminder: see also `util::remove_each_persistent_*()`.
   *
   * @param logger_ptr
   *        See above.
   * @param name
   *        See above.
   * @param err_code
   *        See above.
   */
  static void remove_persistent(flow::log::Logger* logger_ptr, const Shared_name& name, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API. Impl note for exposition: we use the fact that, e.g., in Linux
   * the POSIX MQ devices are listed in flat fashion in /dev/mqueue.
   *
   * @see Persistent_mq_handle::for_each_persistent(): implemented concept.
   *
   * @tparam Handle_name_func
   *         See above.
   * @param handle_name_func
   *        See above.
   */
  template<typename Handle_name_func>
  static void for_each_persistent(const Handle_name_func& handle_name_func);

  /**
   * Implements Persistent_mq_handle API: Non-blocking send: pushes copy of message to queue and returns `true`;
   * if queue is full then no-op and returns `false`.
   *
   * @see Persistent_mq_handle::try_send(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error. (You may use the `flow::log::Config::this_thread_verbosity_override_auto()` to
   * temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool try_send(const util::Blob_const& blob, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Blocking send: pushes copy of message to queue; if queue is full blocks
   * until it is not.
   *
   * @see Persistent_mq_handle::send(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error. (You may use the `flow::log::Config::this_thread_verbosity_override_auto()`
   * to temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param err_code
   *        See above.
   */
  void send(const util::Blob_const& blob, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Blocking timed send: pushes copy of message to queue; if queue is full
   * blocks until it is not, or the specified time passes, whichever happens first.
   *
   * @see Persistent_mq_handle::timed_send(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error or timed out.
   * (You may use the `flow::log::Config::this_thread_verbosity_override_auto()` to
   * temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param timeout_from_now
   *        See above.
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool timed_send(const util::Blob_const& blob, util::Fine_duration timeout_from_now, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like try_send() but without the actual pushing of a message.
   *
   * @see Persistent_mq_handle::is_sendable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool is_sendable(Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like send() but without the actual pushing of a message.
   *
   * @see Persistent_mq_handle::wait_sendable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   */
  void wait_sendable(Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like timed_send() but without the actual pushing of a message.
   *
   * @see Persistent_mq_handle::timed_wait_sendable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   * @param timeout_from_now
   *        See above.
   * @return See above.
   */
  bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Non-blocking receive: pops copy of message from queue into buffer and
   * returns `true`; if queue is empty then no-op and returns `false`.
   *
   * @see Persistent_mq_handle::try_receive(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error. (You may use the `flow::log::Config::this_thread_verbosity_override_auto()` to
   * temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool try_receive(util::Blob_mutable* blob, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Blocking receive: pops copy of message from queue into buffer; if queue
   * is empty blocks until it is not.
   *
   * @see Persistent_mq_handle::receive(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error. (You may use the `flow::log::Config::this_thread_verbosity_override_auto()` to
   * temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param err_code
   *        See above.
   */
  void receive(util::Blob_mutable* blob, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Blocking timed receive: pops copy of message from queue into buffer;
   * if queue is empty blocks until it is not, or the specified time passes, whichever happens first.
   *
   * @see Persistent_mq_handle::timed_receive(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error or timed out. (You may use the `flow::log::Config::this_thread_verbosity_override_auto()` to
   * temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param timeout_from_now
   *        See above.
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool timed_receive(util::Blob_mutable* blob, util::Fine_duration timeout_from_now, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like try_receive() but without the actual popping of a message.
   *
   * @see Persistent_mq_handle::is_receivable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool is_receivable(Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like receive() but without the actual popping of a message.
   *
   * @see Persistent_mq_handle::wait_receivable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   */
  void wait_receivable(Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like timed_receive() but without the actual popping of a message.
   *
   * @see Persistent_mq_handle::timed_wait_receivable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   * @param timeout_from_now
   *        See above.
   * @return See above.
   */
  bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API:
   * Turn on preemptive/concurrent interruption of blocking-sends and sendable-waits/polls.
   *
   * @see Persistent_mq_handle::interrupt_sends(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on duplicate use, INFO otherwise.
   *
   * @return See above.
   */
  bool interrupt_sends();

  /**
   * Implements Persistent_mq_handle API:
   * Turn off preemptive/concurrent interruption of blocking-sends and sendable-waits/polls.
   *
   * @see Persistent_mq_handle::allow_sends(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on duplicate use, INFO otherwise.
   *
   * @return See above.
   */
  bool allow_sends();

  /**
   * Implements Persistent_mq_handle API:
   * Turn on preemptive/concurrent interruption of blocking-receives and receivable-waits/polls.
   *
   * @see Persistent_mq_handle::interrupt_receives(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on duplicate use, INFO otherwise.
   *
   * @return See above.
   */
  bool interrupt_receives();

  /**
   * Implements Persistent_mq_handle API:
   * Turn off preemptive/concurrent interruption of blocking-receives and receivable-waits/polls.
   *
   * @see Persistent_mq_handle::allow_receives(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on duplicate use, INFO otherwise.
   *
   * @return See above.
   */
  bool allow_receives();

  /**
   * Implements Persistent_mq_handle API: Returns name equal to `absolute_name` passed to ctor.
   * @return See above.
   * @see Persistent_mq_handle::absolute_name(): implemented concept.
   */
  const Shared_name& absolute_name() const;

  /**
   * Implements Persistent_mq_handle API: Returns the max message size of the underlying queue. Reminder:
   * This is not required to match what was passed to `Create_only` or `Open_or_create` ctor.
   *
   * @return See above.
   * @see Persistent_mq_handle::max_msg_size(): implemented concept.
   */
  size_t max_msg_size() const;

  /**
   * Implements Persistent_mq_handle API: Returns the max message count of the underlying queue. Reminder:
   * This is not required to match what was passed to `Create_only` or `Open_or_create` ctor.
   *
   * @return See above.
   * @see Persistent_mq_handle::max_n_msgs(): implemented concept.
   */
  size_t max_n_msgs() const;

  /**
   * Implements Persistent_mq_handle API: Returns the stored native MQ handle; null if not open.
   *
   * @return See above.
   * @see Persistent_mq_handle::native_handle(): implemented concept.
   */
  Native_handle native_handle() const;

private:
  // Types.

  /// Short-hand for anonymous pipe write end.
  using Pipe_writer = util::Pipe_writer;

  /// Short-hand for anonymous pipe read end.
  using Pipe_reader = util::Pipe_reader;

  // Friends.

  // Friend of Posix_mq_handle.
  friend void swap(Posix_mq_handle& val1, Posix_mq_handle& val2);

  // Constructors/destructor.

  /**
   * Helper ctor delegated by the 2 `public` ctors that take `Open_or_create` or `Create_only` mode.
   *
   * @tparam Mode_tag
   *         Either util::Open_or_create or util::Create_only.
   * @param logger_ptr
   *        See `public` ctors.
   * @param absolute_name
   *        See `public` ctors.
   * @param mode_tag
   *        See `public` ctors.
   * @param max_n_msg_on_create
   *        See `public` ctors.
   * @param max_msg_sz_on_create
   *        See `public` ctors.
   * @param perms_on_create
   *        See `public` ctors.
   * @param err_code
   *        See `public` ctors.
   */
  template<typename Mode_tag>
  explicit Posix_mq_handle(Mode_tag mode_tag, flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
                           size_t max_n_msg_on_create, size_t max_msg_sz_on_create,
                           const util::Permissions& perms_on_create,
                           Error_code* err_code);

  // Methods.

  /**
   * Ctor helper that sets up `m_interrupt*` pipe items. If it fails it returns truthy code
   * and cleans up what it did. It ignores everything else like #m_mq.
   *
   * @return See above.
   */
  Error_code pipe_setup();

  /**
   * Ctor helper that sets up #m_epoll_hndl_snd and #m_epoll_hndl_rcv. If it fails it returns truthy code
   * and puts everything back to as-if-ctor-failed state, including #m_mq being null.
   *
   * @return See above.
   */
  Error_code epoll_setup();

  /**
   * Sets #m_mq to blocking or non-blocking and returns `true` on success and clears `*err_code`; otherwise returns
   * `false` and sets truthy `*err_code`.
   *
   * @param err_code
   *        To set. If null behavior is undefined (assertion may trip).
   * @param nb
   *        Non-blocking if `true`, else blocking.
   * @return `true` <=> success.
   */
  bool set_non_blocking(bool nb, Error_code* err_code);

  /**
   * Helper that handles the result of an `mq_*()` call by logging WARNING(s) on error; setting `*err_code` on error;
   * clearing it on success. `errno` is presumably set by the native API on error going into this.
   *
   * @param result
   *        What the native API returned.
   * @param err_code
   *        To set. If null behavior is undefined (assertion may trip).
   * @param context
   *        See Bipc_mq_handle::op_with_possible_bipc_mq_exception().
   * @return `true` if `*err_code` was set to success (falsy); else `false` (it was set to truthy).
   */
  bool handle_mq_api_result(int result, Error_code* err_code, util::String_view context) const;

  /**
   * Impl body for `interrupt_*()`.
   *
   * @tparam SND_ELSE_RCV
   *         True for `*_sends()`, else `*_receives()`.
   * @return See callers.
   */
  template<bool SND_ELSE_RCV>
  bool interrupt_impl();

  /**
   * Impl body for `allow_*()`.
   *
   * @tparam SND_ELSE_RCV
   *         True for `*_sends()`, else `*_receives()`.
   * @return See callers.
   */
  template<bool SND_ELSE_RCV>
  bool allow_impl();

  /**
   * Impl body for `*_sendable()` and `*_receivable()`.
   *
   * @param timeout_from_now_or_none
   *        `timeout_from_now`; or 0 for `is_*()`, or `Fine_duration::max()` for non-timed-blocking variant.
   * @param snd_else_rcv
   *        True for `*_sendable()`, else `*_receivable()`.
   * @param err_code
   *        See callers.
   * @return See callers.
   */
  bool wait_impl(util::Fine_duration timeout_from_now_or_none, bool snd_else_rcv, Error_code* err_code);

  // Data.

  /**
   * Underlying MQ handle. We are a thin wrapper around this really. `.null()` if creation
   * fails in ctor, or if `*this` was moved-from. This is very light-weight; probably `int`.
   */
  Native_handle m_mq;

  /// See absolute_name().
  Shared_name m_absolute_name;

  /**
   * `epoll_*()` handle (`.null()` if and only if #m_mq is null) that is level-triggered to be active
   * (with only 1 event registered) if and only if #m_mq is currently capable of sending at least 1 message
   * (we could push 1+ messages right now). Used by wait_impl() and its various public `*_sendable()` forms.
   */
  Native_handle m_epoll_hndl_snd;

  /**
   * `epoll_*()` handle (`.null()` if and only if #m_mq is null) that is level-triggered to be active
   * (with only 1 event registered) if and only if #m_mq is currently storing at least 1 message
   * (we could pop 1+ messages right now). Used by wait_impl() and its various public `*_receivable()` forms.
   */
  Native_handle m_epoll_hndl_rcv;

  /**
   * Starting at `false`, this is made `true` via interrupt_sends(), and back by allow_sends(); acts as a guard
   * against doing it when already in effect. Note that this is its only purpose, as wait_impl() never checks it;
   * instead it relies on `epoll_wait()` detecting a readable #m_interrupt_detector_snd.
   */
  bool m_interrupting_snd;

  /// Other-direction counterpart to #m_interrupting_snd.
  bool m_interrupting_rcv;

  /// Never used for `.run()` or `.async()` -- just so we can construct #Pipe_reader, #Pipe_writer.
  flow::util::Task_engine m_nb_task_engine;

  /**
   * A byte is written to this end by interrupt_sends() to make it readable for the poll-wait in wait_impl()
   * indicating that #m_interrupting_snd mode is on.
   */
  Pipe_writer m_interrupter_snd;

  /**
   * A byte is read from this end by allow_sends() to make it not-readable for the poll-wait in wait_impl()
   * indicating that #m_interrupting_snd mode is off; and wait_impl() poll-waits on this along with
   * #m_mq -- if it is readable, then the mode is on.
   */
  Pipe_reader m_interrupt_detector_snd;

  /// Other-direction counterpart to #m_interrupter_snd.
  Pipe_writer m_interrupter_rcv;

  /// Other-direction counterpart to #m_interrupt_detector_snd.
  Pipe_reader m_interrupt_detector_rcv;
}; // class Posix_mq_handle

// Free functions: in *_fwd.hpp.

// Template implementations.

template<typename Handle_name_func>
void Posix_mq_handle::for_each_persistent(const Handle_name_func& handle_name_func) // Static.
{
#ifndef FLOW_OS_LINUX
  static_assert(false, "This method relies on/has been tested only with Linux /dev/mqueue semantics.");
#endif
  util::for_each_persistent_impl("/dev/mqueue", handle_name_func);
}

} // namespace ipc::transport
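
The header's comments describe how `interrupt_sends()` writes a byte into a pipe, `allow_sends()` reads it back, and `wait_impl()` poll-waits on the pipe's read end along with the MQ descriptor. The following is a minimal sketch of that pattern using plain Linux calls. It is not the Flow-IPC implementation: the names `Interruptible_waiter` and `Wait_result` are hypothetical, a raw `pipe()` stands in for `Pipe_writer`/`Pipe_reader`, and the caller supplies any readable FD (for example an `eventfd`) in place of the MQ descriptor.

```cpp
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cassert>
#include <cstdint>

// Hypothetical names throughout; this only illustrates the pipe-plus-epoll pattern.
enum class Wait_result { S_READY, S_TIMED_OUT, S_INTERRUPTED, S_ERROR };

class Interruptible_waiter
{
public:
  // `watched_fd` stands in for the MQ descriptor; we wait for it to become readable.
  explicit Interruptible_waiter(int watched_fd)
  {
    int pipe_fds[2];
    const int rc = pipe(pipe_fds);
    assert(rc == 0);
    (void)rc;
    m_detector_fd = pipe_fds[0];    // Read end: polled by wait().
    m_interrupter_fd = pipe_fds[1]; // Write end: written by interrupt().
    m_epoll_fd = epoll_create1(0);
    assert(m_epoll_fd != -1);
    watch(watched_fd);
    watch(m_detector_fd);
  }

  ~Interruptible_waiter()
  {
    close(m_epoll_fd);
    close(m_interrupter_fd);
    close(m_detector_fd);
  }

  Interruptible_waiter(const Interruptible_waiter&) = delete;
  Interruptible_waiter& operator=(const Interruptible_waiter&) = delete;

  // Turns interrupt mode on by making the detector end readable; false on duplicate use.
  bool interrupt()
  {
    if (m_interrupting) { return false; }
    const char byte = 0;
    if (write(m_interrupter_fd, &byte, 1) != 1) { return false; }
    m_interrupting = true;
    return true;
  }

  // Turns interrupt mode off by draining the byte; false on duplicate use.
  bool allow()
  {
    if (!m_interrupting) { return false; }
    char byte;
    if (read(m_detector_fd, &byte, 1) != 1) { return false; }
    m_interrupting = false;
    return true;
  }

  // timeout_ms: 0 polls, -1 blocks indefinitely, otherwise milliseconds (as for epoll_wait()).
  Wait_result wait(int timeout_ms) const
  {
    epoll_event events[2];
    const int n_events = epoll_wait(m_epoll_fd, events, 2, timeout_ms);
    if (n_events < 0) { return Wait_result::S_ERROR; }
    if (n_events == 0) { return Wait_result::S_TIMED_OUT; }
    for (int idx = 0; idx != n_events; ++idx)
    {
      // The flag m_interrupting is deliberately not consulted: readability of the pipe is the signal.
      if (events[idx].data.fd == m_detector_fd) { return Wait_result::S_INTERRUPTED; }
    }
    return Wait_result::S_READY;
  }

private:
  // Registers `fd` for level-triggered readability (the epoll default).
  void watch(int fd)
  {
    epoll_event event{};
    event.events = EPOLLIN;
    event.data.fd = fd;
    const int rc = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &event);
    assert(rc == 0);
    (void)rc;
  }

  int m_detector_fd = -1;
  int m_interrupter_fd = -1;
  int m_epoll_fd = -1;
  bool m_interrupting = false;
};
```

In this sketch one thread could sit in `wait(-1)` while another calls `interrupt()`; because the wait is level-triggered, every subsequent `wait()` keeps reporting the interruption until `allow()` drains the byte.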
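
The `wait_impl()` comment says its timeout argument is the caller's `timeout_from_now`, or 0 for the `is_*()` forms, or `Fine_duration::max()` for the untimed blocking forms. One way such a value could be mapped onto the millisecond timeout that `epoll_wait()` expects is sketched below. This is an assumption-laden illustration, not the library's code: `Fine_duration_sketch` is a hypothetical `std::chrono` stand-in for `util::Fine_duration`, `to_epoll_timeout_ms()` is a hypothetical helper, and rounding up to whole milliseconds is a design choice made here so that a wait never ends earlier than requested.

```cpp
#include <chrono>
#include <climits>

// Hypothetical stand-in for util::Fine_duration (assumed here to be nanosecond-resolution).
using Fine_duration_sketch = std::chrono::nanoseconds;

// Maps the three documented cases onto an epoll_wait() timeout:
//   max()      -> -1 (block indefinitely)
//   zero/less  ->  0 (poll and return immediately)
//   otherwise  -> milliseconds, rounded up and clamped to INT_MAX.
inline int to_epoll_timeout_ms(Fine_duration_sketch timeout_from_now_or_none)
{
  if (timeout_from_now_or_none == Fine_duration_sketch::max())
  {
    return -1;
  }
  if (timeout_from_now_or_none <= Fine_duration_sketch::zero())
  {
    return 0;
  }
  const auto rounded_up = std::chrono::ceil<std::chrono::milliseconds>(timeout_from_now_or_none);
  return (rounded_up.count() > INT_MAX) ? INT_MAX : static_cast<int>(rounded_up.count());
}
```

With this mapping a single wait routine can serve the polling, timed, and blocking variants, which is the consolidation the `wait_impl()` comment describes.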
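
The `handle_mq_api_result()` comment describes a helper that takes the return value of an `mq_*()` call, sets `*err_code` from `errno` and logs a WARNING on failure, and clears `*err_code` on success. A rough sketch of that contract follows. It is illustrative only: `handle_native_result_sketch()` is a hypothetical name, `std::error_code` stands in for `Error_code`, and writing to `std::cerr` stands in for Flow's logging.

```cpp
#include <cassert>
#include <cerrno>
#include <iostream>
#include <string_view>
#include <system_error>

// Hypothetical helper mirroring the documented contract; not the Flow-IPC implementation.
// `result` is what the native call returned (-1 signals failure, with errno set).
// Returns true if *err_code was cleared (success), false if it was set to a truthy code.
inline bool handle_native_result_sketch(int result, std::error_code* err_code, std::string_view context)
{
  assert(err_code && "A null err_code is a caller bug in this sketch.");
  if (result != -1)
  {
    err_code->clear();
    return true;
  }
  // Capture errno immediately, before any other call can overwrite it.
  const int native_errno = errno;
  *err_code = std::error_code(native_errno, std::system_category());
  std::cerr << "WARNING: [" << context << "] failed: " << err_code->message()
            << " (errno " << native_errno << ").\n";
  return false;
}
```

A caller would typically write something like `if (!handle_native_result_sketch(rc, &ec, "send")) { return false; }` right after the native call, keeping the error-mapping logic in one place.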