Flow-IPC 2.0.0
Flow-IPC project: Full implementation reference.
posix_mq_handle.cpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
22#include <flow/error/error.hpp>
23#include <boost/chrono/floor.hpp>
24#include <boost/array.hpp>
25
26namespace ipc::transport
27{
28
29namespace
30{
31
32/**
33 * File-local helper: takes Shared_name; returns name suitable for `mq_open()` and buddies.
34 * @param name
35 * Shared_name.
36 * @return See above.
37 */
38std::string shared_name_to_mq_name(const Shared_name& name);
39
40}; // namespace (anon)
41
42// Initializers.
43
45
46// Implementations.
47
49 m_interrupting_snd(false), // (Not algorithmically needed as of this writing, but avoid some sanitizer complaints.)
50 m_interrupting_rcv(false), // (Ditto. E.g., swap() through clang's UBSAN => complains it's uninitialized.)
51 m_interrupter_snd(m_nb_task_engine),
52 m_interrupt_detector_snd(m_nb_task_engine),
53 m_interrupter_rcv(m_nb_task_engine),
54 m_interrupt_detector_rcv(m_nb_task_engine)
55{
56 // Done.
57}
58
59template<typename Mode_tag>
60Posix_mq_handle::Posix_mq_handle(Mode_tag, flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
61 size_t max_n_msg, size_t max_msg_sz,
62 const util::Permissions& perms, Error_code* err_code) :
63 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
64 m_absolute_name(absolute_name_arg),
65 m_interrupting_snd(false),
66 m_interrupting_rcv(false),
67 m_interrupter_snd(m_nb_task_engine),
68 m_interrupt_detector_snd(m_nb_task_engine),
69 m_interrupter_rcv(m_nb_task_engine),
70 m_interrupt_detector_rcv(m_nb_task_engine)
71{
73 using flow::error::Runtime_error;
74 using boost::io::ios_all_saver;
75 using boost::system::system_category;
76 using ::mq_open;
77 using ::mq_close;
78 using ::mq_attr;
79 // using ::O_RDWR; // A macro apparently.
80 // using ::O_CREAT; // A macro apparently.
81 // using ::O_EXCL; // A macro apparently.
82
83 assert(max_n_msg >= 1);
84 assert(max_msg_sz >= 1);
85
86 static_assert(std::is_same_v<Mode_tag, util::Create_only> || std::is_same_v<Mode_tag, util::Open_or_create>,
87 "Can only delegate to this ctor with Mode_tag = Create_only or Open_or_create.");
88 constexpr bool CREATE_ONLY_ELSE_MAYBE = std::is_same_v<Mode_tag, util::Create_only>;
89 constexpr char const * MODE_STR = CREATE_ONLY_ELSE_MAYBE ? "create-only" : "open-or-create";
90
91 {
92 ios_all_saver saver(*(get_logger()->this_thread_ostream())); // Revert std::oct/etc. soon.
93
94 FLOW_LOG_TRACE
95 ("Posix_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name [" << absolute_name() << "] in "
96 "[" << MODE_STR << "] mode; max msg size [" << max_msg_sz << "] x [" << max_n_msg << "] msgs; "
97 "perms = [" << std::setfill('0') << std::setw(4) << std::oct << perms.get_permissions() << "].");
98 }
99
100 // Get pipe stuff out of the way, as it does not need m_mq.
101 auto sys_err_code = pipe_setup();
102
103 if (!sys_err_code)
104 {
105 /* Naively: Use mq_open() to both create-or-open/exclusively-create and to set mode `perms`.
106 * In reality though there are two subtleties that make it tougher than that. Fortunately this has been
107 * adjudicated out there, and internal boost.interprocess code very nicely confirms that adjudication/sets
108 * a sufficiently-authoritative precedent.
109 *
110 * Subtlety 1 is that mq_open(), like all POSIX/Linux ::open()-y calls, doesn't listen to `perms` verbatim
111 * but rather makes it subject to the process umask. The proper way (citation omitted) to deal with it is
112 * to follow it up with ::[f]chmod() or equivalent, with the resource descriptor if available (or path if not).
113 * (We do have the descriptor -- m_mq -- and we have a util::set_resource_permissions() overload for this.)
114 *
115 * Subtlety 2 is that we should only do that last thing if, indeed, we are *creating* the resource (MQ);
116 * if CREATE_ONLY_ELSE_MAYBE==false, and it already exists, then we mustn't do any such thing; we've succeeded.
117 * Sadly there's no (atomic/direct) way to find out whether mq_open() (in ==false case) did in fact create
118 * the thing. Therefore, we have to do things in a kludgy way -- that is, nevertheless, reasonable in practice
119 * and indeed is validated by bipc's use of it internally (search headers for `case ipcdetail::DoOpenOrCreate:`
120 * followed by shm_open(), fchmod(), etc.). We explain it here for peace of mind.
121 *
122 * In the CREATE_ONLY_ELSE_MAYBE=true case, we can just do what we'd do anyway -- shm_open() in create-only
123 * mode; then set_resource_permissions(); and that's that. Otherwise:
124 *
125 * We can no longer use the built-in atomic create-or-open mode (O_CREAT sans O_EXCL). Instead, we logically
126 * split it into two parts:try to create-only (O_CREAT|O_EXCL); if it succeeds, done (can
127 * set_resource_permissions()); if it fails with file-exists error, then mq_open() again, this time in open-only
128 * mode (and no need for set_resource_permissions()). Great! The problem, of course, is that this no longer uses a
129 * guaranteed-atomic open-or-create OS-call semantic. Namely it means set_resource_permissions() could fail due to
130 * file-not-found, because during the non-atomic gap the MQ got removed by someone. Then we spin-lock (in a way)
131 * through it: just try the whole procedure again. Sooner or later -- assuming no other errors of course --
132 * either the first mq_open() will succeed, or it'll "fail" with file-exists, yet the 2nd mq_open() will succeed.
133 * Pegging processor is not a serious concern in this context. */
134
135 const auto mq_name = shared_name_to_mq_name(absolute_name());
136 const auto do_mq_open_func = [&](bool create_else_open) -> bool // Just a helper.
137 {
138 /* Note: O_NONBLOCK is something we are forced to set/unset depending on the transmission API being called.
139 * So just don't worry about it here. */
140 ::mqd_t raw;
141 if (create_else_open)
142 {
143 mq_attr attr;
144 attr.mq_maxmsg = max_n_msg;
145 attr.mq_msgsize = max_msg_sz;
146
147 raw = mq_open(mq_name.c_str(),
148 O_RDWR | O_CREAT | O_EXCL, // Create-only always: per above. Can't use create-or-open.
149 perms.get_permissions(),
150 &attr);
151 }
152 else
153 {
154 raw = mq_open(mq_name.c_str(), O_RDWR);
155 }
156 if (raw != -1)
157 {
158 m_mq = Native_handle(raw);
159 }
160
161 if (m_mq.null())
162 {
163 sys_err_code = Error_code(errno, system_category());
164 return false;
165 }
166 // else
167 return true;
168 }; // do_mq_open_func =
169
170 if (CREATE_ONLY_ELSE_MAYBE)
171 {
172 // Simple case.
173 if (do_mq_open_func(true))
174 {
175 set_resource_permissions(get_logger(), m_mq, perms, &sys_err_code);
176 }
177 // sys_err_code is either success or failure. Fall through.
178 }
179 else // if (!CREATE_ONLY_ELSE_MAYBE)
180 {
181 // Complicated case.
182
183 bool success = false; // While this remains false, sys_err_code indicates whether it's false b/c of fatal error.
184 do
185 {
186 if (do_mq_open_func(true))
187 {
188 // Created! Only situation where we must indeed set_resource_permissions().
189 set_resource_permissions(get_logger(), m_mq, perms, &sys_err_code);
190 success = !sys_err_code;
191 continue; // `break`, really. One of success or sys_err_code is true (latter <= s_r_p() failed).
192 }
193 // else if (!do_mq_open_func(true)) // I.e., it failed for some reason.
194 if (sys_err_code != boost::system::errc::file_exists)
195 {
196 // Real error. GTFO.
197 continue; // `break;`, really (sys_err_code==true).
198 }
199 // else if (file_exists): Create failed, because it already exists. Open the existing guy!
200
201 if (do_mq_open_func(false))
202 {
203 // Opened!
204 success = true;
205 continue; // `break;`, really (success==true).
206 }
207 // else if (!do_mq_open_func(false)) // I.e., it failed for some reason.
208 if (sys_err_code != boost::system::errc::no_such_file_or_directory)
209 {
210 // Real error. GTFO.
211 continue; // `break;`, really (sys_err_code==true).
212 }
213 // else if (no_such_file_or_directory): Open failed, because MQ *just* got unlinked. Try again!
214
215 // This is fun and rare enough to warrant an INFO message.
216 {
217 ios_all_saver saver(*(get_logger()->this_thread_ostream())); // Revert std::oct/etc. soon.
218 FLOW_LOG_INFO
219 ("Posix_mq_handle [" << *this << "]: Create-or-open algorithm encountered the rare concurrency: "
220 "MQ at name [" << absolute_name() << "] existed during the create-only mq_open() but disappeared "
221 "before we were able to complete open-only mq_open(). Retrying in spin-lock fashion. "
222 "Details: max msg size [" << max_msg_sz << "] x [" << max_n_msg << "] msgs; "
223 "perms = [" << std::setfill('0') << std::setw(4) << std::oct << perms.get_permissions() << "].");
224 }
225
226 sys_err_code.clear(); // success remains false, and that wasn't a fatal error, so ensure loop continues.
227 }
228 while ((!success) && (!sys_err_code));
229
230 // Now just encode success-or-not entirely in sys_err_code's truthiness.
231 if (success)
232 {
233 sys_err_code.clear();
234 }
235 // else { sys_err_code is the reason loop exited already. }
236
237 // Now fall through.
238 } // else if (!CREATE_ONLY_ELSE_MAYBE)
239
240 // We never use blocking transmission -- always using *wait_*able() so that interrupt_*() works. Non-blocking 4eva.
241 if ((!sys_err_code) && (!set_non_blocking(true, &sys_err_code)))
242 {
243 assert(sys_err_code);
244
245 // Clean up.
246
247 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; should be fine.
248 mq_close(m_mq.m_native_handle);
250 }
251
252 if (!sys_err_code)
253 {
254 sys_err_code = epoll_setup(); // It cleans up everything if it fails.
255 }
256 } // if (!sys_err_code) (but might have become true inside)
257
258 if (sys_err_code)
259 {
260 {
261 ios_all_saver saver(*(get_logger()->this_thread_ostream())); // Revert std::oct/etc. soon.
262
263 FLOW_LOG_WARNING
264 ("Posix_mq_handle [" << *this << "]: mq_open() or set_resource_permissions() error (if the latter, details "
265 "above and repeated below; otherwise error details only follow) while "
266 "constructing MQ handle to MQ at name [" << absolute_name() << "] in "
267 "create-only mode; max msg size [" << max_msg_sz << "] x [" << max_n_msg << "] msgs; "
268 "perms = [" << std::setfill('0') << std::setw(4) << std::oct << perms.get_permissions() << "].");
269 }
270 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
271
272 if (err_code)
273 {
274 *err_code = sys_err_code;
275 }
276 else
277 {
278 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
279 }
280 } // if (sys_err_code)
281 // else { Cool! }
282} // Posix_mq_handle::Posix_mq_handle(Mode_tag)
283
284Posix_mq_handle::Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
285 util::Create_only, size_t max_n_msg, size_t max_msg_sz,
286 const util::Permissions& perms, Error_code* err_code) :
287 Posix_mq_handle(util::CREATE_ONLY, logger_ptr, absolute_name_arg, max_n_msg, max_msg_sz, perms, err_code)
288{
289 // Cool.
290}
291
292Posix_mq_handle::Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
293 util::Open_or_create, size_t max_n_msg, size_t max_msg_sz,
294 const util::Permissions& perms, Error_code* err_code) :
295 Posix_mq_handle(util::OPEN_OR_CREATE, logger_ptr, absolute_name_arg, max_n_msg, max_msg_sz, perms, err_code)
296{
297 // Cool.
298}
299
300Posix_mq_handle::Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
301 util::Open_only, Error_code* err_code) :
302 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
303 m_absolute_name(absolute_name_arg),
304 m_interrupting_snd(false),
305 m_interrupting_rcv(false),
306 m_interrupter_snd(m_nb_task_engine),
307 m_interrupt_detector_snd(m_nb_task_engine),
308 m_interrupter_rcv(m_nb_task_engine),
309 m_interrupt_detector_rcv(m_nb_task_engine)
310{
311 using flow::log::Sev;
312 using flow::error::Runtime_error;
313 using boost::system::system_category;
314 using ::mq_open;
315 // using ::O_RDWR; // A macro apparently.
316
317 FLOW_LOG_TRACE
318 ("Posix_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name [" << absolute_name() << "] in "
319 "open-only mode.");
320
321 // Get pipe stuff out of the way, as it does not need m_mq.
322 auto sys_err_code = pipe_setup();
323
324 if (!sys_err_code)
325 {
326 /* Note: O_NONBLOCK is something we are forced to set/unset depending on the transmission API being called.
327 * So just don't worry about it here. */
328 const auto raw = mq_open(shared_name_to_mq_name(absolute_name()).c_str(), O_RDWR);
329 if (raw != -1)
330 {
331 m_mq = Native_handle(raw);
332 }
333
334 if (m_mq.null())
335 {
336 FLOW_LOG_WARNING
337 ("Posix_mq_handle [" << *this << "]: mq_open() error (error details follow) while "
338 "constructing MQ handle to MQ at name [" << absolute_name() << "] in open-only mode.");
339 sys_err_code = Error_code(errno, system_category());
340 }
341 else
342 {
343 // We never use blocking transmission -- always using *wait_*able() => interrupt_*() works. Non-blocking 4eva.
344 if (!set_non_blocking(true, &sys_err_code))
345 {
346 assert(sys_err_code);
347
348 // Clean up.
349
350 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; it's fine.
351 mq_close(m_mq.m_native_handle);
353 }
354 else
355 {
356 sys_err_code = epoll_setup(); // It logged on error; also then it cleaned up everything.
357 }
358 } // if (!m_mq.null()) (but it may have become null, and sys_err_code therefore truthy, inside)
359 } // if (!sys_err_code) (but it may have become truthy inside)
360
361 if (sys_err_code)
362 {
363 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
364
365 if (err_code)
366 {
367 *err_code = sys_err_code;
368 }
369 else
370 {
371 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
372 }
373 }
374 // else { Cool! }
375} // Posix_mq_handle::Posix_mq_handle(Open_only)
376
378{
379 using boost::asio::connect_pipe;
380
381 Error_code sys_err_code;
382 connect_pipe(m_interrupt_detector_snd, m_interrupter_snd, sys_err_code);
383 if (!sys_err_code)
384 {
385 connect_pipe(m_interrupt_detector_rcv, m_interrupter_rcv, sys_err_code);
386 }
387
388 if (sys_err_code)
389 {
390 FLOW_LOG_WARNING
391 ("Posix_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name [" << absolute_name() << "]: "
392 "connect-pipe failed. Details follow.");
393 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
394
395 // Clean up first guys (might be no-op).
396 Error_code sink;
397 m_interrupter_snd.close(sink);
398 m_interrupt_detector_snd.close(sink);
399 }
400
401 return sys_err_code;
402} // Posix_mq_handle::pipe_setup()
403
405{
406 using boost::system::system_category;
407 using ::epoll_create1;
408 using ::epoll_ctl;
409 using ::mq_close;
410 using ::close;
411 using Epoll_event = ::epoll_event;
412 // using ::EPOLL_CTL_ADD; // A macro apparently.
413 // using ::EPOLLIN; // A macro apparently.
414
415 /* m_mq is good to go.
416 *
417 * Set up one epoll handle for checking for writability; another for readability.
418 * Into each one add an interrupter FD (readability of read end of an anonymous pipe).
419 *
420 * There's some lame cleanup logic below specific to the 2-handle situation. It's okay. Just, if setting
421 * up the outgoing-direction epolliness fails, it cleans up itself. If that succeeds, but incoming-direction
422 * setup then fails, then after it cleans up after itself, we have to undo the outgoing-direction setup. */
423
424 Error_code sys_err_code;
425
426 const auto setup = [&](Native_handle* epoll_hndl_ptr, bool snd_else_rcv)
427 {
428 auto& epoll_hndl = *epoll_hndl_ptr;
429 auto& interrupt_detector = snd_else_rcv ? m_interrupt_detector_snd : m_interrupt_detector_rcv;
430
431 epoll_hndl = Native_handle(epoll_create1(0));
432 if (epoll_hndl.m_native_handle == -1)
433 {
434 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Created MQ handle fine, but epoll_create1() failed; "
435 "details follow.");
436 sys_err_code = Error_code(errno, system_category());
437
438 // Clean up.
439
440 epoll_hndl = Native_handle(); // No-op as of this writing, but just to keep it maintainable do it anyway.
441
442 Error_code sink;
443 m_interrupt_detector_snd.close(sink);
444 m_interrupter_snd.close(sink);
445 m_interrupt_detector_rcv.close(sink);
446 m_interrupter_rcv.close(sink);
447
448 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; should be fine.
449 mq_close(m_mq.m_native_handle);
451 return;
452 }
453 // else if (epoll_hndl.m_native_handle != -1)
454
455 // Each epoll_wait() will listen for transmissibility of the queue itself + readability of interrupt-pipe.
456 Epoll_event event_of_interest1;
457 event_of_interest1.events = snd_else_rcv ? EPOLLOUT : EPOLLIN;
458 event_of_interest1.data.fd = m_mq.m_native_handle;
459 Epoll_event event_of_interest2;
460 event_of_interest2.events = EPOLLIN;
461 event_of_interest2.data.fd = interrupt_detector.native_handle();
462 if ((epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest1.data.fd, &event_of_interest1) == -1) ||
463 (epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest2.data.fd, &event_of_interest2) == -1))
464 {
465 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Created MQ handle fine, but an epoll_ctl() failed; "
466 "snd_else_rcv = [" << snd_else_rcv << "]; details follow.");
467 sys_err_code = Error_code(errno, system_category());
468
469 // Clean up everything.
470 close(epoll_hndl.m_native_handle);
471 epoll_hndl = Native_handle();
472 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; should be fine.
473 mq_close(m_mq.m_native_handle);
475 return;
476 }
477 }; // const auto setup =
478
479 setup(&m_epoll_hndl_snd, true);
480 if (!sys_err_code)
481 {
482 setup(&m_epoll_hndl_rcv, false);
483 if (sys_err_code)
484 {
485 // Have to undo first setup(), except m_mq+pipes cleanup was already done by 2nd setup().
488 }
489 }
490 // else { 1st setup() cleaned everything up. }
491
492 return sys_err_code;
493} // Posix_mq_handle::epoll_setup()
494
497{
498 operator=(std::move(src));
499}
500
502{
503 using ::mq_close;
504 using ::close;
505
506 if (m_mq.null())
507 {
508 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: No MQ handle to close in destructor.");
509 assert(m_epoll_hndl_snd.null());
510 assert(m_epoll_hndl_rcv.null());
511 }
512 else
513 {
514 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Closing MQ handle (and epoll set).");
515 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; we should be fine.
516 mq_close(m_mq.m_native_handle);
517
518 assert(!m_epoll_hndl_snd.null());
519 assert(!m_epoll_hndl_rcv.null());
520 close(m_epoll_hndl_snd.m_native_handle); // Ditto regarding errors.
522 }
523} // Posix_mq_handle::~Posix_mq_handle()
524
526{
527 using std::swap;
528
529 if (&src != this)
530 {
539
540 swap(*this, src);
541 }
542 return *this;
543}
544
546{
547 using util::Pipe_writer;
548 using util::Pipe_reader;
549 using flow::log::Log_context;
550 using std::swap;
551 using boost::array;
552
553 // This is a bit faster than un-specialized std::swap() which would require a move ction + 3 move assignments.
554
555 swap(static_cast<Log_context&>(val1), static_cast<Log_context&>(val2));
556 swap(val1.m_mq, val2.m_mq);
562
563 /* This is annoying! Maybe we should just wrap all these in unique_ptr<>s to avoid this nonsense.
564 * As it is, it might be unsafe to swap asio objects apart from their Task_engines, so doing this blech. */
565 try
566 {
567 array<Native_handle, 4> fds1;
568 array<Native_handle, 4> fds2;
569 const auto unload = [&](Posix_mq_handle& val, auto& fds)
570 {
571 if (val.m_interrupter_snd.is_open())
572 {
573 fds[0].m_native_handle = val.m_interrupter_snd.release();
574 }
575 if (val.m_interrupter_rcv.is_open())
576 {
577 fds[1].m_native_handle = val.m_interrupter_rcv.release();
578 }
579 if (val.m_interrupt_detector_snd.is_open())
580 {
581 fds[2].m_native_handle = val.m_interrupt_detector_snd.release();
582 }
583 if (val.m_interrupt_detector_rcv.is_open())
584 {
585 fds[3].m_native_handle = val.m_interrupt_detector_rcv.release();
586 }
587 };
588 unload(val1, fds1);
589 unload(val2, fds2);
590
591 const auto reload = [&](Posix_mq_handle& val, auto& fds)
592 {
593 // Leave their stupid task engines in-place. Do need to reassociate them with the swapped FDs though.
595 = fds[0].null() ? Pipe_writer(val.m_nb_task_engine)
596 : Pipe_writer(val.m_nb_task_engine, fds[0].m_native_handle);
598 = fds[1].null() ? Pipe_writer(val.m_nb_task_engine)
599 : Pipe_writer(val.m_nb_task_engine, fds[1].m_native_handle);
601 = fds[2].null() ? Pipe_reader(val.m_nb_task_engine)
602 : Pipe_reader(val.m_nb_task_engine, fds[2].m_native_handle);
604 = fds[3].null() ? Pipe_reader(val.m_nb_task_engine)
605 : Pipe_reader(val.m_nb_task_engine, fds[3].m_native_handle);
606 };
607 reload(val1, fds2); // Swap 'em.
608 reload(val2, fds1);
609 }
610 catch (...) // .release() and ctor can throw, and they really shouldn't at all.
611 {
612 assert(false && "Horrible exception."); std::abort();
613 }
614} // swap()
615
617{
618 using ::mq_attr;
619 using ::mq_getattr;
620
621 assert((!m_mq.null())
622 && "As advertised: max_msg_size() => undefined behavior if not successfully cted or was moved-from.");
623
624 mq_attr attr;
625 if (mq_getattr(m_mq.m_native_handle, &attr) != 0)
626 {
627 /* We could handle error gracefully, as we do elsewhere, but EBADF is the only possibility, and there's zero
628 * reason it should happen unles m_mq is null at this point. It'd be fine to handle it, but then we'd have
629 * to emit an Error_code, and the API would change, and the user would have more to worry about; I (ygoldfel)
630 * decided it's overkill. So if it does happen, log and assert(). @todo Maybe reconsider. */
631 Error_code sink;
632 handle_mq_api_result(-1, &sink, "Posix_mq_handle::max_msg_size: mq_getattr()");
633 assert(false && "mq_getattr() failed (details logged); this is too bizarre.");
634 return 0;
635 }
636 // else
637
638 return size_t(attr.mq_msgsize);
639} // Posix_mq_handle::max_msg_size()
640
642{
643 // Very similar to max_msg_size(). @todo Maybe code reuse. Though line count might be even greater then so....
644
645 using ::mq_attr;
646 using ::mq_getattr;
647 assert((!m_mq.null())
648 && "As advertised: max_n_msgs() => undefined behavior if not successfully cted or was moved-from.");
649 mq_attr attr;
650 if (mq_getattr(m_mq.m_native_handle, &attr) != 0)
651 {
652 Error_code sink;
653 handle_mq_api_result(-1, &sink, "Posix_mq_handle::max_msg_size: mq_getattr()");
654 assert(false && "mq_getattr() failed (details logged); this is too bizarre.");
655 return 0;
656 }
657 return size_t(attr.mq_maxmsg);
658} // Posix_mq_handle::max_msg_size()
659
661{
662 using boost::system::system_category;
663 using ::mq_attr;
664 using ::mq_setattr;
665 // using ::O_NONBLOCK; // A macro apparently.
666
667 assert(err_code);
668
669 /* By the way -- no, this cannot be done in mq_attr that goes into mq_open(). It is ignored there according to docs.
670 * Would've been nice actually.... */
671
672 mq_attr attr;
673 attr.mq_flags = nb ? O_NONBLOCK : 0;
674 return handle_mq_api_result(mq_setattr(m_mq.m_native_handle, &attr, 0),
675 err_code, "Posix_mq_handle::set_non_blocking(): mq_setattr()");
676} // Posix_mq_handle::set_non_blocking()
677
679{
680 using flow::util::buffers_dump_string;
681 using ::mq_send;
682 // using ::EAGAIN; // A macro apparently.
683
684 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, try_send, blob, _1);
685 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
686
687 assert((!m_mq.null())
688 && "As advertised: try_send() => undefined behavior if not successfully cted or was moved-from.");
689
690 auto blob_data = blob.data();
691 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
692 "size [" << blob.size() << "].");
693 if (blob.size() == 0)
694 {
695 /* ::mq_send(..., nullptr, N), even when N == 0, is (1) empirically speaking harmless but (2) technically
696 * in violation of arg 2's non-null decl (even though `N == 0` is explicitly allowed per `man` page) hence
697 * (3) causes a clang UBSAN sanitizer error. So waste a couple cycles by feeding it this dummy
698 * non-null value. */
699 blob_data = static_cast<const void*>(&blob_data);
700 }
701 else // if (blob.size() != 0)
702 {
703 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
704 }
705
706 if (mq_send(m_mq.m_native_handle,
707 static_cast<const char*>(blob_data),
708 blob.size(), 0) == 0)
709 {
710 err_code->clear();
711 return true; // Instant success.
712 }
713 // else
714 if (errno == EAGAIN)
715 {
716 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
717 "size [" << blob.size() << "]: would-block.");
718 err_code->clear();
719 return false; // Queue full.
720 }
721 // else
722
723 handle_mq_api_result(-1, err_code, "Posix_mq_handle::try_send(): mq_send(nb)");
724 assert(*err_code);
725 return false;
726} // Posix_mq_handle::try_send()
727
729{
730 using flow::util::buffers_dump_string;
731 using ::mq_send;
732 // using ::EAGAIN; // A macro apparently.
733
734 if (flow::error::exec_void_and_throw_on_error
735 ([&](Error_code* actual_err_code) { send(blob, actual_err_code); },
736 err_code, "Posix_mq_handle::send()"))
737 {
738 return;
739 }
740 // else
741
742 assert((!m_mq.null())
743 && "As advertised: send() => undefined behavior if not successfully cted or was moved-from.");
744
745 auto blob_data = blob.data();
746 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-push of blob @[" << blob_data << "], "
747 "size [" << blob.size() << "]. Trying nb-push first; if it succeeds -- great. "
748 "Else will wait/retry/wait/retry/....");
749 if (blob.size() == 0)
750 {
751 // See similarly-placed comment in try_send() which explains this.
752 blob_data = static_cast<const void*>(&blob_data);
753 }
754 else // if (blob.size() != 0)
755 {
756 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
757 }
758
759 /* We could just invoke blocking mq_send(m_mq), but to get the promised logging we go a tiny bit fancier as follows.
760 * Update: Now that we have to be interrupt_*()ible, also reuse wait_*() instead of using native
761 * blocking mq_*(). */
762
763 while (true)
764 {
765 if (mq_send(m_mq.m_native_handle,
766 static_cast<const char*>(blob_data),
767 blob.size(), 0) == 0)
768 {
769 err_code->clear();
770 return; // Instant success.
771 }
772 // else
773 if (errno != EAGAIN)
774 {
775 handle_mq_api_result(-1, err_code, "Posix_mq_handle::send(): mq_send(nb)");
776 assert(*err_code);
777 return;
778 }
779 // else if (would-block): as promised, INFO logs.
780
781 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
782 "size [" << blob.size() << "]: would-block. Executing blocking-wait.");
783
784 wait_sendable(err_code);
785 if (*err_code)
786 {
787 // Whether interrupted or true error, we're done.
788 return;
789 }
790 // else
791
792 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
793 } // while (true)
794} // Posix_mq_handle::send()
795
797 Error_code* err_code)
798{
799 using flow::util::time_since_posix_epoch;
800 using flow::util::buffers_dump_string;
801 using flow::Fine_clock;
802 using boost::chrono::floor;
803 using boost::chrono::round;
804 using boost::chrono::seconds;
805 using boost::chrono::microseconds;
806 using boost::chrono::nanoseconds;
807 using ::mq_send;
808 // using ::EAGAIN; // A macro apparently.
809
810 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, timed_send, blob, timeout_from_now, _1);
811 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
812
813 // Similar to a combo of (blocking) send() and (non-blocking) try_send() -- keeping comments light where redundant.
814
815 assert((!m_mq.null())
816 && "As advertised: timed_send() => undefined behavior if not successfully cted or was moved-from.");
817
818 auto blob_data = blob.data();
819 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-timed-push of blob @[" << blob_data << "], "
820 "size [" << blob.size() << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]. "
821 "Trying nb-push first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
822 if (blob.size() == 0)
823 {
824 // See similarly-placed comment in try_send() which explains this.
825 blob_data = static_cast<const void*>(&blob_data);
826 }
827 else // if (blob.size() != 0)
828 {
829 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
830 }
831
832 auto now = Fine_clock::now();
833 auto after = now;
834
835 while (true)
836 {
837 if (mq_send(m_mq.m_native_handle,
838 static_cast<const char*>(blob_data),
839 blob.size(), 0) == 0)
840 {
841 err_code->clear();
842 break; // Instant success.
843 }
844 // else
845 if (errno != EAGAIN)
846 {
847 handle_mq_api_result(-1, err_code, "Posix_mq_handle::send(): mq_send(nb)");
848 assert(*err_code);
849 return false;
850 }
851 // else if (would-block): as promised, INFO logs.
852
853 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
854 "size [" << blob.size() << "]: would-block. Executing blocking-wait.");
855
856 timeout_from_now -= (after - now); // No-op the first time; after that reduces time left.
857 const bool ready = timed_wait_sendable(timeout_from_now, err_code);
858 if (*err_code)
859 {
860 // Whether interrupted or true error, we're done.
861 return false;
862 }
863 // else:
864
865 if (!ready) // I.e., if (timed out).
866 {
867 FLOW_LOG_TRACE("Did not finish before timeout.");
868 return false;
869 }
870 // else: successful wait for transmissibility. Try nb-transmitting again.
871 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
872
873 after = Fine_clock::now();
874 assert((after >= now) && "Fine_clock is supposed to never go backwards.");
875 } // while (true)
876
877 return true;
878} // Posix_mq_handle::timed_send()
879
881{
882 using util::Blob_mutable;
883 using flow::util::buffers_dump_string;
884 using ::mq_receive;
885 // using ::EAGAIN; // A macro apparently.
886
887 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, try_receive, blob, _1);
888 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
889
890 assert((!m_mq.null())
891 && "As advertised: try_receive() => undefined behavior if not successfully cted or was moved-from.");
892
893 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
894 "max-size [" << blob->size() << "].");
895
896 ssize_t n_rcvd;
897 unsigned int pri_ignored;
898 if ((n_rcvd = mq_receive(m_mq.m_native_handle,
899 static_cast<char*>(blob->data()),
900 blob->size(), &pri_ignored)) >= 0)
901 {
902 *blob = Blob_mutable(blob->data(), n_rcvd);
903 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
904 if (blob->size() != 0)
905 {
906 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
907 }
908 err_code->clear();
909 return true; // Instant success.
910 }
911 // else
912 if (errno == EAGAIN)
913 {
914 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
915 "max-size [" << blob->size() << "]: would-block.");
916 err_code->clear();
917 return false; // Queue full.
918 }
919 // else
920
921 handle_mq_api_result(-1, err_code, "Posix_mq_handle::try_receive(): mq_receive(nb)");
922 assert(*err_code);
923 return false;
924} // Posix_mq_handle::try_receive()
925
927{
928 using util::Blob_mutable;
929 using flow::util::buffers_dump_string;
930 using ::mq_receive;
931 // using ::EAGAIN; // A macro apparently.
932
933 if (flow::error::exec_void_and_throw_on_error
934 ([&](Error_code* actual_err_code) { receive(blob, actual_err_code); },
935 err_code, "Posix_mq_handle::receive()"))
936 {
937 return;
938 }
939 // else
940
941 assert((!m_mq.null())
942 && "As advertised: receive() => undefined behavior if not successfully cted or was moved-from.");
943
944 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-pop to blob @[" << blob->data() << "], "
945 "max-size [" << blob->size() << "]. Trying nb-pop first; if it succeeds -- great. "
946 "Else will wait/retry/wait/retry/....");
947
948 /* We could just invoke blocking mq_receive(m_mq), but to get the promised logging we go tiny bit fancier as follows.
949 * Update: Now that we have to be interrupt_*()ible, also reuse wait_*() instead of using native
950 * blocking mq_*(). */
951
952 ssize_t n_rcvd;
953 unsigned int pri_ignored;
954
955 while (true)
956 {
957 if ((n_rcvd = mq_receive(m_mq.m_native_handle,
958 static_cast<char*>(blob->data()),
959 blob->size(), &pri_ignored)) >= 0)
960 {
961 *blob = Blob_mutable(blob->data(), n_rcvd);
962 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
963 if (blob->size() != 0)
964 {
965 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
966 }
967 err_code->clear();
968 return; // Instant success.
969 }
970 // else
971 if (errno != EAGAIN)
972 {
973 handle_mq_api_result(-1, err_code, "Posix_mq_handle::receive(): mq_receive(nb)");
974 assert(*err_code);
975 return;
976 }
977 // else if (would-block): as promised, INFO logs.
978
979 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
980 "max-size [" << blob->size() << "]: would-block. Executing blocking-wait.");
981
982 wait_receivable(err_code);
983 if (*err_code)
984 {
985 // Whether interrupted or true error, we're done.
986 return;
987 }
988 // else
989
990 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
991 } // while (true)
992} // Posix_mq_handle::receive()
993
995 Error_code* err_code)
996{
997 using util::Blob_mutable;
998 using flow::Fine_clock;
999 using flow::util::time_since_posix_epoch;
1000 using flow::util::buffers_dump_string;
1001 using boost::chrono::floor;
1002 using boost::chrono::round;
1003 using boost::chrono::seconds;
1004 using boost::chrono::microseconds;
1005 using boost::chrono::nanoseconds;
1006 using ::mq_receive;
1007 // using ::EAGAIN; // A macro apparently.
1008
1009 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, timed_receive, blob, timeout_from_now, _1);
1010 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
1011
1012 // Similar to a combo of (blocking) receive() and (non-blocking) try_receive() -- keeping comments light if redundant.
1013
1014 assert((!m_mq.null())
1015 && "As advertised: timed_receive() => undefined behavior if not successfully cted or was moved-from.");
1016
1017 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-timed-pop to blob @[" << blob->data() << "], "
1018 "max-size [" << blob->size() << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]. "
1019 "Trying nb-pop first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
1020
1021 ssize_t n_rcvd;
1022 unsigned int pri_ignored;
1023
1024 auto now = Fine_clock::now();
1025 auto after = now;
1026
1027 while (true)
1028 {
1029 if ((n_rcvd = mq_receive(m_mq.m_native_handle,
1030 static_cast<char*>(blob->data()),
1031 blob->size(), &pri_ignored)) >= 0)
1032 {
1033 *blob = Blob_mutable(blob->data(), n_rcvd);
1034 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
1035 if (blob->size() != 0)
1036 {
1037 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
1038 }
1039 err_code->clear();
1040 break; // Instant success.
1041 }
1042 // else
1043 if (errno != EAGAIN)
1044 {
1045 handle_mq_api_result(-1, err_code, "Posix_mq_handle::timed_receive(): mq_receive(nb)");
1046 assert(*err_code);
1047 return false;
1048 }
1049 // else if (would-block): as promised, INFO logs.
1050
1051 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
1052 "max-size [" << blob->size() << "]: would-block. Executing blocking-wait.");
1053
1054 timeout_from_now -= (after - now); // No-op the first time; after that reduces time left.
1055 const bool ready = timed_wait_receivable(timeout_from_now, err_code);
1056 if (*err_code)
1057 {
1058 // Whether interrupted or true error, we're done.
1059 return false;
1060 }
1061 // else:
1062
1063 if (!ready) // I.e., if (timed out).
1064 {
1065 FLOW_LOG_TRACE("Did not finish before timeout.");
1066 return false;
1067 }
1068 // else: successful wait for transmissibility. Try nb-transmitting again.
1069 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
1070
1071 after = Fine_clock::now();
1072 assert((after >= now) && "Fine_clock is supposed to never go backwards.");
1073 } // while (true)
1074
1075 return true;
1076} // Posix_mq_handle::timed_receive()
1077
1078template<bool SND_ELSE_RCV>
1080{
1081 using util::Blob_const;
1082
1083 assert((!m_mq.null())
1084 && "As advertised: interrupt_impl() => undefined behavior if not successfully cted or was moved-from.");
1085
1086 Pipe_writer* interrupter_ptr;
1087 bool* interrupting_ptr;
1088 if constexpr(SND_ELSE_RCV)
1089 {
1090 interrupter_ptr = &m_interrupter_snd; interrupting_ptr = &m_interrupting_snd;
1091 }
1092 else
1093 {
1094 interrupter_ptr = &m_interrupter_rcv; interrupting_ptr = &m_interrupting_rcv;
1095 }
1096 auto& interrupter = *interrupter_ptr;
1097 auto& interrupting = *interrupting_ptr;
1098
1099 if (interrupting)
1100 {
1101 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Interrupt mode already ON for "
1102 "snd_else_rcv [" << SND_ELSE_RCV << "]. Ignoring.");
1103 return false;
1104 }
1105 // else
1106
1107 interrupting = true; // Reminder: This member is for the above guard *only*. wait_impl() relies on FD state.
1108 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Interrupt mode turning ON for "
1109 "snd_else_rcv [" << SND_ELSE_RCV << "].");
1110
1111 util::pipe_produce(get_logger(), &interrupter);
1112
1113 // Now any level-triggered poll-wait will detect that this mode is on.
1114 return true;
1115} // Posix_mq_handle::interrupt_impl()
1116
1117template<bool SND_ELSE_RCV>
1119{
1120 using util::Blob_mutable;
1121
1122 assert((!m_mq.null())
1123 && "As advertised: allow_impl() => undefined behavior if not successfully cted or was moved-from.");
1124
1125 // Inverse of interrupt_impl(). Keeping comments light.
1126
1127 Pipe_reader* interrupt_detector_ptr;
1128 bool* interrupting_ptr;
1129 if constexpr(SND_ELSE_RCV)
1130 {
1131 interrupt_detector_ptr = &m_interrupt_detector_snd; interrupting_ptr = &m_interrupting_snd;
1132 }
1133 else
1134 {
1135 interrupt_detector_ptr = &m_interrupt_detector_rcv; interrupting_ptr = &m_interrupting_rcv;
1136 }
1137 auto& interrupt_detector = *interrupt_detector_ptr;
1138 auto& interrupting = *interrupting_ptr;
1139
1140 if (!interrupting)
1141 {
1142 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Interrupt mode already OFF for "
1143 "snd_else_rcv [" << SND_ELSE_RCV << "]. Ignoring.");
1144 return false;
1145 }
1146 // else
1147
1148 interrupting = false;
1149 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Interrupt mode turning OFF for "
1150 "snd_else_rcv [" << SND_ELSE_RCV << "].");
1151
1152 util::pipe_consume(get_logger(), &interrupt_detector);
1153
1154 // Now any level-triggered poll-wait will detect that this mode is off.
1155 return true;
1156} // Posix_mq_handle::allow_impl()
1157
1159{
1160 return interrupt_impl<true>();
1161}
1162
1164{
1165 return allow_impl<true>();
1166}
1167
1169{
1170 return interrupt_impl<false>();
1171}
1172
1174{
1175 return allow_impl<false>();
1176}
1177
1178bool Posix_mq_handle::wait_impl(util::Fine_duration timeout_from_now_or_none, bool snd_else_rcv, Error_code* err_code)
1179{
1180 using util::Fine_time_pt;
1181 using util::Fine_duration;
1182 using flow::util::time_since_posix_epoch;
1183 using boost::chrono::round;
1184 using boost::chrono::milliseconds;
1185 using boost::system::system_category;
1186 using boost::array;
1187 using ::epoll_wait;
1188 using Epoll_event = ::epoll_event;
1189
1190 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, wait_impl, timeout_from_now_or_none, snd_else_rcv, _1);
1191 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
1192
1193 assert((!m_mq.null())
1194 && "As advertised: wait_impl() => undefined behavior if not successfully cted or was moved-from.");
1195
1196 /* By the way -- epoll_wait() takes # of milliseconds, so round up to that and no need to do crazy stuff like
1197 * timed_send() et al; just convert to milliseconds. */
1198 int epoll_timeout_from_now_ms;
1199 milliseconds epoll_timeout_from_now;
1200 if (timeout_from_now_or_none == Fine_duration::max())
1201 {
1202 epoll_timeout_from_now_ms = -1;
1203 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Infinite-await-unstarved for "
1204 "snd_else_rcv [" << snd_else_rcv << "]. Will perform an epoll_wait().");
1205 }
1206 else
1207 {
1208 epoll_timeout_from_now = round<milliseconds>(timeout_from_now_or_none);
1209 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-await/poll-unstarved for "
1210 "snd_else_rcv [" << snd_else_rcv << "]; timeout ~[" << epoll_timeout_from_now << "] -- "
1211 "if 0 then poll. Will perform an epoll_wait().");
1212 epoll_timeout_from_now_ms = int(epoll_timeout_from_now.count());
1213 }
1214
1215 array<Epoll_event, 2> evs; // Only one possible event (we choose 1 of 2 event sets).
1216 const auto epoll_result
1217 = epoll_wait((snd_else_rcv ? m_epoll_hndl_snd : m_epoll_hndl_rcv)
1218 .m_native_handle,
1219 evs.begin(), 1, epoll_timeout_from_now_ms);
1220 if (epoll_result == -1)
1221 {
1222 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: epoll_wait() yielded error. Details follow.");
1223
1224 const auto& sys_err_code = *err_code = Error_code(errno, system_category());
1225 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1226 return false;
1227 }
1228 // else
1229
1230 assert(epoll_result <= 2);
1231 if ((epoll_result == 2) // Max of 2 events; if interrupted then as promised disregard it being also transmissible.
1232 ||
1233 ((epoll_result == 1) // 1 event: need to check whether it's the interruptor as opposed to the actual queue.
1234 && (evs[0].data.fd != m_mq.m_native_handle)))
1235 {
1236 if (timeout_from_now_or_none == Fine_duration::max())
1237 {
1238 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Infinite-await-unstarved for "
1239 "snd_else_rcv [" << snd_else_rcv << "]: interrupted.");
1240 }
1241 else
1242 {
1243 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Blocking-await/poll-unstarved for "
1244 "snd_else_rcv [" << snd_else_rcv << "]; timeout ~[" << epoll_timeout_from_now << "] -- "
1245 "if 0 then poll: interrupted.");
1246 }
1247 *err_code = error::Code::S_INTERRUPTED;
1248 return false;
1249 }
1250 // else if (epoll_result <= 1): Not interrupted.
1251
1252 const bool success = epoll_result == 1;
1253 if (timeout_from_now_or_none == Fine_duration::max())
1254 {
1255 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Infinite-await-unstarved for "
1256 "snd_else_rcv [" << snd_else_rcv << "]: succeeded? = [" << success << "].");
1257 }
1258 else
1259 {
1260 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-await/poll-unstarved for "
1261 "snd_else_rcv [" << snd_else_rcv << "]; timeout ~[" << epoll_timeout_from_now << "] -- "
1262 "if 0 then poll: succeeded? = [" << success << "].");
1263 }
1264
1265 err_code->clear();
1266 return success;
1267} // Posix_mq_handle::wait_impl()
1268
1270{
1271 return wait_impl(util::Fine_duration::zero(), true, err_code);
1272}
1273
1275{
1276 wait_impl(util::Fine_duration::max(), true, err_code);
1277}
1278
1280{
1281 return wait_impl(timeout_from_now, true, err_code);
1282}
1283
1285{
1286 return wait_impl(util::Fine_duration::zero(), false, err_code);
1287}
1288
1290{
1291 wait_impl(util::Fine_duration::max(), false, err_code);
1292}
1293
1295{
1296 return wait_impl(timeout_from_now, false, err_code);
1297}
1298
1300{
1301 return m_mq;
1302}
1303
1304void Posix_mq_handle::remove_persistent(flow::log::Logger* logger_ptr, // Static.
1305 const Shared_name& absolute_name, Error_code* err_code)
1306{
1307 using boost::system::system_category;
1308 using ::mq_unlink;
1309
1310 if (flow::error::exec_void_and_throw_on_error
1311 ([&](Error_code* actual_err_code)
1312 { remove_persistent(logger_ptr, absolute_name, actual_err_code); },
1313 err_code, "Posix_mq_handle::remove_persistent()"))
1314 {
1315 return;
1316 }
1317 // else
1318
1319 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
1320
1321 FLOW_LOG_INFO("Posix_mq @ Shared_name[" << absolute_name << "]: Removing persistent MQ if possible.");
1322 if (mq_unlink(shared_name_to_mq_name(absolute_name).c_str()) == 0)
1323 {
1324 err_code->clear();
1325 return;
1326 }
1327 // else
1328
1329 FLOW_LOG_WARNING("Posix_mq @ Shared_name[" << absolute_name << "]: While removing persistent MQ:"
1330 "mq_unlink() yielded error. Details follow.");
1331 const auto& sys_err_code = *err_code = Error_code(errno, system_category());
1332 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1333} // Posix_mq_handle::remove_persistent()
1334
1336{
1337 using boost::system::system_category;
1338
1339 if (result == 0)
1340 {
1341 err_code->clear();
1342 return true;
1343 }
1344 // else
1345
1346 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: mq_*() yielded error; context = [" << context << "]. "
1347 "Details follow.");
1348 const auto& sys_err_code = *err_code
1349 = (errno == EMSGSIZE)
1350 ? error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW // By contract must emit this specific code for this.
1351 : Error_code(errno, system_category()); // Otherwise whatever it was.
1352 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1353
1354 return false;
1355} // Posix_mq_handle::handle_mq_api_result()
1356
1358{
1359 return m_absolute_name;
1360}
1361
1362std::ostream& operator<<(std::ostream& os, const Posix_mq_handle& val)
1363{
1364 os << '@' << &val << ": sh_name[" << val.absolute_name() << "] native_handle[";
1365 const auto native_handle = val.native_handle();
1366 if (native_handle.null())
1367 {
1368 return os << "null]";
1369 }
1370 // else
1371 return os << native_handle << ']';
1372}
1373
1374namespace
1375{
1376
1377std::string shared_name_to_mq_name(const Shared_name& name)
1378{
1379 // Pre-pend slash. See `man mq_overview`.
1380 std::string mq_name("/");
1381 return mq_name += name.str();
1382}
1383
1384} // namespace (anon)
1385
1386} // namespace ipc::transport
Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see man mq_overv...
bool allow_sends()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-sends an...
bool interrupt_receives()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-receives ...
bool interrupt_sends()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-sends and...
util::Pipe_reader Pipe_reader
Short-hand for anonymous pipe read end.
void wait_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like receive() but without the actual popping of a message.
static const Shared_name S_RESOURCE_TYPE_ID
Implements concept API.
bool handle_mq_api_result(int result, Error_code *err_code, util::String_view context) const
Helper that handles the result of an mq_*() call by logging WARNING(s) on error; setting *err_code on...
Pipe_reader m_interrupt_detector_snd
A byte is read from this end by allow_sends() to make it not-readable for the poll-wait in wait_impl(...
Native_handle m_epoll_hndl_snd
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool is_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_receive() but without the actual popping of a message.
bool allow_receives()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-receives...
bool is_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_send() but without the actual pushing of a message.
size_t max_msg_size() const
Implements Persistent_mq_handle API: Returns the max message size of the underlying queue.
Pipe_reader m_interrupt_detector_rcv
Other-direction counterpart to m_interrupt_detector_snd.
Error_code epoll_setup()
Ctor helper that sets up m_epoll_hndl_snd and m_epoll_hndl_rcv.
util::Pipe_writer Pipe_writer
Short-hand for anonymous pipe write end.
size_t max_n_msgs() const
Implements Persistent_mq_handle API: Returns the max message count of the underlying queue.
bool try_send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking send: pushes copy of message to queue and returns t...
Pipe_writer m_interrupter_rcv
Other-direction counterpart to m_interrupter_snd.
Native_handle native_handle() const
Implements Persistent_mq_handle API: Returns the stored native MQ handle; null if not open.
bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_receive() but without the actual popping of a message...
bool wait_impl(util::Fine_duration timeout_from_now_or_none, bool snd_else_rcv, Error_code *err_code)
Impl body for *_sendable() and *_receivable().
friend void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &name, Error_code *err_code=0)
Implements Persistent_mq_handle API: Removes the named persistent MQ.
void receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking receive: pops copy of message from queue into buffer; i...
void send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking send: pushes copy of message to queue; if queue is full...
Posix_mq_handle()
Implements Persistent_mq_handle API: Construct null handle.
bool m_interrupting_snd
Starting at false, this is made true via interrupt_sends(), and back by allow_sends(); acts as a guar...
const Shared_name & absolute_name() const
Implements Persistent_mq_handle API: Returns name equal to absolute_name passed to ctor.
Native_handle m_epoll_hndl_rcv
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool allow_impl()
Impl body for allow_*().
bool interrupt_impl()
Impl body for interrupt_*().
bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_send() but without the actual pushing of a message.
bool timed_receive(util::Blob_mutable *blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed receive: pops copy of message from queue into buf...
~Posix_mq_handle()
Implements Persistent_mq_handle API: Destroys this handle (or no-op if no handle was successfully con...
bool set_non_blocking(bool nb, Error_code *err_code)
Sets m_mq to blocking or non-blocking and returns true on success and clears *err_code; otherwise ret...
Shared_name m_absolute_name
See absolute_name().
bool timed_send(const util::Blob_const &blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed send: pushes copy of message to queue; if queue i...
bool try_receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking receive: pops copy of message from queue into buffe...
Error_code pipe_setup()
Ctor helper that sets up m_interrupt* pipe items.
void wait_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like send() but without the actual pushing of a message.
bool m_interrupting_rcv
Other-direction counterpart to m_interrupting_snd.
Pipe_writer m_interrupter_snd
A byte is written to this end by interrupt_sends() to make it readable for the poll-wait in wait_impl...
Native_handle m_mq
Underlying MQ handle.
flow::util::Task_engine m_nb_task_engine
Never used for .run() or .async() – just so we can construct Pipe_reader, Pipe_writer.
Posix_mq_handle & operator=(Posix_mq_handle &&src)
Implements Persistent_mq_handle API: Replaces handle with the source handle while making the latter i...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
void clear()
Makes it so empty() == true.
const std::string & str() const
Returns (sans copying) ref to immutable entire wrapped name string, suitable to pass into sys calls w...
@ S_INTERRUPTED
A blocking operation was intentionally interrupted or preemptively canceled.
@ S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW
Low-level message queue send-op buffer overflow (> max size) or receive-op buffer underflow (< max si...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
void swap(Bipc_mq_handle &val1, Bipc_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
bipc::permissions Permissions
Short-hand for Unix (POSIX) permissions class.
Definition: util_fwd.hpp:161
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
Definition: util.cpp:158
bipc::open_only_t Open_only
Tag type indicating an ideally-atomic open-if-exists-else-fail operation.
Definition: util_fwd.hpp:155
void pipe_produce(flow::log::Logger *logger_ptr, Pipe_writer *pipe)
Writes a byte to the given pipe writer.
Definition: util.cpp:67
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
Definition: util.cpp:30
boost::asio::mutable_buffer Blob_mutable
Short-hand for an mutable blob somewhere in memory, stored as exactly a void* and a size_t.
Definition: util_fwd.hpp:140
void pipe_consume(flow::log::Logger *logger_ptr, Pipe_reader *pipe)
Reads a byte via the given pipe reader.
Definition: util.cpp:96
boost::asio::writable_pipe Pipe_writer
Short-hand for anonymous pipe write end.
Definition: util_fwd.hpp:32
bipc::open_or_create_t Open_or_create
Tag type indicating an atomic open-if-exists-else-create operation.
Definition: util_fwd.hpp:152
bipc::create_only_t Create_only
Tag type indicating a create-unless-exists-else-fail operation.
Definition: util_fwd.hpp:158
flow::Fine_time_pt Fine_time_pt
Short-hand for Flow's Fine_time_pt.
Definition: util_fwd.hpp:119
boost::asio::readable_pipe Pipe_reader
Short-hand for anonymous pipe read end.
Definition: util_fwd.hpp:35
void set_resource_permissions(flow::log::Logger *logger_ptr, const fs::path &path, const Permissions &perms, Error_code *err_code)
Utility that sets the permissions of the given resource (at the supplied file system path) to specifi...
Definition: util.cpp:46
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:117
const Create_only CREATE_ONLY
Tag value indicating an atomic create-unless-exists-else-fail operation.
Definition: util.cpp:32
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:134
flow::util::String_view String_view
Short-hand for Flow's String_view.
Definition: util_fwd.hpp:115
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
Definition: common.hpp:323
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:298
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.
handle_t m_native_handle
The native handle (possibly equal to S_NULL_HANDLE), the exact payload of this Native_handle.