Flow-IPC 1.0.0
Flow-IPC project: Full implementation reference.
posix_mq_handle.cpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
22#include <flow/error/error.hpp>
23#include <boost/chrono/floor.hpp>
24#include <boost/array.hpp>
25
26namespace ipc::transport
27{
28
29namespace
30{
31
32/**
33 * File-local helper: takes Shared_name; returns name suitable for `mq_open()` and buddies.
34 * @param name
35 * Shared_name.
36 * @return See above.
37 */
38std::string shared_name_to_mq_name(const Shared_name& name);
39
40}; // namespace (anon)
41
42// Initializers.
43
45
46// Implementations.
47
49 m_interrupting_snd(false), // (Not algorithmically needed as of this writing, but avoid some sanitizer complaints.)
50 m_interrupting_rcv(false), // (Ditto. E.g., swap() through clang's UBSAN => complains it's uninitialized.)
51 m_interrupter_snd(m_nb_task_engine),
52 m_interrupt_detector_snd(m_nb_task_engine),
53 m_interrupter_rcv(m_nb_task_engine),
54 m_interrupt_detector_rcv(m_nb_task_engine)
55{
56 // Done.
57}
58
// Shared implementation behind the create-only and open-or-create ctors: Mode_tag (util::Create_only or
// util::Open_or_create, enforced by the static_assert below) selects which algorithm runs.
// Sequence: pipe_setup() -> mq_open() (+ set_resource_permissions() if we created the MQ) -> set_non_blocking(true)
// -> epoll_setup(). On any failure the error is logged and then either stored in *err_code (if non-null) or
// thrown as flow::error::Runtime_error (if err_code is null).
59template<typename Mode_tag>
60Posix_mq_handle::Posix_mq_handle(Mode_tag, flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
61 size_t max_n_msg, size_t max_msg_sz,
62 const util::Permissions& perms, Error_code* err_code) :
63 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
64 m_absolute_name(absolute_name_arg),
65 m_interrupting_snd(false),
66 m_interrupting_rcv(false),
67 m_interrupter_snd(m_nb_task_engine),
68 m_interrupt_detector_snd(m_nb_task_engine),
69 m_interrupter_rcv(m_nb_task_engine),
70 m_interrupt_detector_rcv(m_nb_task_engine)
71{
 // NOTE(review): this listing jumps from line 71 to 73; presumably the dropped line is a using-declaration
 // (set_resource_permissions() is called unqualified below) -- confirm against the real source.
73 using flow::error::Runtime_error;
74 using boost::io::ios_all_saver;
75 using boost::system::system_category;
76 using ::mq_open;
77 using ::mq_close;
78 using ::mq_attr;
79 // using ::O_RDWR; // A macro apparently.
80 // using ::O_CREAT; // A macro apparently.
81 // using ::O_EXCL; // A macro apparently.
82
83 assert(max_n_msg >= 1);
84 assert(max_msg_sz >= 1);
85
86 static_assert(std::is_same_v<Mode_tag, util::Create_only> || std::is_same_v<Mode_tag, util::Open_or_create>,
87 "Can only delegate to this ctor with Mode_tag = Create_only or Open_or_create.");
88 constexpr bool CREATE_ONLY_ELSE_MAYBE = std::is_same_v<Mode_tag, util::Create_only>;
89 constexpr char const * MODE_STR = CREATE_ONLY_ELSE_MAYBE ? "create-only" : "open-or-create";
90
91 {
 // NOTE(review): get_logger() is dereferenced unconditionally here (and in the two similar spots below);
 // confirm a null logger_ptr cannot reach these lines.
92 ios_all_saver saver(*(get_logger()->this_thread_ostream())); // Revert std::oct/etc. soon.
93
94 FLOW_LOG_TRACE
95 ("Posix_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name [" << absolute_name() << "] in "
96 "[" << MODE_STR << "] mode; max msg size [" << max_msg_sz << "] x [" << max_n_msg << "] msgs; "
97 "perms = [" << std::setfill('0') << std::setw(4) << std::oct << perms.get_permissions() << "].");
98 }
99
100 // Get pipe stuff out of the way, as it does not need m_mq.
101 auto sys_err_code = pipe_setup();
102
103 if (!sys_err_code)
104 {
105 /* Naively: Use mq_open() to both create-or-open/exclusively-create and to set mode `perms`.
106 * In reality though there are two subtleties that make it tougher than that. Fortunately this has been
107 * adjudicated out there, and internal boost.interprocess code very nicely confirms that adjudication/sets
108 * a sufficiently-authoritative precedent.
109 *
110 * Subtlety 1 is that mq_open(), like all POSIX/Linux ::open()-y calls, doesn't listen to `perms` verbatim
111 * but rather makes it subject to the process umask. The proper way (citation omitted) to deal with it is
112 * to follow it up with ::[f]chmod() or equivalent, with the resource descriptor if available (or path if not).
113 * (We do have the descriptor -- m_mq -- and we have a util::set_resource_permissions() overload for this.)
114 *
115 * Subtlety 2 is that we should only do that last thing if, indeed, we are *creating* the resource (MQ);
116 * if CREATE_ONLY_ELSE_MAYBE==false, and it already exists, then we mustn't do any such thing; we've succeeded.
117 * Sadly there's no (atomic/direct) way to find out whether mq_open() (in ==false case) did in fact create
118 * the thing. Therefore, we have to do things in a kludgy way -- that is, nevertheless, reasonable in practice
119 * and indeed is validated by bipc's use of it internally (search headers for `case ipcdetail::DoOpenOrCreate:`
120 * followed by shm_open(), fchmod(), etc.). We explain it here for peace of mind.
121 *
122 * In the CREATE_ONLY_ELSE_MAYBE=true case, we can just do what we'd do anyway -- mq_open() in create-only
123 * mode; then set_resource_permissions(); and that's that. Otherwise:
124 *
125 * We can no longer use the built-in atomic create-or-open mode (O_CREAT sans O_EXCL). Instead, we logically
126 * split it into two parts: try to create-only (O_CREAT|O_EXCL); if it succeeds, done (can
127 * set_resource_permissions()); if it fails with file-exists error, then mq_open() again, this time in open-only
128 * mode (and no need for set_resource_permissions()). Great! The problem, of course, is that this no longer uses a
129 * guaranteed-atomic open-or-create OS-call semantic. Namely it means set_resource_permissions() could fail due to
130 * file-not-found, because during the non-atomic gap the MQ got removed by someone. Then we spin-lock (in a way)
131 * through it: just try the whole procedure again. Sooner or later -- assuming no other errors of course --
132 * either the first mq_open() will succeed, or it'll "fail" with file-exists, yet the 2nd mq_open() will succeed.
133 * Pegging processor is not a serious concern in this context. */
134
135 const auto mq_name = shared_name_to_mq_name(absolute_name());
 // Helper: one mq_open() attempt. true => m_mq is set; false => sys_err_code holds the errno-based error.
136 const auto do_mq_open_func = [&](bool create_else_open) -> bool // Just a helper.
137 {
138 /* Note: O_NONBLOCK is something we are forced to set/unset depending on the transmission API being called.
139 * So just don't worry about it here. */
140 ::mqd_t raw;
141 if (create_else_open)
142 {
 // Only mq_maxmsg and mq_msgsize are assigned; the remaining mq_attr fields are left uninitialized.
143 mq_attr attr;
144 attr.mq_maxmsg = max_n_msg;
145 attr.mq_msgsize = max_msg_sz;
146
147 raw = mq_open(mq_name.c_str(),
148 O_RDWR | O_CREAT | O_EXCL, // Create-only always: per above. Can't use create-or-open.
149 perms.get_permissions(),
150 &attr);
151 }
152 else
153 {
154 raw = mq_open(mq_name.c_str(), O_RDWR);
155 }
156 if (raw != -1)
157 {
158 m_mq = Native_handle(raw);
159 }
160
161 if (m_mq.null())
162 {
163 sys_err_code = Error_code(errno, system_category());
164 return false;
165 }
166 // else
167 return true;
168 }; // do_mq_open_func =
169
170 if (CREATE_ONLY_ELSE_MAYBE)
171 {
172 // Simple case.
173 if (do_mq_open_func(true))
174 {
175 set_resource_permissions(get_logger(), m_mq, perms, &sys_err_code);
176 }
177 // sys_err_code is either success or failure. Fall through.
178 }
179 else // if (!CREATE_ONLY_ELSE_MAYBE)
180 {
181 // Complicated case.
182
183 bool success = false; // While this remains false, sys_err_code indicates whether it's false b/c of fatal error.
 // Each `continue` below jumps to the loop condition, which then ends the loop unless we cleared sys_err_code.
184 do
185 {
186 if (do_mq_open_func(true))
187 {
188 // Created! Only situation where we must indeed set_resource_permissions().
189 set_resource_permissions(get_logger(), m_mq, perms, &sys_err_code);
190 success = !sys_err_code;
191 continue; // `break`, really. One of success or sys_err_code is true (latter <= s_r_p() failed).
192 }
193 // else if (!do_mq_open_func(true)) // I.e., it failed for some reason.
194 if (sys_err_code != boost::system::errc::file_exists)
195 {
196 // Real error. GTFO.
197 continue; // `break;`, really (sys_err_code==true).
198 }
199 // else if (file_exists): Create failed, because it already exists. Open the existing guy!
200
201 if (do_mq_open_func(false))
202 {
203 // Opened!
204 success = true;
205 continue; // `break;`, really (success==true).
206 }
207 // else if (!do_mq_open_func(false)) // I.e., it failed for some reason.
208 if (sys_err_code != boost::system::errc::no_such_file_or_directory)
209 {
210 // Real error. GTFO.
211 continue; // `break;`, really (sys_err_code==true).
212 }
213 // else if (no_such_file_or_directory): Open failed, because MQ *just* got unlinked. Try again!
214
215 // This is fun and rare enough to warrant an INFO message.
216 {
217 ios_all_saver saver(*(get_logger()->this_thread_ostream())); // Revert std::oct/etc. soon.
218 FLOW_LOG_INFO
219 ("Posix_mq_handle [" << *this << "]: Create-or-open algorithm encountered the rare concurrency: "
220 "MQ at name [" << absolute_name() << "] existed during the create-only mq_open() but disappeared "
221 "before we were able to complete open-only mq_open(). Retrying in spin-lock fashion. "
222 "Details: max msg size [" << max_msg_sz << "] x [" << max_n_msg << "] msgs; "
223 "perms = [" << std::setfill('0') << std::setw(4) << std::oct << perms.get_permissions() << "].");
224 }
225
226 sys_err_code.clear(); // success remains false, and that wasn't a fatal error, so ensure loop continues.
227 }
228 while ((!success) && (!sys_err_code));
229
230 // Now just encode success-or-not entirely in sys_err_code's truthiness.
231 if (success)
232 {
233 sys_err_code.clear();
234 }
235 // else { sys_err_code is the reason loop exited already. }
236
237 // Now fall through.
238 } // else if (!CREATE_ONLY_ELSE_MAYBE)
239
240 // We never use blocking transmission -- always using *wait_*able() so that interrupt_*() works. Non-blocking 4eva.
241 if ((!sys_err_code) && (!set_non_blocking(true, &sys_err_code)))
242 {
243 assert(sys_err_code);
244
245 // Clean up.
246
247 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; should be fine.
248 mq_close(m_mq.m_native_handle);
 // NOTE(review): this listing jumps from line 248 to 250; presumably the dropped line resets m_mq to a
 // null Native_handle so the destructor does not close it again -- confirm against the real source.
250 }
251
252 if (!sys_err_code)
253 {
254 sys_err_code = epoll_setup(); // It cleans up everything if it fails.
255 }
256 } // if (!sys_err_code) (but might have become true inside)
257
258 if (sys_err_code)
259 {
260 {
261 ios_all_saver saver(*(get_logger()->this_thread_ostream())); // Revert std::oct/etc. soon.
262
 // NOTE(review): the message below hard-codes "create-only mode", yet this ctor also serves
 // open-or-create; MODE_STR (used by the TRACE message above) looks like what was intended.
 // It also names only mq_open()/set_resource_permissions(), though pipe_setup(), set_non_blocking() and
 // epoll_setup() failures land here too.
263 FLOW_LOG_WARNING
264 ("Posix_mq_handle [" << *this << "]: mq_open() or set_resource_permissions() error (if the latter, details "
265 "above and repeated below; otherwise error details only follow) while "
266 "constructing MQ handle to MQ at name [" << absolute_name() << "] in "
267 "create-only mode; max msg size [" << max_msg_sz << "] x [" << max_n_msg << "] msgs; "
268 "perms = [" << std::setfill('0') << std::setw(4) << std::oct << perms.get_permissions() << "].");
269 }
270 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
271
 // Report: out-arg if the caller supplied one; otherwise throw.
272 if (err_code)
273 {
274 *err_code = sys_err_code;
275 }
276 else
277 {
278 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
279 }
280 } // if (sys_err_code)
281 // else { Cool! }
282} // Posix_mq_handle::Posix_mq_handle(Mode_tag)
283
284Posix_mq_handle::Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
285 util::Create_only, size_t max_n_msg, size_t max_msg_sz,
286 const util::Permissions& perms, Error_code* err_code) :
287 Posix_mq_handle(util::CREATE_ONLY, logger_ptr, absolute_name_arg, max_n_msg, max_msg_sz, perms, err_code)
288{
289 // Cool.
290}
291
292Posix_mq_handle::Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
293 util::Open_or_create, size_t max_n_msg, size_t max_msg_sz,
294 const util::Permissions& perms, Error_code* err_code) :
295 Posix_mq_handle(util::OPEN_OR_CREATE, logger_ptr, absolute_name_arg, max_n_msg, max_msg_sz, perms, err_code)
296{
297 // Cool.
298}
299
// Open-only ctor: attaches to an MQ that must already exist at absolute_name_arg.
// Sequence: pipe_setup() -> mq_open(O_RDWR) -> set_non_blocking(true) -> epoll_setup(). On any failure the
// error is logged and then either stored in *err_code (if non-null) or thrown as flow::error::Runtime_error.
300Posix_mq_handle::Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
301 util::Open_only, Error_code* err_code) :
302 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
303 m_absolute_name(absolute_name_arg),
304 m_interrupting_snd(false),
305 m_interrupting_rcv(false),
306 m_interrupter_snd(m_nb_task_engine),
307 m_interrupt_detector_snd(m_nb_task_engine),
308 m_interrupter_rcv(m_nb_task_engine),
309 m_interrupt_detector_rcv(m_nb_task_engine)
310{
311 using flow::log::Sev;
312 using flow::error::Runtime_error;
313 using boost::system::system_category;
314 using ::mq_open;
315 // using ::O_RDWR; // A macro apparently.
316
317 FLOW_LOG_TRACE
318 ("Posix_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name [" << absolute_name() << "] in "
319 "open-only mode.");
320
321 // Get pipe stuff out of the way, as it does not need m_mq.
322 auto sys_err_code = pipe_setup();
323
324 if (!sys_err_code)
325 {
326 /* Note: O_NONBLOCK is something we are forced to set/unset depending on the transmission API being called.
327 * So just don't worry about it here. */
328 const auto raw = mq_open(shared_name_to_mq_name(absolute_name()).c_str(), O_RDWR);
329 if (raw != -1)
330 {
331 m_mq = Native_handle(raw);
332 }
333
334 if (m_mq.null())
335 {
 // NOTE(review): errno is read only after the log statement below has run; if logging performs any
 // call that sets errno, the mq_open() error code could be lost. Consider capturing errno first
 // (the Mode_tag ctor's helper does capture it immediately) -- confirm.
336 FLOW_LOG_WARNING
337 ("Posix_mq_handle [" << *this << "]: mq_open() error (error details follow) while "
338 "constructing MQ handle to MQ at name [" << absolute_name() << "] in open-only mode.");
339 sys_err_code = Error_code(errno, system_category());
340 }
341 else
342 {
343 // We never use blocking transmission -- always using *wait_*able() => interrupt_*() works. Non-blocking 4eva.
344 if (!set_non_blocking(true, &sys_err_code))
345 {
346 assert(sys_err_code);
347
348 // Clean up.
349
350 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; it's fine.
351 mq_close(m_mq.m_native_handle);
 // NOTE(review): this listing jumps from line 351 to 353; presumably the dropped line resets m_mq to a
 // null Native_handle (the closing-brace comment below says it "may have become null") -- confirm.
353 }
354 else
355 {
356 sys_err_code = epoll_setup(); // It logged on error; also then it cleaned up everything.
357 }
358 } // if (!m_mq.null()) (but it may have become null, and sys_err_code therefore truthy, inside)
359 } // if (!sys_err_code) (but it may have become truthy inside)
360
361 if (sys_err_code)
362 {
363 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
364
 // Report: out-arg if the caller supplied one; otherwise throw.
365 if (err_code)
366 {
367 *err_code = sys_err_code;
368 }
369 else
370 {
371 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
372 }
373 }
374 // else { Cool! }
375} // Posix_mq_handle::Posix_mq_handle(Open_only)
376
378{
379 using boost::asio::connect_pipe;
380
381 Error_code sys_err_code;
382 connect_pipe(m_interrupt_detector_snd, m_interrupter_snd, sys_err_code);
383 if (!sys_err_code)
384 {
385 connect_pipe(m_interrupt_detector_rcv, m_interrupter_rcv, sys_err_code);
386 }
387
388 if (sys_err_code)
389 {
390 FLOW_LOG_WARNING
391 ("Posix_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name [" << absolute_name() << "]: "
392 "connect-pipe failed. Details follow.");
393 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
394
395 // Clean up first guys (might be no-op).
396 Error_code sink;
397 m_interrupter_snd.close(sink);
398 m_interrupt_detector_snd.close(sink);
399 }
400
401 return sys_err_code;
402} // Posix_mq_handle::pipe_setup()
403
405{
 // Creates the two epoll sets (m_epoll_hndl_snd, then m_epoll_hndl_rcv) and registers in each the MQ descriptor
 // plus the matching interrupt-detector pipe end. Returns a falsy Error_code on success; on failure returns the
 // errno-based error after logging and cleaning up as described in the comments below.
406 using boost::system::system_category;
407 using ::epoll_create1;
408 using ::epoll_ctl;
409 using ::mq_close;
410 using ::close;
411 using Epoll_event = ::epoll_event;
412 // using ::EPOLL_CTL_ADD; // A macro apparently.
413 // using ::EPOLLIN; // A macro apparently.
414
415 /* m_mq is good to go.
416 *
417 * Set up one epoll handle for checking for writability; another for readability.
418 * Into each one add an interrupter FD (readability of read end of an anonymous pipe).
419 *
420 * There's some lame cleanup logic below specific to the 2-handle situation. It's okay. Just, if setting
421 * up the outgoing-direction epolliness fails, it cleans up itself. If that succeeds, but incoming-direction
422 * setup then fails, then after it cleans up after itself, we have to undo the outgoing-direction setup. */
423
424 Error_code sys_err_code;
425
 // Helper: sets up one direction's epoll set in *epoll_hndl_ptr; on failure sets sys_err_code and cleans up.
426 const auto setup = [&](Native_handle* epoll_hndl_ptr, bool snd_else_rcv)
427 {
428 auto& epoll_hndl = *epoll_hndl_ptr;
429 auto& interrupt_detector = snd_else_rcv ? m_interrupt_detector_snd : m_interrupt_detector_rcv;
430
431 epoll_hndl = Native_handle(epoll_create1(0));
432 if (epoll_hndl.m_native_handle == -1)
433 {
 // NOTE(review): errno is read only after the log statement below has run; if logging performs any
 // call that sets errno, the epoll_create1() error code could be lost. Consider capturing it first.
434 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Created MQ handle fine, but epoll_create1() failed; "
435 "details follow.");
436 sys_err_code = Error_code(errno, system_category());
437
438 // Clean up.
439
440 epoll_hndl = Native_handle(); // No-op as of this writing, but just to keep it maintainable do it anyway.
441
442 Error_code sink;
443 m_interrupt_detector_snd.close(sink);
444 m_interrupter_snd.close(sink);
445 m_interrupt_detector_rcv.close(sink);
446 m_interrupter_rcv.close(sink);
447
448 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; should be fine.
449 mq_close(m_mq.m_native_handle);
 // NOTE(review): this listing jumps from line 449 to 451; presumably the dropped line resets m_mq to null.
451 return;
452 }
453 // else if (epoll_hndl.m_native_handle != -1)
454
455 // Each epoll_wait() will listen for transmissibility of the queue itself + readability of interrupt-pipe.
456 Epoll_event event_of_interest1;
457 event_of_interest1.events = snd_else_rcv ? EPOLLOUT : EPOLLIN;
458 event_of_interest1.data.fd = m_mq.m_native_handle;
459 Epoll_event event_of_interest2;
460 event_of_interest2.events = EPOLLIN;
461 event_of_interest2.data.fd = interrupt_detector.native_handle();
462 if ((epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest1.data.fd, &event_of_interest1) == -1) ||
463 (epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest2.data.fd, &event_of_interest2) == -1))
464 {
 // NOTE(review): same errno-after-logging ordering concern as in the epoll_create1() failure path above.
465 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Created MQ handle fine, but an epoll_ctl() failed; "
466 "snd_else_rcv = [" << snd_else_rcv << "]; details follow.");
467 sys_err_code = Error_code(errno, system_category());
468
469 // Clean up everything.
 // NOTE(review): unlike the epoll_create1() failure path, the visible lines here do not close the four
 // interrupt-pipe ends -- confirm whether that is intended.
470 close(epoll_hndl.m_native_handle);
471 epoll_hndl = Native_handle();
472 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; should be fine.
473 mq_close(m_mq.m_native_handle);
 // NOTE(review): this listing jumps from line 473 to 475; presumably the dropped line resets m_mq to null.
475 return;
476 }
477 }; // const auto setup =
478
479 setup(&m_epoll_hndl_snd, true);
480 if (!sys_err_code)
481 {
482 setup(&m_epoll_hndl_rcv, false);
483 if (sys_err_code)
484 {
485 // Have to undo first setup(), except m_mq+pipes cleanup was already done by 2nd setup().
 // NOTE(review): this listing jumps from line 485 to 488; presumably the two dropped lines close and
 // null out m_epoll_hndl_snd -- confirm against the real source.
488 }
489 }
490 // else { 1st setup() cleaned everything up. }
491
492 return sys_err_code;
493} // Posix_mq_handle::epoll_setup()
494
497{
498 operator=(std::move(src));
499}
500
502{
 // Releases the OS resources held by this handle, if any: the MQ descriptor and the epoll set(s).
 // A null m_mq means there is nothing to close; in that case both epoll handles are asserted to be null too.
503 using ::mq_close;
504 using ::close;
505
506 if (m_mq.null())
507 {
508 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: No MQ handle to close in destructor.");
509 assert(m_epoll_hndl_snd.null());
510 assert(m_epoll_hndl_rcv.null());
511 }
512 else
513 {
514 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Closing MQ handle (and epoll set).");
515 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; we should be fine.
516 mq_close(m_mq.m_native_handle);
517
518 assert(!m_epoll_hndl_snd.null());
519 assert(!m_epoll_hndl_rcv.null());
520 close(m_epoll_hndl_snd.m_native_handle); // Ditto regarding errors.
 // NOTE(review): this listing jumps from line 520 to 522; presumably the dropped line closes
 // m_epoll_hndl_rcv the same way -- confirm against the real source.
522 }
523} // Posix_mq_handle::~Posix_mq_handle()
524
526{
 // Move assignment: unless self-assigning, exchanges the complete state of *this and src via swap();
 // always returns *this.
527 using std::swap;
528
529 if (&src != this)
530 {
 // NOTE(review): this listing jumps from line 530 to 539, so eight lines were dropped here; whatever
 // they do (or explain) before the swap is not visible -- consult the real source.
539
540 swap(*this, src);
541 }
542 return *this;
543}
544
546{
 // Exchanges the state of val1 and val2. Visible here: the Log_context base and m_mq are swapped directly,
 // while the four interrupt-pipe ends are exchanged at the raw-descriptor level -- released from their asio
 // wrappers, then re-wrapped around the *other* object's descriptors but on each object's own m_nb_task_engine.
 // Any exception from that pipe shuffle is treated as fatal (assert + std::abort()).
547 using util::Pipe_writer;
548 using util::Pipe_reader;
549 using flow::log::Log_context;
550 using std::swap;
551 using boost::array;
552
553 // This is a bit faster than un-specialized std::swap() which would require a move ction + 3 move assignments.
554
555 swap(static_cast<Log_context&>(val1), static_cast<Log_context&>(val2));
556 swap(val1.m_mq, val2.m_mq);
 // NOTE(review): this listing jumps from line 556 to 562; presumably the five dropped lines swap the
 // remaining plain members -- confirm against the real source.
562
563 /* This is annoying! Maybe we should just wrap all these in unique_ptr<>s to avoid this nonsense.
564 * As it is, it might be unsafe to swap asio objects apart from their Task_engines, so doing this blech. */
565 try
566 {
 // Slot layout for both arrays: [0] interrupter_snd, [1] interrupter_rcv, [2] detector_snd, [3] detector_rcv.
 // A slot stays default-constructed when the corresponding pipe end was not open.
567 array<Native_handle, 4> fds1;
568 array<Native_handle, 4> fds2;
569 const auto unload = [&](Posix_mq_handle& val, auto& fds)
570 {
571 if (val.m_interrupter_snd.is_open())
572 {
573 fds[0].m_native_handle = val.m_interrupter_snd.release();
574 }
575 if (val.m_interrupter_rcv.is_open())
576 {
577 fds[1].m_native_handle = val.m_interrupter_rcv.release();
578 }
579 if (val.m_interrupt_detector_snd.is_open())
580 {
581 fds[2].m_native_handle = val.m_interrupt_detector_snd.release();
582 }
583 if (val.m_interrupt_detector_rcv.is_open())
584 {
585 fds[3].m_native_handle = val.m_interrupt_detector_rcv.release();
586 }
587 };
588 unload(val1, fds1);
589 unload(val2, fds2);
590
591 const auto reload = [&](Posix_mq_handle& val, auto& fds)
592 {
593 // Leave their stupid task engines in-place. Do need to reassociate them with the swapped FDs though.
 // NOTE(review): listing lines 594, 597, 600 and 603 were dropped; each presumably names the assignment
 // target (val.m_interrupter_snd, val.m_interrupter_rcv, val.m_interrupt_detector_snd,
 // val.m_interrupt_detector_rcv, matching the slot order used by unload()) -- confirm.
595 = fds[0].null() ? Pipe_writer(val.m_nb_task_engine)
596 : Pipe_writer(val.m_nb_task_engine, fds[0].m_native_handle);
598 = fds[1].null() ? Pipe_writer(val.m_nb_task_engine)
599 : Pipe_writer(val.m_nb_task_engine, fds[1].m_native_handle);
601 = fds[2].null() ? Pipe_reader(val.m_nb_task_engine)
602 : Pipe_reader(val.m_nb_task_engine, fds[2].m_native_handle);
604 = fds[3].null() ? Pipe_reader(val.m_nb_task_engine)
605 : Pipe_reader(val.m_nb_task_engine, fds[3].m_native_handle);
606 };
607 reload(val1, fds2); // Swap 'em.
608 reload(val2, fds1);
609 }
610 catch (...) // .release() and ctor can throw, and they really shouldn't at all.
611 {
612 assert(false && "Horrible exception."); std::abort();
613 }
614} // swap()
615
617{
618 using ::mq_attr;
619 using ::mq_getattr;
620
621 assert((!m_mq.null())
622 && "As advertised: max_msg_size() => undefined behavior if not successfully cted or was moved-from.");
623
624 mq_attr attr;
625 if (mq_getattr(m_mq.m_native_handle, &attr) != 0)
626 {
627 /* We could handle error gracefully, as we do elsewhere, but EBADF is the only possibility, and there's zero
628 * reason it should happen unles m_mq is null at this point. It'd be fine to handle it, but then we'd have
629 * to emit an Error_code, and the API would change, and the user would have more to worry about; I (ygoldfel)
630 * decided it's overkill. So if it does happen, log and assert(). @todo Maybe reconsider. */
631 Error_code sink;
632 handle_mq_api_result(-1, &sink, "Posix_mq_handle::max_msg_size: mq_getattr()");
633 assert(false && "mq_getattr() failed (details logged); this is too bizarre.");
634 return 0;
635 }
636 // else
637
638 return size_t(attr.mq_msgsize);
639} // Posix_mq_handle::max_msg_size()
640
642{
643 // Very similar to max_msg_size(). @todo Maybe code reuse. Though line count might be even greater then so....
644
645 using ::mq_attr;
646 using ::mq_getattr;
647 assert((!m_mq.null())
648 && "As advertised: max_n_msgs() => undefined behavior if not successfully cted or was moved-from.");
649 mq_attr attr;
650 if (mq_getattr(m_mq.m_native_handle, &attr) != 0)
651 {
652 Error_code sink;
653 handle_mq_api_result(-1, &sink, "Posix_mq_handle::max_msg_size: mq_getattr()");
654 assert(false && "mq_getattr() failed (details logged); this is too bizarre.");
655 return 0;
656 }
657 return size_t(attr.mq_maxmsg);
658} // Posix_mq_handle::max_msg_size()
659
661{
662 using boost::system::system_category;
663 using ::mq_attr;
664 using ::mq_setattr;
665 // using ::O_NONBLOCK; // A macro apparently.
666
667 assert(err_code);
668
669 /* By the way -- no, this cannot be done in mq_attr that goes into mq_open(). It is ignored there according to docs.
670 * Would've been nice actually.... */
671
672 mq_attr attr;
673 attr.mq_flags = nb ? O_NONBLOCK : 0;
674 return handle_mq_api_result(mq_setattr(m_mq.m_native_handle, &attr, 0),
675 err_code, "Posix_mq_handle::set_non_blocking(): mq_setattr()");
676} // Posix_mq_handle::set_non_blocking()
677
679{
680 using flow::util::buffers_dump_string;
681 using ::mq_send;
682 // using ::EAGAIN; // A macro apparently.
683
684 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Posix_mq_handle::try_send, flow::util::bind_ns::cref(blob), _1);
685 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
686
687 assert((!m_mq.null())
688 && "As advertised: try_send() => undefined behavior if not successfully cted or was moved-from.");
689
690 auto blob_data = blob.data();
691 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
692 "size [" << blob.size() << "].");
693 if (blob.size() == 0)
694 {
695 /* ::mq_send(..., nullptr, N), even when N == 0, is (1) empirically speaking harmless but (2) technically
696 * in violation of arg 2's non-null decl (even though `N == 0` is explicitly allowed per `man` page) hence
697 * (3) causes a clang UBSAN sanitizer error. So waste a couple cycles by feeding it this dummy
698 * non-null value. */
699 blob_data = static_cast<const void*>(&blob_data);
700 }
701 else // if (blob.size() != 0)
702 {
703 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
704 }
705
706 if (mq_send(m_mq.m_native_handle,
707 static_cast<const char*>(blob_data),
708 blob.size(), 0) == 0)
709 {
710 err_code->clear();
711 return true; // Instant success.
712 }
713 // else
714 if (errno == EAGAIN)
715 {
716 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
717 "size [" << blob.size() << "]: would-block.");
718 err_code->clear();
719 return false; // Queue full.
720 }
721 // else
722
723 handle_mq_api_result(-1, err_code, "Posix_mq_handle::try_send(): mq_send(nb)");
724 assert(*err_code);
725 return false;
726} // Posix_mq_handle::try_send()
727
729{
730 using flow::util::buffers_dump_string;
731 using ::mq_send;
732 // using ::EAGAIN; // A macro apparently.
733
734 if (flow::error::exec_void_and_throw_on_error
735 ([&](Error_code* actual_err_code) { send(blob, actual_err_code); },
736 err_code, "Posix_mq_handle::send()"))
737 {
738 return;
739 }
740 // else
741
742 assert((!m_mq.null())
743 && "As advertised: send() => undefined behavior if not successfully cted or was moved-from.");
744
745 auto blob_data = blob.data();
746 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-push of blob @[" << blob_data << "], "
747 "size [" << blob.size() << "]. Trying nb-push first; if it succeeds -- great. "
748 "Else will wait/retry/wait/retry/....");
749 if (blob.size() == 0)
750 {
751 // See similarly-placed comment in try_send() which explains this.
752 blob_data = static_cast<const void*>(&blob_data);
753 }
754 else // if (blob.size() != 0)
755 {
756 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
757 }
758
759 /* We could just invoke blocking mq_send(m_mq), but to get the promised logging we go a tiny bit fancier as follows.
760 * Update: Now that we have to be interrupt_*()ible, also reuse wait_*() instead of using native
761 * blocking mq_*(). */
762
763 while (true)
764 {
765 if (mq_send(m_mq.m_native_handle,
766 static_cast<const char*>(blob_data),
767 blob.size(), 0) == 0)
768 {
769 err_code->clear();
770 return; // Instant success.
771 }
772 // else
773 if (errno != EAGAIN)
774 {
775 handle_mq_api_result(-1, err_code, "Posix_mq_handle::send(): mq_send(nb)");
776 assert(*err_code);
777 return;
778 }
779 // else if (would-block): as promised, INFO logs.
780
781 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
782 "size [" << blob.size() << "]: would-block. Executing blocking-wait.");
783
784 wait_sendable(err_code);
785 if (*err_code)
786 {
787 // Whether interrupted or true error, we're done.
788 return;
789 }
790 // else
791
792 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
793 } // while (true)
794} // Posix_mq_handle::send()
795
797 Error_code* err_code)
798{
799 using flow::util::time_since_posix_epoch;
800 using flow::util::buffers_dump_string;
801 using flow::Fine_clock;
802 using boost::chrono::floor;
803 using boost::chrono::round;
804 using boost::chrono::seconds;
805 using boost::chrono::microseconds;
806 using boost::chrono::nanoseconds;
807 using ::mq_send;
808 // using ::EAGAIN; // A macro apparently.
809
810 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Posix_mq_handle::timed_send,
811 flow::util::bind_ns::cref(blob), timeout_from_now, _1);
812 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
813
814 // Similar to a combo of (blocking) send() and (non-blocking) try_send() -- keeping comments light where redundant.
815
816 assert((!m_mq.null())
817 && "As advertised: timed_send() => undefined behavior if not successfully cted or was moved-from.");
818
819 auto blob_data = blob.data();
820 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-timed-push of blob @[" << blob_data << "], "
821 "size [" << blob.size() << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]. "
822 "Trying nb-push first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
823 if (blob.size() == 0)
824 {
825 // See similarly-placed comment in try_send() which explains this.
826 blob_data = static_cast<const void*>(&blob_data);
827 }
828 else // if (blob.size() != 0)
829 {
830 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
831 }
832
833 auto now = Fine_clock::now();
834 auto after = now;
835
836 while (true)
837 {
838 if (mq_send(m_mq.m_native_handle,
839 static_cast<const char*>(blob_data),
840 blob.size(), 0) == 0)
841 {
842 err_code->clear();
843 break; // Instant success.
844 }
845 // else
846 if (errno != EAGAIN)
847 {
848 handle_mq_api_result(-1, err_code, "Posix_mq_handle::send(): mq_send(nb)");
849 assert(*err_code);
850 return false;
851 }
852 // else if (would-block): as promised, INFO logs.
853
854 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
855 "size [" << blob.size() << "]: would-block. Executing blocking-wait.");
856
857 timeout_from_now -= (after - now); // No-op the first time; after that reduces time left.
858 const bool ready = timed_wait_sendable(timeout_from_now, err_code);
859 if (*err_code)
860 {
861 // Whether interrupted or true error, we're done.
862 return false;
863 }
864 // else:
865
866 if (!ready) // I.e., if (timed out).
867 {
868 FLOW_LOG_TRACE("Did not finish before timeout.");
869 return false;
870 }
871 // else: successful wait for transmissibility. Try nb-transmitting again.
872 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
873
874 after = Fine_clock::now();
875 assert((after >= now) && "Fine_clock is supposed to never go backwards.");
876 } // while (true)
877
878 return true;
879} // Posix_mq_handle::timed_send()
880
882{
883 using util::Blob_mutable;
884 using flow::util::buffers_dump_string;
885 using ::mq_receive;
886 // using ::EAGAIN; // A macro apparently.
887
888 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Posix_mq_handle::try_receive, blob, _1);
889 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
890
891 assert((!m_mq.null())
892 && "As advertised: try_receive() => undefined behavior if not successfully cted or was moved-from.");
893
894 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
895 "max-size [" << blob->size() << "].");
896
897 ssize_t n_rcvd;
898 unsigned int pri_ignored;
899 if ((n_rcvd = mq_receive(m_mq.m_native_handle,
900 static_cast<char*>(blob->data()),
901 blob->size(), &pri_ignored)) >= 0)
902 {
903 *blob = Blob_mutable(blob->data(), n_rcvd);
904 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
905 if (blob->size() != 0)
906 {
907 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
908 }
909 err_code->clear();
910 return true; // Instant success.
911 }
912 // else
913 if (errno == EAGAIN)
914 {
915 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
916 "max-size [" << blob->size() << "]: would-block.");
917 err_code->clear();
918 return false; // Queue full.
919 }
920 // else
921
922 handle_mq_api_result(-1, err_code, "Posix_mq_handle::try_receive(): mq_receive(nb)");
923 assert(*err_code);
924 return false;
925} // Posix_mq_handle::try_receive()
926
928{
929 using util::Blob_mutable;
930 using flow::util::buffers_dump_string;
931 using ::mq_receive;
932 // using ::EAGAIN; // A macro apparently.
933
934 if (flow::error::exec_void_and_throw_on_error
935 ([&](Error_code* actual_err_code) { receive(blob, actual_err_code); },
936 err_code, "Posix_mq_handle::receive()"))
937 {
938 return;
939 }
940 // else
941
942 assert((!m_mq.null())
943 && "As advertised: receive() => undefined behavior if not successfully cted or was moved-from.");
944
945 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-pop to blob @[" << blob->data() << "], "
946 "max-size [" << blob->size() << "]. Trying nb-pop first; if it succeeds -- great. "
947 "Else will wait/retry/wait/retry/....");
948
949 /* We could just invoke blocking mq_receive(m_mq), but to get the promised logging we go tiny bit fancier as follows.
950 * Update: Now that we have to be interrupt_*()ible, also reuse wait_*() instead of using native
951 * blocking mq_*(). */
952
953 ssize_t n_rcvd;
954 unsigned int pri_ignored;
955
956 while (true)
957 {
958 if ((n_rcvd = mq_receive(m_mq.m_native_handle,
959 static_cast<char*>(blob->data()),
960 blob->size(), &pri_ignored)) >= 0)
961 {
962 *blob = Blob_mutable(blob->data(), n_rcvd);
963 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
964 if (blob->size() != 0)
965 {
966 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
967 }
968 err_code->clear();
969 return; // Instant success.
970 }
971 // else
972 if (errno != EAGAIN)
973 {
974 handle_mq_api_result(-1, err_code, "Posix_mq_handle::receive(): mq_receive(nb)");
975 assert(*err_code);
976 return;
977 }
978 // else if (would-block): as promised, INFO logs.
979
980 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
981 "max-size [" << blob->size() << "]: would-block. Executing blocking-wait.");
982
983 wait_receivable(err_code);
984 if (*err_code)
985 {
986 // Whether interrupted or true error, we're done.
987 return;
988 }
989 // else
990
991 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
992 } // while (true)
993} // Posix_mq_handle::receive()
994
996 Error_code* err_code)
997{
998 using util::Blob_mutable;
999 using flow::Fine_clock;
1000 using flow::util::time_since_posix_epoch;
1001 using flow::util::buffers_dump_string;
1002 using boost::chrono::floor;
1003 using boost::chrono::round;
1004 using boost::chrono::seconds;
1005 using boost::chrono::microseconds;
1006 using boost::chrono::nanoseconds;
1007 using ::mq_receive;
1008 // using ::EAGAIN; // A macro apparently.
1009
1010 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Posix_mq_handle::timed_receive,
1011 blob, timeout_from_now, _1);
1012 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
1013
1014 // Similar to a combo of (blocking) receive() and (non-blocking) try_receive() -- keeping comments light if redundant.
1015
1016 assert((!m_mq.null())
1017 && "As advertised: timed_receive() => undefined behavior if not successfully cted or was moved-from.");
1018
1019 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-timed-pop to blob @[" << blob->data() << "], "
1020 "max-size [" << blob->size() << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]. "
1021 "Trying nb-pop first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
1022
1023 ssize_t n_rcvd;
1024 unsigned int pri_ignored;
1025
1026 auto now = Fine_clock::now();
1027 auto after = now;
1028
1029 while (true)
1030 {
1031 if ((n_rcvd = mq_receive(m_mq.m_native_handle,
1032 static_cast<char*>(blob->data()),
1033 blob->size(), &pri_ignored)) >= 0)
1034 {
1035 *blob = Blob_mutable(blob->data(), n_rcvd);
1036 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
1037 if (blob->size() != 0)
1038 {
1039 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
1040 }
1041 err_code->clear();
1042 break; // Instant success.
1043 }
1044 // else
1045 if (errno != EAGAIN)
1046 {
1047 handle_mq_api_result(-1, err_code, "Posix_mq_handle::timed_receive(): mq_receive(nb)");
1048 assert(*err_code);
1049 return false;
1050 }
1051 // else if (would-block): as promised, INFO logs.
1052
1053 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
1054 "max-size [" << blob->size() << "]: would-block. Executing blocking-wait.");
1055
1056 timeout_from_now -= (after - now); // No-op the first time; after that reduces time left.
1057 const bool ready = timed_wait_receivable(timeout_from_now, err_code);
1058 if (*err_code)
1059 {
1060 // Whether interrupted or true error, we're done.
1061 return false;
1062 }
1063 // else:
1064
1065 if (!ready) // I.e., if (timed out).
1066 {
1067 FLOW_LOG_TRACE("Did not finish before timeout.");
1068 return false;
1069 }
1070 // else: successful wait for transmissibility. Try nb-transmitting again.
1071 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
1072
1073 after = Fine_clock::now();
1074 assert((after >= now) && "Fine_clock is supposed to never go backwards.");
1075 } // while (true)
1076
1077 return true;
1078} // Posix_mq_handle::timed_receive()
1079
1080template<bool SND_ELSE_RCV>
1082{
1083 using util::Blob_const;
1084
1085 assert((!m_mq.null())
1086 && "As advertised: interrupt_impl() => undefined behavior if not successfully cted or was moved-from.");
1087
1088 Pipe_writer* interrupter_ptr;
1089 bool* interrupting_ptr;
1090 if constexpr(SND_ELSE_RCV)
1091 {
1092 interrupter_ptr = &m_interrupter_snd; interrupting_ptr = &m_interrupting_snd;
1093 }
1094 else
1095 {
1096 interrupter_ptr = &m_interrupter_rcv; interrupting_ptr = &m_interrupting_rcv;
1097 }
1098 auto& interrupter = *interrupter_ptr;
1099 auto& interrupting = *interrupting_ptr;
1100
1101 if (interrupting)
1102 {
1103 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Interrupt mode already ON for "
1104 "snd_else_rcv [" << SND_ELSE_RCV << "]. Ignoring.");
1105 return false;
1106 }
1107 // else
1108
1109 interrupting = true; // Reminder: This member is for the above guard *only*. wait_impl() relies on FD state.
1110 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Interrupt mode turning ON for "
1111 "snd_else_rcv [" << SND_ELSE_RCV << "].");
1112
1113 util::pipe_produce(get_logger(), &interrupter);
1114
1115 // Now any level-triggered poll-wait will detect that this mode is on.
1116 return true;
1117} // Posix_mq_handle::interrupt_impl()
1118
1119template<bool SND_ELSE_RCV>
1121{
1122 using util::Blob_mutable;
1123
1124 assert((!m_mq.null())
1125 && "As advertised: allow_impl() => undefined behavior if not successfully cted or was moved-from.");
1126
1127 // Inverse of interrupt_impl(). Keeping comments light.
1128
1129 Pipe_reader* interrupt_detector_ptr;
1130 bool* interrupting_ptr;
1131 if constexpr(SND_ELSE_RCV)
1132 {
1133 interrupt_detector_ptr = &m_interrupt_detector_snd; interrupting_ptr = &m_interrupting_snd;
1134 }
1135 else
1136 {
1137 interrupt_detector_ptr = &m_interrupt_detector_rcv; interrupting_ptr = &m_interrupting_rcv;
1138 }
1139 auto& interrupt_detector = *interrupt_detector_ptr;
1140 auto& interrupting = *interrupting_ptr;
1141
1142 if (!interrupting)
1143 {
1144 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Interrupt mode already OFF for "
1145 "snd_else_rcv [" << SND_ELSE_RCV << "]. Ignoring.");
1146 return false;
1147 }
1148 // else
1149
1150 interrupting = false;
1151 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Interrupt mode turning OFF for "
1152 "snd_else_rcv [" << SND_ELSE_RCV << "].");
1153
1154 util::pipe_consume(get_logger(), &interrupt_detector);
1155
1156 // Now any level-triggered poll-wait will detect that this mode is off.
1157 return true;
1158} // Posix_mq_handle::allow_impl()
1159
1161{
1162 return interrupt_impl<true>();
1163}
1164
1166{
1167 return allow_impl<true>();
1168}
1169
1171{
1172 return interrupt_impl<false>();
1173}
1174
1176{
1177 return allow_impl<false>();
1178}
1179
1180bool Posix_mq_handle::wait_impl(util::Fine_duration timeout_from_now_or_none, bool snd_else_rcv, Error_code* err_code)
1181{
1182 using util::Fine_time_pt;
1183 using util::Fine_duration;
1184 using flow::util::time_since_posix_epoch;
1185 using boost::chrono::round;
1186 using boost::chrono::milliseconds;
1187 using boost::system::system_category;
1188 using boost::array;
1189 using ::epoll_wait;
1190 using Epoll_event = ::epoll_event;
1191
1192 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Posix_mq_handle::wait_impl, timeout_from_now_or_none, snd_else_rcv, _1);
1193 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
1194
1195 assert((!m_mq.null())
1196 && "As advertised: wait_impl() => undefined behavior if not successfully cted or was moved-from.");
1197
1198 /* By the way -- epoll_wait() takes # of milliseconds, so round up to that and no need to do crazy stuff like
1199 * timed_send() et al; just convert to milliseconds. */
1200 int epoll_timeout_from_now_ms;
1201 milliseconds epoll_timeout_from_now;
1202 if (timeout_from_now_or_none == Fine_duration::max())
1203 {
1204 epoll_timeout_from_now_ms = -1;
1205 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Infinite-await-unstarved for "
1206 "snd_else_rcv [" << snd_else_rcv << "]. Will perform an epoll_wait().");
1207 }
1208 else
1209 {
1210 epoll_timeout_from_now = round<milliseconds>(timeout_from_now_or_none);
1211 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-await/poll-unstarved for "
1212 "snd_else_rcv [" << snd_else_rcv << "]; timeout ~[" << epoll_timeout_from_now << "] -- "
1213 "if 0 then poll. Will perform an epoll_wait().");
1214 epoll_timeout_from_now_ms = int(epoll_timeout_from_now.count());
1215 }
1216
1217 array<Epoll_event, 2> evs; // Only one possible event (we choose 1 of 2 event sets).
1218 const auto epoll_result
1219 = epoll_wait((snd_else_rcv ? m_epoll_hndl_snd : m_epoll_hndl_rcv)
1220 .m_native_handle,
1221 evs.begin(), 1, epoll_timeout_from_now_ms);
1222 if (epoll_result == -1)
1223 {
1224 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: epoll_wait() yielded error. Details follow.");
1225
1226 const auto& sys_err_code = *err_code = Error_code(errno, system_category());
1227 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1228 return false;
1229 }
1230 // else
1231
1232 assert(epoll_result <= 2);
1233 if ((epoll_result == 2) // Max of 2 events; if interrupted then as promised disregard it being also transmissible.
1234 ||
1235 ((epoll_result == 1) // 1 event: need to check whether it's the interruptor as opposed to the actual queue.
1236 && (evs[0].data.fd != m_mq.m_native_handle)))
1237 {
1238 if (timeout_from_now_or_none == Fine_duration::max())
1239 {
1240 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Infinite-await-unstarved for "
1241 "snd_else_rcv [" << snd_else_rcv << "]: interrupted.");
1242 }
1243 else
1244 {
1245 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Blocking-await/poll-unstarved for "
1246 "snd_else_rcv [" << snd_else_rcv << "]; timeout ~[" << epoll_timeout_from_now << "] -- "
1247 "if 0 then poll: interrupted.");
1248 }
1249 *err_code = error::Code::S_INTERRUPTED;
1250 return false;
1251 }
1252 // else if (epoll_result <= 1): Not interrupted.
1253
1254 const bool success = epoll_result == 1;
1255 if (timeout_from_now_or_none == Fine_duration::max())
1256 {
1257 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Infinite-await-unstarved for "
1258 "snd_else_rcv [" << snd_else_rcv << "]: succeeded? = [" << success << "].");
1259 }
1260 else
1261 {
1262 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-await/poll-unstarved for "
1263 "snd_else_rcv [" << snd_else_rcv << "]; timeout ~[" << epoll_timeout_from_now << "] -- "
1264 "if 0 then poll: succeeded? = [" << success << "].");
1265 }
1266
1267 err_code->clear();
1268 return success;
1269} // Posix_mq_handle::wait_impl()
1270
1272{
1273 return wait_impl(util::Fine_duration::zero(), true, err_code);
1274}
1275
1277{
1278 wait_impl(util::Fine_duration::max(), true, err_code);
1279}
1280
1282{
1283 return wait_impl(timeout_from_now, true, err_code);
1284}
1285
1287{
1288 return wait_impl(util::Fine_duration::zero(), false, err_code);
1289}
1290
1292{
1293 wait_impl(util::Fine_duration::max(), false, err_code);
1294}
1295
1297{
1298 return wait_impl(timeout_from_now, false, err_code);
1299}
1300
1302{
1303 return m_mq;
1304}
1305
1306void Posix_mq_handle::remove_persistent(flow::log::Logger* logger_ptr, // Static.
1307 const Shared_name& absolute_name, Error_code* err_code)
1308{
1309 using boost::system::system_category;
1310 using ::mq_unlink;
1311
1312 if (flow::error::exec_void_and_throw_on_error
1313 ([&](Error_code* actual_err_code)
1314 { remove_persistent(logger_ptr, absolute_name, actual_err_code); },
1315 err_code, "Posix_mq_handle::remove_persistent()"))
1316 {
1317 return;
1318 }
1319 // else
1320
1321 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
1322
1323 FLOW_LOG_INFO("Posix_mq @ Shared_name[" << absolute_name << "]: Removing persistent MQ if possible.");
1324 if (mq_unlink(shared_name_to_mq_name(absolute_name).c_str()) == 0)
1325 {
1326 err_code->clear();
1327 return;
1328 }
1329 // else
1330
1331 FLOW_LOG_WARNING("Posix_mq @ Shared_name[" << absolute_name << "]: While removing persistent MQ:"
1332 "mq_unlink() yielded error. Details follow.");
1333 const auto& sys_err_code = *err_code = Error_code(errno, system_category());
1334 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1335} // Posix_mq_handle::remove_persistent()
1336
1338{
1339 using boost::system::system_category;
1340
1341 if (result == 0)
1342 {
1343 err_code->clear();
1344 return true;
1345 }
1346 // else
1347
1348 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: mq_*() yielded error; context = [" << context << "]. "
1349 "Details follow.");
1350 const auto& sys_err_code = *err_code
1351 = (errno == EMSGSIZE)
1352 ? error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW // By contract must emit this specific code for this.
1353 : Error_code(errno, system_category()); // Otherwise whatever it was.
1354 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1355
1356 return false;
1357} // Posix_mq_handle::handle_mq_api_result()
1358
1360{
1361 return m_absolute_name;
1362}
1363
1364std::ostream& operator<<(std::ostream& os, const Posix_mq_handle& val)
1365{
1366 os << '@' << &val << ": sh_name[" << val.absolute_name() << "] native_handle[";
1367 const auto native_handle = val.native_handle();
1368 if (native_handle.null())
1369 {
1370 return os << "null]";
1371 }
1372 // else
1373 return os << native_handle << ']';
1374}
1375
1376namespace
1377{
1378
1379std::string shared_name_to_mq_name(const Shared_name& name)
1380{
1381 // Pre-pend slash. See `man mq_overview`.
1382 std::string mq_name("/");
1383 return mq_name += name.str();
1384}
1385
1386} // namespace (anon)
1387
1388} // namespace ipc::transport
Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see man mq_overv...
bool allow_sends()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-sends an...
bool interrupt_receives()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-receives ...
bool interrupt_sends()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-sends and...
util::Pipe_reader Pipe_reader
Short-hand for anonymous pipe read end.
void wait_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like receive() but without the actual popping of a message.
static const Shared_name S_RESOURCE_TYPE_ID
Implements concept API.
bool handle_mq_api_result(int result, Error_code *err_code, util::String_view context) const
Helper that handles the result of an mq_*() call by logging WARNING(s) on error; setting *err_code on...
Pipe_reader m_interrupt_detector_snd
A byte is read from this end by allow_sends() to make it not-readable for the poll-wait in wait_impl(...
Native_handle m_epoll_hndl_snd
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool is_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_receive() but without the actual popping of a message.
bool allow_receives()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-receives...
bool is_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_send() but without the actual pushing of a message.
size_t max_msg_size() const
Implements Persistent_mq_handle API: Returns the max message size of the underlying queue.
Pipe_reader m_interrupt_detector_rcv
Other-direction counterpart to m_interrupt_detector_snd.
Error_code epoll_setup()
Ctor helper that sets up m_epoll_hndl_snd and m_epoll_hndl_rcv.
util::Pipe_writer Pipe_writer
Short-hand for anonymous pipe write end.
size_t max_n_msgs() const
Implements Persistent_mq_handle API: Returns the max message count of the underlying queue.
bool try_send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking send: pushes copy of message to queue and returns t...
Pipe_writer m_interrupter_rcv
Other-direction counterpart to m_interrupter_snd.
Native_handle native_handle() const
Implements Persistent_mq_handle API: Returns the stored native MQ handle; null if not open.
bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_receive() but without the actual popping of a message...
bool wait_impl(util::Fine_duration timeout_from_now_or_none, bool snd_else_rcv, Error_code *err_code)
Impl body for *_sendable() and *_receivable().
friend void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &name, Error_code *err_code=0)
Implements Persistent_mq_handle API: Removes the named persistent MQ.
void receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking receive: pops copy of message from queue into buffer; i...
void send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking send: pushes copy of message to queue; if queue is full...
Posix_mq_handle()
Implements Persistent_mq_handle API: Construct null handle.
bool m_interrupting_snd
Starting at false, this is made true via interrupt_sends(), and back by allow_sends(); acts as a guar...
const Shared_name & absolute_name() const
Implements Persistent_mq_handle API: Returns name equal to absolute_name passed to ctor.
Native_handle m_epoll_hndl_rcv
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool allow_impl()
Impl body for allow_*().
bool interrupt_impl()
Impl body for interrupt_*().
bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_send() but without the actual pushing of a message.
bool timed_receive(util::Blob_mutable *blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed receive: pops copy of message from queue into buf...
~Posix_mq_handle()
Implements Persistent_mq_handle API: Destroys this handle (or no-op if no handle was successfully con...
bool set_non_blocking(bool nb, Error_code *err_code)
Sets m_mq to blocking or non-blocking and returns true on success and clears *err_code; otherwise ret...
Shared_name m_absolute_name
See absolute_name().
bool timed_send(const util::Blob_const &blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed send: pushes copy of message to queue; if queue i...
bool try_receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking receive: pops copy of message from queue into buffe...
Error_code pipe_setup()
Ctor helper that sets up m_interrupt* pipe items.
void wait_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like send() but without the actual pushing of a message.
bool m_interrupting_rcv
Other-direction counterpart to m_interrupting_snd.
Pipe_writer m_interrupter_snd
A byte is written to this end by interrupt_sends() to make it readable for the poll-wait in wait_impl...
Native_handle m_mq
Underlying MQ handle.
flow::util::Task_engine m_nb_task_engine
Never used for .run() or .async() – just so we can construct Pipe_reader, Pipe_writer.
Posix_mq_handle & operator=(Posix_mq_handle &&src)
Implements Persistent_mq_handle API: Replaces handle with the source handle while making the latter i...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
void clear()
Makes it so empty() == true.
const std::string & str() const
Returns (sans copying) ref to immutable entire wrapped name string, suitable to pass into sys calls w...
@ S_INTERRUPTED
A blocking operation was intentionally interrupted or preemptively canceled.
@ S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW
Low-level message queue send-op buffer overflow (> max size) or receive-op buffer underflow (< max si...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
void swap(Bipc_mq_handle &val1, Bipc_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
bipc::permissions Permissions
Short-hand for Unix (POSIX) permissions class.
Definition: util_fwd.hpp:155
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
Definition: util.cpp:156
bipc::open_only_t Open_only
Tag type indicating an ideally-atomic open-if-exists-else-fail operation.
Definition: util_fwd.hpp:149
void pipe_produce(flow::log::Logger *logger_ptr, Pipe_writer *pipe)
Writes a byte to the given pipe writer.
Definition: util.cpp:67
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
Definition: util.cpp:30
boost::asio::mutable_buffer Blob_mutable
Short-hand for an mutable blob somewhere in memory, stored as exactly a void* and a size_t.
Definition: util_fwd.hpp:134
void pipe_consume(flow::log::Logger *logger_ptr, Pipe_reader *pipe)
Reads a byte via the given pipe reader.
Definition: util.cpp:96
boost::asio::writable_pipe Pipe_writer
Short-hand for anonymous pipe write end.
Definition: util_fwd.hpp:32
bipc::open_or_create_t Open_or_create
Tag type indicating an atomic open-if-exists-else-create operation.
Definition: util_fwd.hpp:146
bipc::create_only_t Create_only
Tag type indicating a create-unless-exists-else-fail operation.
Definition: util_fwd.hpp:152
flow::Fine_time_pt Fine_time_pt
Short-hand for Flow's Fine_time_pt.
Definition: util_fwd.hpp:113
boost::asio::readable_pipe Pipe_reader
Short-hand for anonymous pipe read end.
Definition: util_fwd.hpp:35
void set_resource_permissions(flow::log::Logger *logger_ptr, const fs::path &path, const Permissions &perms, Error_code *err_code)
Utility that sets the permissions of the given resource (at the supplied file system path) to specifi...
Definition: util.cpp:46
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:111
const Create_only CREATE_ONLY
Tag value indicating an atomic create-unless-exists-else-fail operation.
Definition: util.cpp:32
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:128
flow::util::String_view String_view
Short-hand for Flow's String_view.
Definition: util_fwd.hpp:109
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
Definition: common.hpp:322
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:297
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.
handle_t m_native_handle
The native handle (possibly equal to S_NULL_HANDLE), the exact payload of this Native_handle.