Flow-IPC 1.0.0
Flow-IPC project: Full implementation reference.
posix_mq_handle.hpp
/* Flow-IPC: Core
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include "ipc/util/util_fwd.hpp"
#include <flow/log/log.hpp>
#include <flow/common.hpp>
#include <mqueue.h>

namespace ipc::transport
{

// Types.

#ifndef FLOW_OS_LINUX
# error "Posix_mq_handle relies on Linux semantics and has not been tested in other Unix; cannot exist in Windows."
#endif

/**
 * Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see `man mq_overview`).
 *
 * @see Persistent_mq_handle: implemented concept.
 *
 * Reminder: This is available publicly in case it is useful; but it is more likely one would use a
 * Blob_stream_mq_sender or Blob_stream_mq_receiver which provides a far more convenient boost.asio-like async-capable
 * API. It uses class(es) like this one in its impl.
 *
 * native_handle() returns the underlying MQ descriptor; in Linux (the only OS supported as of this writing)
 * this happens to be an FD; and as an FD it can (according to `man mq_overview`) participate in `epoll/poll/select()`.
 * It is simple to wrap this descriptor in a boost.asio `posix::descriptor`. Having done that, one can
 * `async_wait()` on it, awaiting writability and readability in async fashion not supported by the
 * Persistent_mq_handle concept. Accordingly Posix_mq_handle::S_HAS_NATIVE_HANDLE is `true`.
 *
 * @internal
 * ### Implementation ###
 * I (ygoldfel) wrote this immediately after Bipc_mq_handle. Notably the thing that class thinly wraps,
 * `bipc::message_queue`, appears to be heavily influenced by the POSIX MQ API in terms of its API and functionality.
 * So the implementation here is conceptually similar.
 */
class Posix_mq_handle : // Note: movable but not copyable.
  public flow::log::Log_context
{
public:
  // Constants.

  /// Implements concept API.
  static const Shared_name S_RESOURCE_TYPE_ID;

  /// Implements concept API. Contrast this value with Bipc_mq_handle::S_HAS_NATIVE_HANDLE.
  static constexpr bool S_HAS_NATIVE_HANDLE = true;

  // Constructors/destructor.

  /// Implements Persistent_mq_handle API: Construct null handle.
  Posix_mq_handle();

  /**
   * Implements Persistent_mq_handle API: Construct handle to non-existing named MQ, creating it first. If it already
   * exists, it is an error.
   *
   * @see Persistent_mq_handle::Persistent_mq_handle(): implemented concept.
   *
   * `max_n_msg` and `max_msg_sz` are subject to certain OS limits, according to `man mq_overview`. Watch out for
   * those: we have no control over them here. The `man` page should give you the necessary information.
   *
   * @param logger_ptr
   *        See above.
   * @param absolute_name
   *        See above.
   * @param mode_tag
   *        See above.
   * @param perms
   *        See above.
   *        Reminder: Suggest the use of util::shared_resource_permissions() to translate
   *        from one of a small handful of levels of access; these apply almost always in practice.
   * @param max_n_msg
   *        See above.
   * @param max_msg_sz
   *        See above.
   * @param err_code
   *        See above.
   */
  explicit Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
                           util::Create_only mode_tag, size_t max_n_msg, size_t max_msg_sz,
                           const util::Permissions& perms = util::Permissions(),
                           Error_code* err_code = 0);
  /**
   * Implements Persistent_mq_handle API: Construct handle to existing named MQ, or else if it does not exist creates
   * it first and opens it (atomically).
   *
   * @see Persistent_mq_handle::Persistent_mq_handle(): implemented concept.
   *
   * @param logger_ptr
   *        See above.
   * @param absolute_name
   *        See above.
   * @param mode_tag
   *        See above.
   * @param perms_on_create
   *        See above.
   *        Reminder: Suggest the use of util::shared_resource_permissions() to translate
   *        from one of a small handful of levels of access; these apply almost always in practice.
   * @param max_n_msg_on_create
   *        See above.
   * @param max_msg_sz_on_create
   *        See above.
   * @param err_code
   *        See above.
   */
  explicit Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
                           util::Open_or_create mode_tag, size_t max_n_msg_on_create, size_t max_msg_sz_on_create,
                           const util::Permissions& perms_on_create = util::Permissions(),
                           Error_code* err_code = 0);
  /**
   * Implements Persistent_mq_handle API: Construct handle to existing named MQ. If it does not exist, it is an error.
   *
   * @param logger_ptr
   *        See above.
   * @param absolute_name
   *        See above.
   * @param mode_tag
   *        See above.
   * @param err_code
   *        See above.
   */
  explicit Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
                           util::Open_only mode_tag, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Constructs handle from the source handle while making the latter as-if
   * default-cted. Reminder, informally: This is a light-weight op.
   *
   * @see Persistent_mq_handle::Persistent_mq_handle(): implemented concept.
   *
   * @param src
   *        See above.
   */
  Posix_mq_handle(Posix_mq_handle&& src);

  /// Copying of handles is prohibited, per Persistent_mq_handle concept.
  Posix_mq_handle(const Posix_mq_handle&) = delete;

  /**
   * Implements Persistent_mq_handle API: Destroys this handle (or no-op if no handle was successfully constructed, or
   * if it's a moved-from or default-cted handle). Reminder: The underlying MQ (if any) is *not* destroyed and can
   * be attached-to by another handle.
   *
   * @see Persistent_mq_handle::~Persistent_mq_handle(): implemented concept.
   */
  ~Posix_mq_handle();

  // Methods.

  /**
   * Implements Persistent_mq_handle API: Replaces handle with the source handle while making the latter invalid as-if
   * default-cted. Reminder, informally: this is a light-weight op.
   *
   * @param src
   *        See above.
   * @return `*this`.
   */
  Posix_mq_handle& operator=(Posix_mq_handle&& src);

  /// Copying of handles is prohibited, per Persistent_mq_handle concept.
  Posix_mq_handle& operator=(const Posix_mq_handle&) = delete;

  /**
   * Implements Persistent_mq_handle API: Removes the named persistent MQ. Reminder: name is removed immediately
   * (if present -- otherwise error), but underlying MQ continues to exist until all system-wide handles to it
   * are closed.
   *
   * @see Persistent_mq_handle::remove_persistent(): implemented concept.
   *
   * @see Reminder: see also `util::remove_each_persistent_*()`.
   *
   * @param logger_ptr
   *        See above.
   * @param name
   *        See above.
   * @param err_code
   *        See above.
   */
  static void remove_persistent(flow::log::Logger* logger_ptr, const Shared_name& name, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API. Impl note for exposition: we use the fact that, e.g., in Linux
   * the POSIX MQ devices are listed in flat fashion in /dev/mqueue.
   *
   * @see Persistent_mq_handle::for_each_persistent(): implemented concept.
   *
   * @tparam Handle_name_func
   *         See above.
   * @param handle_name_func
   *        See above.
   */
  template<typename Handle_name_func>
  static void for_each_persistent(const Handle_name_func& handle_name_func);

  /**
   * Implements Persistent_mq_handle API: Non-blocking send: pushes copy of message to queue and returns `true`;
   * if queue is full then no-op and returns `false`.
   *
   * @see Persistent_mq_handle::try_send(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error. (You may use the `flow::log::Config::this_thread_verbosity_override_auto()` to
   * temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool try_send(const util::Blob_const& blob, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Blocking send: pushes copy of message to queue; if queue is full blocks
   * until it is not.
   *
   * @see Persistent_mq_handle::send(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error. (You may use the `flow::log::Config::this_thread_verbosity_override_auto()`
   * to temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param err_code
   *        See above.
   */
  void send(const util::Blob_const& blob, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Blocking timed send: pushes copy of message to queue; if queue is full
   * blocks until it is not, or the specified time passes, whichever happens first.
   *
   * @see Persistent_mq_handle::timed_send(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error or timed out.
   * (You may use the `flow::log::Config::this_thread_verbosity_override_auto()` to
   * temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param timeout_from_now
   *        See above.
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool timed_send(const util::Blob_const& blob, util::Fine_duration timeout_from_now, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like try_send() but without the actual pushing of a message.
   *
   * @see Persistent_mq_handle::is_sendable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool is_sendable(Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like send() but without the actual pushing of a message.
   *
   * @see Persistent_mq_handle::wait_sendable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   */
  void wait_sendable(Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like timed_send() but without the actual pushing of a message.
   *
   * @see Persistent_mq_handle::timed_wait_sendable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   * @param timeout_from_now
   *        See above.
   * @return See above.
   */
  bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Non-blocking receive: pops copy of message from queue into buffer and
   * returns `true`; if queue is empty then no-op and returns `false`.
   *
   * @see Persistent_mq_handle::try_receive(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error. (You may use the `flow::log::Config::this_thread_verbosity_override_auto()` to
   * temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool try_receive(util::Blob_mutable* blob, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Blocking receive: pops copy of message from queue into buffer; if queue
   * is empty blocks until it is not.
   *
   * @see Persistent_mq_handle::receive(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error. (You may use the `flow::log::Config::this_thread_verbosity_override_auto()` to
   * temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param err_code
   *        See above.
   */
  void receive(util::Blob_mutable* blob, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Blocking timed receive: pops copy of message from queue into buffer;
   * if queue is empty blocks until it is not, or the specified time passes, whichever happens first.
   *
   * @see Persistent_mq_handle::timed_receive(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error or timed out. (You may use the `flow::log::Config::this_thread_verbosity_override_auto()` to
   * temporarily, in that thread only, disable/reduce logging. This is quite easy and performant.)
   *
   * @param blob
   *        See above.
   * @param timeout_from_now
   *        See above.
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool timed_receive(util::Blob_mutable* blob, util::Fine_duration timeout_from_now, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like try_receive() but without the actual popping of a message.
   *
   * @see Persistent_mq_handle::is_receivable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   * @return See above.
   */
  bool is_receivable(Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like receive() but without the actual popping of a message.
   *
   * @see Persistent_mq_handle::wait_receivable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   */
  void wait_receivable(Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API: Like timed_receive() but without the actual popping of a message.
   *
   * @see Persistent_mq_handle::timed_wait_receivable(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on error.
   *
   * @param err_code
   *        See above.
   * @param timeout_from_now
   *        See above.
   * @return See above.
   */
  bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code* err_code = 0);

  /**
   * Implements Persistent_mq_handle API:
   * Turn on preemptive/concurrent interruption of blocking-sends and sendable-waits/polls.
   *
   * @see Persistent_mq_handle::interrupt_sends(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on duplicate use, INFO otherwise.
   *
   * @return See above.
   */
  bool interrupt_sends();

  /**
   * Implements Persistent_mq_handle API:
   * Turn off preemptive/concurrent interruption of blocking-sends and sendable-waits/polls.
   *
   * @see Persistent_mq_handle::allow_sends(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on duplicate use, INFO otherwise.
   *
   * @return See above.
   */
  bool allow_sends();

  /**
   * Implements Persistent_mq_handle API:
   * Turn on preemptive/concurrent interruption of blocking-receives and receivable-waits/polls.
   *
   * @see Persistent_mq_handle::interrupt_receives(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on duplicate use, INFO otherwise.
   *
   * @return See above.
   */
  bool interrupt_receives();

  /**
   * Implements Persistent_mq_handle API:
   * Turn off preemptive/concurrent interruption of blocking-receives and receivable-waits/polls.
   *
   * @see Persistent_mq_handle::allow_receives(): implemented concept.
   *
   * ### INFO+ logging ###
   * WARNING on duplicate use, INFO otherwise.
   *
   * @return See above.
   */
  bool allow_receives();

  /**
   * Implements Persistent_mq_handle API: Returns name equal to `absolute_name` passed to ctor.
   * @return See above.
   * @see Persistent_mq_handle::absolute_name(): implemented concept.
   */
  const Shared_name& absolute_name() const;

  /**
   * Implements Persistent_mq_handle API: Returns the max message size of the underlying queue. Reminder:
   * This is not required to match what was passed to `Create_only` or `Open_or_create` ctor.
   *
   * @return See above.
   * @see Persistent_mq_handle::max_msg_size(): implemented concept.
   */
  size_t max_msg_size() const;

  /**
   * Implements Persistent_mq_handle API: Returns the max message count of the underlying queue. Reminder:
   * This is not required to match what was passed to `Create_only` or `Open_or_create` ctor.
   *
   * @return See above.
   * @see Persistent_mq_handle::max_n_msgs(): implemented concept.
   */
  size_t max_n_msgs() const;

  /**
   * Implements Persistent_mq_handle API: Returns the stored native MQ handle; null if not open.
   *
   * @return See above.
   * @see Persistent_mq_handle::native_handle(): implemented concept.
   */
  Native_handle native_handle() const;

private:
  // Types.

  /// Short-hand for anonymous pipe write end.
  using Pipe_writer = util::Pipe_writer;

  /// Short-hand for anonymous pipe read end.
  using Pipe_reader = util::Pipe_reader;

  // Friends.

  // Friend of Posix_mq_handle.
  friend void swap(Posix_mq_handle& val1, Posix_mq_handle& val2);

  // Constructors/destructor.

  /**
   * Helper ctor delegated by the 2 `public` ctors that take `Open_or_create` or `Create_only` mode.
   *
   * @tparam Mode_tag
   *         Either util::Open_or_create or util::Create_only.
   * @param logger_ptr
   *        See `public` ctors.
   * @param absolute_name
   *        See `public` ctors.
   * @param mode_tag
   *        See `public` ctors.
   * @param max_n_msg_on_create
   *        See `public` ctors.
   * @param max_msg_sz_on_create
   *        See `public` ctors.
   * @param perms_on_create
   *        See `public` ctors.
   * @param err_code
   *        See `public` ctors.
   */
  template<typename Mode_tag>
  explicit Posix_mq_handle(Mode_tag mode_tag, flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
                           size_t max_n_msg_on_create, size_t max_msg_sz_on_create,
                           const util::Permissions& perms_on_create,
                           Error_code* err_code);

  // Methods.

  /**
   * Ctor helper that sets up `m_interrupt*` pipe items. If it fails it returns truthy code
   * and cleans up what it did. It ignores everything else like #m_mq.
   *
   * @return See above.
   */
  Error_code pipe_setup();

  /**
   * Ctor helper that sets up #m_epoll_hndl_snd and #m_epoll_hndl_rcv. If it fails it returns truthy code
   * and puts everything back to as-if-ctor-failed state, including #m_mq being null.
   *
   * @return See above.
   */
  Error_code epoll_setup();

  /**
   * Sets #m_mq to blocking or non-blocking and returns `true` on success and clears `*err_code`; otherwise returns
   * `false` and sets truthy `*err_code`.
   *
   * @param err_code
   *        To set. If null behavior is undefined (assertion may trip).
   * @param nb
   *        Non-blocking if `true`, else blocking.
   * @return `true` <=> success.
   */
  bool set_non_blocking(bool nb, Error_code* err_code);

  /**
   * Helper that handles the result of an `mq_*()` call by logging WARNING(s) on error; setting `*err_code` on error;
   * clearing it on success. `errno` is presumably set by the native API on error going into this.
   *
   * @param result
   *        What the native API returned.
   * @param err_code
   *        To set. If null behavior is undefined (assertion may trip).
   * @param context
   *        See Bipc_mq_handle::op_with_possible_bipc_mq_exception().
   * @return `true` if `*err_code` was set to success (falsy); else `false` (it was set to truthy).
   */
  bool handle_mq_api_result(int result, Error_code* err_code, util::String_view context) const;

  /**
   * Impl body for `interrupt_*()`.
   *
   * @tparam SND_ELSE_RCV
   *         True for `*_sends()`, else `*_receives()`.
   * @return See callers.
   */
  template<bool SND_ELSE_RCV>
  bool interrupt_impl();

  /**
   * Impl body for `allow_*()`.
   *
   * @tparam SND_ELSE_RCV
   *         True for `*_sends()`, else `*_receives()`.
   * @return See callers.
   */
  template<bool SND_ELSE_RCV>
  bool allow_impl();

  /**
   * Impl body for `*_sendable()` and `*_receivable()`.
   *
   * @param timeout_from_now_or_none
   *        `timeout_from_now`; or 0 for `is_*()`, or `Fine_duration::max()` for non-timed-blocking variant.
   * @param snd_else_rcv
   *        True for `*_sendable()`, else `*_receivable()`.
   * @param err_code
   *        See callers.
   * @return See callers.
   */
  bool wait_impl(util::Fine_duration timeout_from_now_or_none, bool snd_else_rcv, Error_code* err_code);

  // Data.

  /**
   * Underlying MQ handle. We are a thin wrapper around this really. `.null()` if creation
   * fails in ctor, or if `*this` was moved-from. This is very light-weight; probably `int`.
   */
  Native_handle m_mq;

  /// See absolute_name().
  Shared_name m_absolute_name;

  /**
   * `epoll_*()` handle (`.null()` if and only if #m_mq is null) that is level-triggered to be active
   * (with only 1 event registered) if and only if #m_mq is currently capable of sending at least 1 message
   * (we could push 1+ messages right now). Used by wait_impl() and its various public `*_sendable()` forms.
   */
  Native_handle m_epoll_hndl_snd;

  /**
   * `epoll_*()` handle (`.null()` if and only if #m_mq is null) that is level-triggered to be active
   * (with only 1 event registered) if and only if #m_mq is currently storing at least 1 message
   * (we could pop 1+ messages right now). Used by wait_impl() and its various public `*_receivable()` forms.
   */
  Native_handle m_epoll_hndl_rcv;

  /**
   * Starting at `false`, this is made `true` via interrupt_sends(), and back by allow_sends(); acts as a guard
   * against doing it when already in effect. Note that that is its only purpose: wait_impl() never checks it;
   * instead it relies on `epoll_wait()` detecting a readable #m_interrupt_detector_snd.
   */
  bool m_interrupting_snd;

  /// Other-direction counterpart to #m_interrupting_snd.
  bool m_interrupting_rcv;

  /// Never used for `.run()` or `.async()` -- just so we can construct #Pipe_reader, #Pipe_writer.
  flow::util::Task_engine m_nb_task_engine;

  /**
   * A byte is written to this end by interrupt_sends() to make it readable for the poll-wait in wait_impl()
   * indicating that #m_interrupting_snd mode is on.
   */
  Pipe_writer m_interrupter_snd;

  /**
   * A byte is read from this end by allow_sends() to make it not-readable for the poll-wait in wait_impl()
   * indicating that #m_interrupting_snd mode is off; and wait_impl() poll-waits on this along with
   * #m_mq -- if it is readable, then the mode is on.
   */
  Pipe_reader m_interrupt_detector_snd;

  /// Other-direction counterpart to #m_interrupter_snd.
  Pipe_writer m_interrupter_rcv;

  /// Other-direction counterpart to #m_interrupt_detector_snd.
  Pipe_reader m_interrupt_detector_rcv;
}; // class Posix_mq_handle

// Free functions: in *_fwd.hpp.

// Template implementations.

template<typename Handle_name_func>
void Posix_mq_handle::for_each_persistent(const Handle_name_func& handle_name_func) // Static.
{
#ifndef FLOW_OS_LINUX
# error "This method relies on/has been tested only with Linux /dev/mqueue semantics."
#endif
  util::for_each_persistent_impl("/dev/mqueue", handle_name_func);
}

} // namespace ipc::transport
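
The interruption scheme documented for the `m_interrupt*` members (a byte written to a pipe by `interrupt_sends()`, drained again by `allow_sends()`, with `wait_impl()` relying on a level-triggered `epoll_wait()` that watches both the MQ descriptor and the pipe's read end) can be illustrated with a small Linux-only sketch. Everything named here (`Interruptible_waiter`, `Wait_result`, `interrupt()`, `allow()`, `wait()`) is hypothetical and is not part of Flow-IPC; any pollable FD stands in for the MQ descriptor, and no synchronization or logging is attempted. It shows the general self-pipe technique under those assumptions, not the library's actual implementation.

```cpp
// Sketch only: hypothetical names; not the Flow-IPC implementation.
#include <sys/epoll.h>
#include <unistd.h>
#include <cerrno>
#include <cstdint>

enum class Wait_result { S_READY, S_TIMED_OUT, S_INTERRUPTED, S_ERROR };

class Interruptible_waiter
{
public:
  // Watches `target_fd` for `target_events` (e.g., EPOLLIN or EPOLLOUT) plus an internal interrupt pipe.
  Interruptible_waiter(int target_fd, std::uint32_t target_events)
  {
    int fds[2];
    if (::pipe(fds) != 0) { return; }
    m_detector = fds[0];     // Read end: polled by wait().
    m_interrupter = fds[1];  // Write end: written by interrupt().
    m_epoll = ::epoll_create1(0);
    if (m_epoll == -1) { return; }
    // Level-triggered (the epoll default): stays active for as long as the condition holds.
    m_ok = add(target_fd, target_events) && add(m_detector, EPOLLIN);
  }

  ~Interruptible_waiter()
  {
    if (m_epoll != -1) { ::close(m_epoll); }
    if (m_detector != -1) { ::close(m_detector); }
    if (m_interrupter != -1) { ::close(m_interrupter); }
  }

  Interruptible_waiter(const Interruptible_waiter&) = delete;
  Interruptible_waiter& operator=(const Interruptible_waiter&) = delete;

  // Turns interrupt mode on by making the pipe readable; returns false on duplicate use or write failure.
  bool interrupt()
  {
    if (m_interrupting) { return false; }
    const char byte = 0;
    if (::write(m_interrupter, &byte, 1) != 1) { return false; }
    m_interrupting = true;
    return true;
  }

  // Turns interrupt mode off by draining the byte; returns false on duplicate use or read failure.
  bool allow()
  {
    if (!m_interrupting) { return false; }
    char byte;
    if (::read(m_detector, &byte, 1) != 1) { return false; }
    m_interrupting = false;
    return true;
  }

  // timeout_ms: 0 polls without blocking; -1 blocks indefinitely.
  // Simplification: a signal (EINTR) restarts the wait with the full timeout.
  Wait_result wait(int timeout_ms)
  {
    if (!m_ok) { return Wait_result::S_ERROR; }
    epoll_event evs[2];
    int n;
    do { n = ::epoll_wait(m_epoll, evs, 2, timeout_ms); } while ((n == -1) && (errno == EINTR));
    if (n == -1) { return Wait_result::S_ERROR; }
    if (n == 0) { return Wait_result::S_TIMED_OUT; }
    // Note the flag m_interrupting is not consulted: a readable pipe is what signals interruption.
    for (int i = 0; i != n; ++i)
    {
      if (evs[i].data.fd == m_detector) { return Wait_result::S_INTERRUPTED; }
    }
    return Wait_result::S_READY;
  }

private:
  bool add(int fd, std::uint32_t events)
  {
    epoll_event ev{};
    ev.events = events;
    ev.data.fd = fd;
    return ::epoll_ctl(m_epoll, EPOLL_CTL_ADD, fd, &ev) == 0;
  }

  int m_epoll = -1;
  int m_interrupter = -1;
  int m_detector = -1;
  bool m_interrupting = false;
  bool m_ok = false;
};
```

In this sketch an interrupt takes priority over readiness when both are reported at once, and the boolean flag only guards against duplicate `interrupt()`/`allow()` calls, mirroring the role described for `m_interrupting_snd`.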