Flow-IPC 2.0.0
Flow-IPC project: Full implementation reference.
posix_mq_handle.cpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
22#include <flow/error/error.hpp>
23#include <boost/chrono/floor.hpp>
24#include <boost/array.hpp>
25
26namespace ipc::transport
27{
28
29namespace
30{
31
32/**
33 * File-local helper: takes Shared_name; returns name suitable for `mq_open()` and buddies.
34 * @param name
35 * Shared_name.
36 * @return See above.
37 */
38std::string shared_name_to_mq_name(const Shared_name& name);
39
40}; // namespace (anon)
41
42// Initializers.
43
45
46// Implementations.
47
49 m_interrupting_snd(false), // (Not algorithmically needed as of this writing, but avoid some sanitizer complaints.)
50 m_interrupting_rcv(false), // (Ditto. E.g., swap() through clang's UBSAN => complains it's uninitialized.)
51 m_interrupter_snd(m_nb_task_engine),
52 m_interrupt_detector_snd(m_nb_task_engine),
53 m_interrupter_rcv(m_nb_task_engine),
54 m_interrupt_detector_rcv(m_nb_task_engine)
55{
56 // Done.
57}
58
59template<typename Mode_tag>
60Posix_mq_handle::Posix_mq_handle(Mode_tag, flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
61 size_t max_n_msg, size_t max_msg_sz,
62 const util::Permissions& perms, Error_code* err_code) :
63 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
64 m_absolute_name(absolute_name_arg),
65 m_interrupting_snd(false),
66 m_interrupting_rcv(false),
67 m_interrupter_snd(m_nb_task_engine),
68 m_interrupt_detector_snd(m_nb_task_engine),
69 m_interrupter_rcv(m_nb_task_engine),
70 m_interrupt_detector_rcv(m_nb_task_engine)
71{
73 using flow::error::Runtime_error;
74 using flow::log::Sev;
75 using boost::io::ios_all_saver;
76 using boost::system::system_category;
77 using ::mq_open;
78 using ::mq_close;
79 using ::mq_attr;
80 // using ::O_RDWR; // A macro apparently.
81 // using ::O_CREAT; // A macro apparently.
82 // using ::O_EXCL; // A macro apparently.
83
84 assert(max_n_msg >= 1);
85 assert(max_msg_sz >= 1);
86
87 static_assert(std::is_same_v<Mode_tag, util::Create_only> || std::is_same_v<Mode_tag, util::Open_or_create>,
88 "Can only delegate to this ctor with Mode_tag = Create_only or Open_or_create.");
89 constexpr bool CREATE_ONLY_ELSE_MAYBE = std::is_same_v<Mode_tag, util::Create_only>;
90 constexpr char const * MODE_STR = CREATE_ONLY_ELSE_MAYBE ? "create-only" : "open-or-create";
91
92 if (logger_ptr && logger_ptr->should_log(Sev::S_TRACE, get_log_component()))
93 {
94 ios_all_saver saver{*(logger_ptr->this_thread_ostream())}; // Revert std::oct/etc. soon.
95
96 FLOW_LOG_TRACE_WITHOUT_CHECKING
97 ("Posix_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name [" << absolute_name() << "] in "
98 "[" << MODE_STR << "] mode; max msg size [" << max_msg_sz << "] x [" << max_n_msg << "] msgs; "
99 "perms = [" << std::setfill('0') << std::setw(4) << std::oct << perms.get_permissions() << "].");
100 }
101
102 // Get pipe stuff out of the way, as it does not need m_mq.
103 auto sys_err_code = pipe_setup();
104
105 if (!sys_err_code)
106 {
107 /* Naively: Use mq_open() to both create-or-open/exclusively-create and to set mode `perms`.
108 * In reality though there are two subtleties that make it tougher than that. Fortunately this has been
109 * adjudicated out there, and internal boost.interprocess code very nicely confirms that adjudication/sets
110 * a sufficiently-authoritative precedent.
111 *
112 * Subtlety 1 is that mq_open(), like all POSIX/Linux ::open()-y calls, doesn't listen to `perms` verbatim
113 * but rather makes it subject to the process umask. The proper way (citation omitted) to deal with it is
114 * to follow it up with ::[f]chmod() or equivalent, with the resource descriptor if available (or path if not).
115 * (We do have the descriptor -- m_mq -- and we have a util::set_resource_permissions() overload for this.)
116 *
117 * Subtlety 2 is that we should only do that last thing if, indeed, we are *creating* the resource (MQ);
118 * if CREATE_ONLY_ELSE_MAYBE==false, and it already exists, then we mustn't do any such thing; we've succeeded.
119 * Sadly there's no (atomic/direct) way to find out whether mq_open() (in ==false case) did in fact create
120 * the thing. Therefore, we have to do things in a kludgy way -- that is, nevertheless, reasonable in practice
121 * and indeed is validated by bipc's use of it internally (search headers for `case ipcdetail::DoOpenOrCreate:`
122 * followed by shm_open(), fchmod(), etc.). We explain it here for peace of mind.
123 *
124 * In the CREATE_ONLY_ELSE_MAYBE=true case, we can just do what we'd do anyway -- shm_open() in create-only
125 * mode; then set_resource_permissions(); and that's that. Otherwise:
126 *
127 * We can no longer use the built-in atomic create-or-open mode (O_CREAT sans O_EXCL). Instead, we logically
128 * split it into two parts:try to create-only (O_CREAT|O_EXCL); if it succeeds, done (can
129 * set_resource_permissions()); if it fails with file-exists error, then mq_open() again, this time in open-only
130 * mode (and no need for set_resource_permissions()). Great! The problem, of course, is that this no longer uses a
131 * guaranteed-atomic open-or-create OS-call semantic. Namely it means set_resource_permissions() could fail due to
132 * file-not-found, because during the non-atomic gap the MQ got removed by someone. Then we spin-lock (in a way)
133 * through it: just try the whole procedure again. Sooner or later -- assuming no other errors of course --
134 * either the first mq_open() will succeed, or it'll "fail" with file-exists, yet the 2nd mq_open() will succeed.
135 * Pegging processor is not a serious concern in this context. */
136
137 const auto mq_name = shared_name_to_mq_name(absolute_name());
138 const auto do_mq_open_func = [&](bool create_else_open) -> bool // Just a helper.
139 {
140 /* Note: O_NONBLOCK is something we are forced to set/unset depending on the transmission API being called.
141 * So just don't worry about it here. */
142 ::mqd_t raw;
143 if (create_else_open)
144 {
145 mq_attr attr;
146 attr.mq_maxmsg = max_n_msg;
147 attr.mq_msgsize = max_msg_sz;
148
149 raw = mq_open(mq_name.c_str(),
150 O_RDWR | O_CREAT | O_EXCL, // Create-only always: per above. Can't use create-or-open.
151 perms.get_permissions(),
152 &attr);
153 }
154 else
155 {
156 raw = mq_open(mq_name.c_str(), O_RDWR);
157 }
158 if (raw != -1)
159 {
160 m_mq = Native_handle(raw);
161 }
162
163 if (m_mq.null())
164 {
165 sys_err_code = Error_code(errno, system_category());
166 return false;
167 }
168 // else
169 return true;
170 }; // do_mq_open_func =
171
172 if (CREATE_ONLY_ELSE_MAYBE)
173 {
174 // Simple case.
175 if (do_mq_open_func(true))
176 {
177 set_resource_permissions(get_logger(), m_mq, perms, &sys_err_code);
178 }
179 // sys_err_code is either success or failure. Fall through.
180 }
181 else // if (!CREATE_ONLY_ELSE_MAYBE)
182 {
183 // Complicated case.
184
185 bool success = false; // While this remains false, sys_err_code indicates whether it's false b/c of fatal error.
186 do
187 {
188 if (do_mq_open_func(true))
189 {
190 // Created! Only situation where we must indeed set_resource_permissions().
191 set_resource_permissions(get_logger(), m_mq, perms, &sys_err_code);
192 success = !sys_err_code;
193 continue; // `break`, really. One of success or sys_err_code is true (latter <= s_r_p() failed).
194 }
195 // else if (!do_mq_open_func(true)) // I.e., it failed for some reason.
196 if (sys_err_code != boost::system::errc::file_exists)
197 {
198 // Real error. GTFO.
199 continue; // `break;`, really (sys_err_code==true).
200 }
201 // else if (file_exists): Create failed, because it already exists. Open the existing guy!
202
203 if (do_mq_open_func(false))
204 {
205 // Opened!
206 success = true;
207 continue; // `break;`, really (success==true).
208 }
209 // else if (!do_mq_open_func(false)) // I.e., it failed for some reason.
210 if (sys_err_code != boost::system::errc::no_such_file_or_directory)
211 {
212 // Real error. GTFO.
213 continue; // `break;`, really (sys_err_code==true).
214 }
215 // else if (no_such_file_or_directory): Open failed, because MQ *just* got unlinked. Try again!
216
217 // This is fun and rare enough to warrant an INFO message.
218 if (logger_ptr && logger_ptr->should_log(Sev::S_INFO, get_log_component()))
219 {
220 ios_all_saver saver{*(logger_ptr->this_thread_ostream())}; // Revert std::oct/etc. soon.
221 FLOW_LOG_INFO_WITHOUT_CHECKING
222 ("Posix_mq_handle [" << *this << "]: Create-or-open algorithm encountered the rare concurrency: "
223 "MQ at name [" << absolute_name() << "] existed during the create-only mq_open() but disappeared "
224 "before we were able to complete open-only mq_open(). Retrying in spin-lock fashion. "
225 "Details: max msg size [" << max_msg_sz << "] x [" << max_n_msg << "] msgs; "
226 "perms = [" << std::setfill('0') << std::setw(4) << std::oct << perms.get_permissions() << "].");
227 }
228
229 sys_err_code.clear(); // success remains false, and that wasn't a fatal error, so ensure loop continues.
230 }
231 while ((!success) && (!sys_err_code));
232
233 // Now just encode success-or-not entirely in sys_err_code's truthiness.
234 if (success)
235 {
236 sys_err_code.clear();
237 }
238 // else { sys_err_code is the reason loop exited already. }
239
240 // Now fall through.
241 } // else if (!CREATE_ONLY_ELSE_MAYBE)
242
243 // We never use blocking transmission -- always using *wait_*able() so that interrupt_*() works. Non-blocking 4eva.
244 if ((!sys_err_code) && (!set_non_blocking(true, &sys_err_code)))
245 {
246 assert(sys_err_code);
247
248 // Clean up.
249
250 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; should be fine.
251 mq_close(m_mq.m_native_handle);
253 }
254
255 if (!sys_err_code)
256 {
257 sys_err_code = epoll_setup(); // It cleans up everything if it fails.
258 }
259 } // if (!sys_err_code) (but might have become true inside)
260
261 if (sys_err_code)
262 {
263 if (logger_ptr && logger_ptr->should_log(Sev::S_WARNING, get_log_component()))
264 {
265 ios_all_saver saver{*(logger_ptr->this_thread_ostream())}; // Revert std::oct/etc. soon.
266
267 FLOW_LOG_WARNING_WITHOUT_CHECKING
268 ("Posix_mq_handle [" << *this << "]: mq_open() or set_resource_permissions() error (if the latter, details "
269 "above and repeated below; otherwise error details only follow) while "
270 "constructing MQ handle to MQ at name [" << absolute_name() << "] in "
271 "create-only mode; max msg size [" << max_msg_sz << "] x [" << max_n_msg << "] msgs; "
272 "perms = [" << std::setfill('0') << std::setw(4) << std::oct << perms.get_permissions() << "].");
273 }
274 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
275
276 if (err_code)
277 {
278 *err_code = sys_err_code;
279 }
280 else
281 {
282 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
283 }
284 } // if (sys_err_code)
285 // else { Cool! }
286} // Posix_mq_handle::Posix_mq_handle(Mode_tag)
287
288Posix_mq_handle::Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
289 util::Create_only, size_t max_n_msg, size_t max_msg_sz,
290 const util::Permissions& perms, Error_code* err_code) :
291 Posix_mq_handle(util::CREATE_ONLY, logger_ptr, absolute_name_arg, max_n_msg, max_msg_sz, perms, err_code)
292{
293 // Cool.
294}
295
296Posix_mq_handle::Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
297 util::Open_or_create, size_t max_n_msg, size_t max_msg_sz,
298 const util::Permissions& perms, Error_code* err_code) :
299 Posix_mq_handle(util::OPEN_OR_CREATE, logger_ptr, absolute_name_arg, max_n_msg, max_msg_sz, perms, err_code)
300{
301 // Cool.
302}
303
304Posix_mq_handle::Posix_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
305 util::Open_only, Error_code* err_code) :
306 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
307 m_absolute_name(absolute_name_arg),
308 m_interrupting_snd(false),
309 m_interrupting_rcv(false),
310 m_interrupter_snd(m_nb_task_engine),
311 m_interrupt_detector_snd(m_nb_task_engine),
312 m_interrupter_rcv(m_nb_task_engine),
313 m_interrupt_detector_rcv(m_nb_task_engine)
314{
315 using flow::log::Sev;
316 using flow::error::Runtime_error;
317 using boost::system::system_category;
318 using ::mq_open;
319 // using ::O_RDWR; // A macro apparently.
320
321 FLOW_LOG_TRACE
322 ("Posix_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name [" << absolute_name() << "] in "
323 "open-only mode.");
324
325 // Get pipe stuff out of the way, as it does not need m_mq.
326 auto sys_err_code = pipe_setup();
327
328 if (!sys_err_code)
329 {
330 /* Note: O_NONBLOCK is something we are forced to set/unset depending on the transmission API being called.
331 * So just don't worry about it here. */
332 const auto raw = mq_open(shared_name_to_mq_name(absolute_name()).c_str(), O_RDWR);
333 if (raw != -1)
334 {
335 m_mq = Native_handle(raw);
336 }
337
338 if (m_mq.null())
339 {
340 FLOW_LOG_WARNING
341 ("Posix_mq_handle [" << *this << "]: mq_open() error (error details follow) while "
342 "constructing MQ handle to MQ at name [" << absolute_name() << "] in open-only mode.");
343 sys_err_code = Error_code(errno, system_category());
344 }
345 else
346 {
347 // We never use blocking transmission -- always using *wait_*able() => interrupt_*() works. Non-blocking 4eva.
348 if (!set_non_blocking(true, &sys_err_code))
349 {
350 assert(sys_err_code);
351
352 // Clean up.
353
354 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; it's fine.
355 mq_close(m_mq.m_native_handle);
357 }
358 else
359 {
360 sys_err_code = epoll_setup(); // It logged on error; also then it cleaned up everything.
361 }
362 } // if (!m_mq.null()) (but it may have become null, and sys_err_code therefore truthy, inside)
363 } // if (!sys_err_code) (but it may have become truthy inside)
364
365 if (sys_err_code)
366 {
367 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
368
369 if (err_code)
370 {
371 *err_code = sys_err_code;
372 }
373 else
374 {
375 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
376 }
377 }
378 // else { Cool! }
379} // Posix_mq_handle::Posix_mq_handle(Open_only)
380
382{
383 using boost::asio::connect_pipe;
384
385 Error_code sys_err_code;
386 connect_pipe(m_interrupt_detector_snd, m_interrupter_snd, sys_err_code);
387 if (!sys_err_code)
388 {
389 connect_pipe(m_interrupt_detector_rcv, m_interrupter_rcv, sys_err_code);
390 }
391
392 if (sys_err_code)
393 {
394 FLOW_LOG_WARNING
395 ("Posix_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name [" << absolute_name() << "]: "
396 "connect-pipe failed. Details follow.");
397 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
398
399 // Clean up first guys (might be no-op).
400 Error_code sink;
401 m_interrupter_snd.close(sink);
402 m_interrupt_detector_snd.close(sink);
403 }
404
405 return sys_err_code;
406} // Posix_mq_handle::pipe_setup()
407
409{
410 using boost::system::system_category;
411 using ::epoll_create1;
412 using ::epoll_ctl;
413 using ::mq_close;
414 using ::close;
415 using Epoll_event = ::epoll_event;
416 // using ::EPOLL_CTL_ADD; // A macro apparently.
417 // using ::EPOLLIN; // A macro apparently.
418
419 /* m_mq is good to go.
420 *
421 * Set up one epoll handle for checking for writability; another for readability.
422 * Into each one add an interrupter FD (readability of read end of an anonymous pipe).
423 *
424 * There's some lame cleanup logic below specific to the 2-handle situation. It's okay. Just, if setting
425 * up the outgoing-direction epolliness fails, it cleans up itself. If that succeeds, but incoming-direction
426 * setup then fails, then after it cleans up after itself, we have to undo the outgoing-direction setup. */
427
428 Error_code sys_err_code;
429
430 const auto setup = [&](Native_handle* epoll_hndl_ptr, bool snd_else_rcv)
431 {
432 auto& epoll_hndl = *epoll_hndl_ptr;
433 auto& interrupt_detector = snd_else_rcv ? m_interrupt_detector_snd : m_interrupt_detector_rcv;
434
435 epoll_hndl = Native_handle(epoll_create1(0));
436 if (epoll_hndl.m_native_handle == -1)
437 {
438 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Created MQ handle fine, but epoll_create1() failed; "
439 "details follow.");
440 sys_err_code = Error_code(errno, system_category());
441
442 // Clean up.
443
444 epoll_hndl = Native_handle(); // No-op as of this writing, but just to keep it maintainable do it anyway.
445
446 Error_code sink;
447 m_interrupt_detector_snd.close(sink);
448 m_interrupter_snd.close(sink);
449 m_interrupt_detector_rcv.close(sink);
450 m_interrupter_rcv.close(sink);
451
452 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; should be fine.
453 mq_close(m_mq.m_native_handle);
455 return;
456 }
457 // else if (epoll_hndl.m_native_handle != -1)
458
459 // Each epoll_wait() will listen for transmissibility of the queue itself + readability of interrupt-pipe.
460 Epoll_event event_of_interest1;
461 event_of_interest1.events = snd_else_rcv ? EPOLLOUT : EPOLLIN;
462 event_of_interest1.data.fd = m_mq.m_native_handle;
463 Epoll_event event_of_interest2;
464 event_of_interest2.events = EPOLLIN;
465 event_of_interest2.data.fd = interrupt_detector.native_handle();
466 if ((epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest1.data.fd, &event_of_interest1) == -1) ||
467 (epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest2.data.fd, &event_of_interest2) == -1))
468 {
469 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Created MQ handle fine, but an epoll_ctl() failed; "
470 "snd_else_rcv = [" << snd_else_rcv << "]; details follow.");
471 sys_err_code = Error_code(errno, system_category());
472
473 // Clean up everything.
474 close(epoll_hndl.m_native_handle);
475 epoll_hndl = Native_handle();
476 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; should be fine.
477 mq_close(m_mq.m_native_handle);
479 return;
480 }
481 }; // const auto setup =
482
483 setup(&m_epoll_hndl_snd, true);
484 if (!sys_err_code)
485 {
486 setup(&m_epoll_hndl_rcv, false);
487 if (sys_err_code)
488 {
489 // Have to undo first setup(), except m_mq+pipes cleanup was already done by 2nd setup().
492 }
493 }
494 // else { 1st setup() cleaned everything up. }
495
496 return sys_err_code;
497} // Posix_mq_handle::epoll_setup()
498
501{
502 operator=(std::move(src));
503}
504
506{
507 using ::mq_close;
508 using ::close;
509
510 if (m_mq.null())
511 {
512 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: No MQ handle to close in destructor.");
513 assert(m_epoll_hndl_snd.null());
514 assert(m_epoll_hndl_rcv.null());
515 }
516 else
517 {
518 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Closing MQ handle (and epoll set).");
519 // Disregard any error. In Linux, by the way, only EBADF is possible apparently; we should be fine.
520 mq_close(m_mq.m_native_handle);
521
522 assert(!m_epoll_hndl_snd.null());
523 assert(!m_epoll_hndl_rcv.null());
524 close(m_epoll_hndl_snd.m_native_handle); // Ditto regarding errors.
526 }
527} // Posix_mq_handle::~Posix_mq_handle()
528
530{
531 using std::swap;
532
533 if (&src != this)
534 {
543
544 swap(*this, src);
545 }
546 return *this;
547}
548
550{
551 using util::Pipe_writer;
552 using util::Pipe_reader;
553 using flow::log::Log_context;
554 using std::swap;
555 using boost::array;
556
557 // This is a bit faster than un-specialized std::swap() which would require a move ction + 3 move assignments.
558
559 swap(static_cast<Log_context&>(val1), static_cast<Log_context&>(val2));
560 swap(val1.m_mq, val2.m_mq);
566
567 /* This is annoying! Maybe we should just wrap all these in unique_ptr<>s to avoid this nonsense.
568 * As it is, it might be unsafe to swap asio objects apart from their Task_engines, so doing this blech. */
569 try
570 {
571 array<Native_handle, 4> fds1;
572 array<Native_handle, 4> fds2;
573 const auto unload = [&](Posix_mq_handle& val, auto& fds)
574 {
575 if (val.m_interrupter_snd.is_open())
576 {
577 fds[0].m_native_handle = val.m_interrupter_snd.release();
578 }
579 if (val.m_interrupter_rcv.is_open())
580 {
581 fds[1].m_native_handle = val.m_interrupter_rcv.release();
582 }
583 if (val.m_interrupt_detector_snd.is_open())
584 {
585 fds[2].m_native_handle = val.m_interrupt_detector_snd.release();
586 }
587 if (val.m_interrupt_detector_rcv.is_open())
588 {
589 fds[3].m_native_handle = val.m_interrupt_detector_rcv.release();
590 }
591 };
592 unload(val1, fds1);
593 unload(val2, fds2);
594
595 const auto reload = [&](Posix_mq_handle& val, auto& fds)
596 {
597 // Leave their stupid task engines in-place. Do need to reassociate them with the swapped FDs though.
599 = fds[0].null() ? Pipe_writer(val.m_nb_task_engine)
600 : Pipe_writer(val.m_nb_task_engine, fds[0].m_native_handle);
602 = fds[1].null() ? Pipe_writer(val.m_nb_task_engine)
603 : Pipe_writer(val.m_nb_task_engine, fds[1].m_native_handle);
605 = fds[2].null() ? Pipe_reader(val.m_nb_task_engine)
606 : Pipe_reader(val.m_nb_task_engine, fds[2].m_native_handle);
608 = fds[3].null() ? Pipe_reader(val.m_nb_task_engine)
609 : Pipe_reader(val.m_nb_task_engine, fds[3].m_native_handle);
610 };
611 reload(val1, fds2); // Swap 'em.
612 reload(val2, fds1);
613 }
614 catch (...) // .release() and ctor can throw, and they really shouldn't at all.
615 {
616 assert(false && "Horrible exception."); std::abort();
617 }
618} // swap()
619
621{
622 using ::mq_attr;
623 using ::mq_getattr;
624
625 assert((!m_mq.null())
626 && "As advertised: max_msg_size() => undefined behavior if not successfully cted or was moved-from.");
627
628 mq_attr attr;
629 if (mq_getattr(m_mq.m_native_handle, &attr) != 0)
630 {
631 /* We could handle error gracefully, as we do elsewhere, but EBADF is the only possibility, and there's zero
632 * reason it should happen unles m_mq is null at this point. It'd be fine to handle it, but then we'd have
633 * to emit an Error_code, and the API would change, and the user would have more to worry about; I (ygoldfel)
634 * decided it's overkill. So if it does happen, log and assert(). @todo Maybe reconsider. */
635 Error_code sink;
636 handle_mq_api_result(-1, &sink, "Posix_mq_handle::max_msg_size: mq_getattr()");
637 assert(false && "mq_getattr() failed (details logged); this is too bizarre.");
638 return 0;
639 }
640 // else
641
642 return size_t(attr.mq_msgsize);
643} // Posix_mq_handle::max_msg_size()
644
646{
647 // Very similar to max_msg_size(). @todo Maybe code reuse. Though line count might be even greater then so....
648
649 using ::mq_attr;
650 using ::mq_getattr;
651 assert((!m_mq.null())
652 && "As advertised: max_n_msgs() => undefined behavior if not successfully cted or was moved-from.");
653 mq_attr attr;
654 if (mq_getattr(m_mq.m_native_handle, &attr) != 0)
655 {
656 Error_code sink;
657 handle_mq_api_result(-1, &sink, "Posix_mq_handle::max_msg_size: mq_getattr()");
658 assert(false && "mq_getattr() failed (details logged); this is too bizarre.");
659 return 0;
660 }
661 return size_t(attr.mq_maxmsg);
662} // Posix_mq_handle::max_msg_size()
663
665{
666 using boost::system::system_category;
667 using ::mq_attr;
668 using ::mq_setattr;
669 // using ::O_NONBLOCK; // A macro apparently.
670
671 assert(err_code);
672
673 /* By the way -- no, this cannot be done in mq_attr that goes into mq_open(). It is ignored there according to docs.
674 * Would've been nice actually.... */
675
676 mq_attr attr;
677 attr.mq_flags = nb ? O_NONBLOCK : 0;
678 return handle_mq_api_result(mq_setattr(m_mq.m_native_handle, &attr, 0),
679 err_code, "Posix_mq_handle::set_non_blocking(): mq_setattr()");
680} // Posix_mq_handle::set_non_blocking()
681
683{
684 using flow::util::buffers_dump_string;
685 using ::mq_send;
686 // using ::EAGAIN; // A macro apparently.
687
688 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, try_send, blob, _1);
689 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
690
691 assert((!m_mq.null())
692 && "As advertised: try_send() => undefined behavior if not successfully cted or was moved-from.");
693
694 auto blob_data = blob.data();
695 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
696 "size [" << blob.size() << "].");
697 if (blob.size() == 0)
698 {
699 /* ::mq_send(..., nullptr, N), even when N == 0, is (1) empirically speaking harmless but (2) technically
700 * in violation of arg 2's non-null decl (even though `N == 0` is explicitly allowed per `man` page) hence
701 * (3) causes a clang UBSAN sanitizer error. So waste a couple cycles by feeding it this dummy
702 * non-null value. */
703 blob_data = static_cast<const void*>(&blob_data);
704 }
705 else // if (blob.size() != 0)
706 {
707 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
708 }
709
710 if (mq_send(m_mq.m_native_handle,
711 static_cast<const char*>(blob_data),
712 blob.size(), 0) == 0)
713 {
714 err_code->clear();
715 return true; // Instant success.
716 }
717 // else
718 if (errno == EAGAIN)
719 {
720 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
721 "size [" << blob.size() << "]: would-block.");
722 err_code->clear();
723 return false; // Queue full.
724 }
725 // else
726
727 handle_mq_api_result(-1, err_code, "Posix_mq_handle::try_send(): mq_send(nb)");
728 assert(*err_code);
729 return false;
730} // Posix_mq_handle::try_send()
731
733{
734 using flow::util::buffers_dump_string;
735 using ::mq_send;
736 // using ::EAGAIN; // A macro apparently.
737
738 if (flow::error::exec_void_and_throw_on_error
739 ([&](Error_code* actual_err_code) { send(blob, actual_err_code); },
740 err_code, "Posix_mq_handle::send()"))
741 {
742 return;
743 }
744 // else
745
746 assert((!m_mq.null())
747 && "As advertised: send() => undefined behavior if not successfully cted or was moved-from.");
748
749 auto blob_data = blob.data();
750 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-push of blob @[" << blob_data << "], "
751 "size [" << blob.size() << "]. Trying nb-push first; if it succeeds -- great. "
752 "Else will wait/retry/wait/retry/....");
753 if (blob.size() == 0)
754 {
755 // See similarly-placed comment in try_send() which explains this.
756 blob_data = static_cast<const void*>(&blob_data);
757 }
758 else // if (blob.size() != 0)
759 {
760 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
761 }
762
763 /* We could just invoke blocking mq_send(m_mq), but to get the promised logging we go a tiny bit fancier as follows.
764 * Update: Now that we have to be interrupt_*()ible, also reuse wait_*() instead of using native
765 * blocking mq_*(). */
766
767 while (true)
768 {
769 if (mq_send(m_mq.m_native_handle,
770 static_cast<const char*>(blob_data),
771 blob.size(), 0) == 0)
772 {
773 err_code->clear();
774 return; // Instant success.
775 }
776 // else
777 if (errno != EAGAIN)
778 {
779 handle_mq_api_result(-1, err_code, "Posix_mq_handle::send(): mq_send(nb)");
780 assert(*err_code);
781 return;
782 }
783 // else if (would-block): as promised, INFO logs.
784
785 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
786 "size [" << blob.size() << "]: would-block. Executing blocking-wait.");
787
788 wait_sendable(err_code);
789 if (*err_code)
790 {
791 // Whether interrupted or true error, we're done.
792 return;
793 }
794 // else
795
796 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
797 } // while (true)
798} // Posix_mq_handle::send()
799
801 Error_code* err_code)
802{
803 using flow::util::time_since_posix_epoch;
804 using flow::util::buffers_dump_string;
805 using flow::Fine_clock;
806 using boost::chrono::floor;
807 using boost::chrono::round;
808 using boost::chrono::seconds;
809 using boost::chrono::microseconds;
810 using boost::chrono::nanoseconds;
811 using ::mq_send;
812 // using ::EAGAIN; // A macro apparently.
813
814 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, timed_send, blob, timeout_from_now, _1);
815 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
816
817 // Similar to a combo of (blocking) send() and (non-blocking) try_send() -- keeping comments light where redundant.
818
819 assert((!m_mq.null())
820 && "As advertised: timed_send() => undefined behavior if not successfully cted or was moved-from.");
821
822 auto blob_data = blob.data();
823 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-timed-push of blob @[" << blob_data << "], "
824 "size [" << blob.size() << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]. "
825 "Trying nb-push first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
826 if (blob.size() == 0)
827 {
828 // See similarly-placed comment in try_send() which explains this.
829 blob_data = static_cast<const void*>(&blob_data);
830 }
831 else // if (blob.size() != 0)
832 {
833 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
834 }
835
836 auto now = Fine_clock::now();
837 auto after = now;
838
839 while (true)
840 {
841 if (mq_send(m_mq.m_native_handle,
842 static_cast<const char*>(blob_data),
843 blob.size(), 0) == 0)
844 {
845 err_code->clear();
846 break; // Instant success.
847 }
848 // else
849 if (errno != EAGAIN)
850 {
851 handle_mq_api_result(-1, err_code, "Posix_mq_handle::send(): mq_send(nb)");
852 assert(*err_code);
853 return false;
854 }
855 // else if (would-block): as promised, INFO logs.
856
857 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
858 "size [" << blob.size() << "]: would-block. Executing blocking-wait.");
859
860 timeout_from_now -= (after - now); // No-op the first time; after that reduces time left.
861 const bool ready = timed_wait_sendable(timeout_from_now, err_code);
862 if (*err_code)
863 {
864 // Whether interrupted or true error, we're done.
865 return false;
866 }
867 // else:
868
869 if (!ready) // I.e., if (timed out).
870 {
871 FLOW_LOG_TRACE("Did not finish before timeout.");
872 return false;
873 }
874 // else: successful wait for transmissibility. Try nb-transmitting again.
875 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
876
877 after = Fine_clock::now();
878 assert((after >= now) && "Fine_clock is supposed to never go backwards.");
879 } // while (true)
880
881 return true;
882} // Posix_mq_handle::timed_send()
883
885{
886 using util::Blob_mutable;
887 using flow::util::buffers_dump_string;
888 using ::mq_receive;
889 // using ::EAGAIN; // A macro apparently.
890
891 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, try_receive, blob, _1);
892 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
893
894 assert((!m_mq.null())
895 && "As advertised: try_receive() => undefined behavior if not successfully cted or was moved-from.");
896
897 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
898 "max-size [" << blob->size() << "].");
899
900 ssize_t n_rcvd;
901 unsigned int pri_ignored;
902 if ((n_rcvd = mq_receive(m_mq.m_native_handle,
903 static_cast<char*>(blob->data()),
904 blob->size(), &pri_ignored)) >= 0)
905 {
906 *blob = Blob_mutable(blob->data(), n_rcvd);
907 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
908 if (blob->size() != 0)
909 {
910 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
911 }
912 err_code->clear();
913 return true; // Instant success.
914 }
915 // else
916 if (errno == EAGAIN)
917 {
918 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
919 "max-size [" << blob->size() << "]: would-block.");
920 err_code->clear();
921 return false; // Queue full.
922 }
923 // else
924
925 handle_mq_api_result(-1, err_code, "Posix_mq_handle::try_receive(): mq_receive(nb)");
926 assert(*err_code);
927 return false;
928} // Posix_mq_handle::try_receive()
929
931{
932 using util::Blob_mutable;
933 using flow::util::buffers_dump_string;
934 using ::mq_receive;
935 // using ::EAGAIN; // A macro apparently.
936
937 if (flow::error::exec_void_and_throw_on_error
938 ([&](Error_code* actual_err_code) { receive(blob, actual_err_code); },
939 err_code, "Posix_mq_handle::receive()"))
940 {
941 return;
942 }
943 // else
944
945 assert((!m_mq.null())
946 && "As advertised: receive() => undefined behavior if not successfully cted or was moved-from.");
947
948 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-pop to blob @[" << blob->data() << "], "
949 "max-size [" << blob->size() << "]. Trying nb-pop first; if it succeeds -- great. "
950 "Else will wait/retry/wait/retry/....");
951
952 /* We could just invoke blocking mq_receive(m_mq), but to get the promised logging we go tiny bit fancier as follows.
953 * Update: Now that we have to be interrupt_*()ible, also reuse wait_*() instead of using native
954 * blocking mq_*(). */
955
956 ssize_t n_rcvd;
957 unsigned int pri_ignored;
958
959 while (true)
960 {
961 if ((n_rcvd = mq_receive(m_mq.m_native_handle,
962 static_cast<char*>(blob->data()),
963 blob->size(), &pri_ignored)) >= 0)
964 {
965 *blob = Blob_mutable(blob->data(), n_rcvd);
966 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
967 if (blob->size() != 0)
968 {
969 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
970 }
971 err_code->clear();
972 return; // Instant success.
973 }
974 // else
975 if (errno != EAGAIN)
976 {
977 handle_mq_api_result(-1, err_code, "Posix_mq_handle::receive(): mq_receive(nb)");
978 assert(*err_code);
979 return;
980 }
981 // else if (would-block): as promised, INFO logs.
982
983 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
984 "max-size [" << blob->size() << "]: would-block. Executing blocking-wait.");
985
986 wait_receivable(err_code);
987 if (*err_code)
988 {
989 // Whether interrupted or true error, we're done.
990 return;
991 }
992 // else
993
994 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
995 } // while (true)
996} // Posix_mq_handle::receive()
997
999 Error_code* err_code)
1000{
1001 using util::Blob_mutable;
1002 using flow::Fine_clock;
1003 using flow::util::time_since_posix_epoch;
1004 using flow::util::buffers_dump_string;
1005 using boost::chrono::floor;
1006 using boost::chrono::round;
1007 using boost::chrono::seconds;
1008 using boost::chrono::microseconds;
1009 using boost::chrono::nanoseconds;
1010 using ::mq_receive;
1011 // using ::EAGAIN; // A macro apparently.
1012
1013 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, timed_receive, blob, timeout_from_now, _1);
1014 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
1015
1016 // Similar to a combo of (blocking) receive() and (non-blocking) try_receive() -- keeping comments light if redundant.
1017
1018 assert((!m_mq.null())
1019 && "As advertised: timed_receive() => undefined behavior if not successfully cted or was moved-from.");
1020
1021 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-timed-pop to blob @[" << blob->data() << "], "
1022 "max-size [" << blob->size() << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]. "
1023 "Trying nb-pop first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
1024
1025 ssize_t n_rcvd;
1026 unsigned int pri_ignored;
1027
1028 auto now = Fine_clock::now();
1029 auto after = now;
1030
1031 while (true)
1032 {
1033 if ((n_rcvd = mq_receive(m_mq.m_native_handle,
1034 static_cast<char*>(blob->data()),
1035 blob->size(), &pri_ignored)) >= 0)
1036 {
1037 *blob = Blob_mutable(blob->data(), n_rcvd);
1038 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
1039 if (blob->size() != 0)
1040 {
1041 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
1042 }
1043 err_code->clear();
1044 break; // Instant success.
1045 }
1046 // else
1047 if (errno != EAGAIN)
1048 {
1049 handle_mq_api_result(-1, err_code, "Posix_mq_handle::timed_receive(): mq_receive(nb)");
1050 assert(*err_code);
1051 return false;
1052 }
1053 // else if (would-block): as promised, INFO logs.
1054
1055 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
1056 "max-size [" << blob->size() << "]: would-block. Executing blocking-wait.");
1057
1058 timeout_from_now -= (after - now); // No-op the first time; after that reduces time left.
1059 const bool ready = timed_wait_receivable(timeout_from_now, err_code);
1060 if (*err_code)
1061 {
1062 // Whether interrupted or true error, we're done.
1063 return false;
1064 }
1065 // else:
1066
1067 if (!ready) // I.e., if (timed out).
1068 {
1069 FLOW_LOG_TRACE("Did not finish before timeout.");
1070 return false;
1071 }
1072 // else: successful wait for transmissibility. Try nb-transmitting again.
1073 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
1074
1075 after = Fine_clock::now();
1076 assert((after >= now) && "Fine_clock is supposed to never go backwards.");
1077 } // while (true)
1078
1079 return true;
1080} // Posix_mq_handle::timed_receive()
1081
1082template<bool SND_ELSE_RCV>
1084{
1085 using util::Blob_const;
1086
1087 assert((!m_mq.null())
1088 && "As advertised: interrupt_impl() => undefined behavior if not successfully cted or was moved-from.");
1089
1090 Pipe_writer* interrupter_ptr;
1091 bool* interrupting_ptr;
1092 if constexpr(SND_ELSE_RCV)
1093 {
1094 interrupter_ptr = &m_interrupter_snd; interrupting_ptr = &m_interrupting_snd;
1095 }
1096 else
1097 {
1098 interrupter_ptr = &m_interrupter_rcv; interrupting_ptr = &m_interrupting_rcv;
1099 }
1100 auto& interrupter = *interrupter_ptr;
1101 auto& interrupting = *interrupting_ptr;
1102
1103 if (interrupting)
1104 {
1105 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Interrupt mode already ON for "
1106 "snd_else_rcv [" << SND_ELSE_RCV << "]. Ignoring.");
1107 return false;
1108 }
1109 // else
1110
1111 interrupting = true; // Reminder: This member is for the above guard *only*. wait_impl() relies on FD state.
1112 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Interrupt mode turning ON for "
1113 "snd_else_rcv [" << SND_ELSE_RCV << "].");
1114
1115 util::pipe_produce(get_logger(), &interrupter);
1116
1117 // Now any level-triggered poll-wait will detect that this mode is on.
1118 return true;
1119} // Posix_mq_handle::interrupt_impl()
1120
1121template<bool SND_ELSE_RCV>
1123{
1124 using util::Blob_mutable;
1125
1126 assert((!m_mq.null())
1127 && "As advertised: allow_impl() => undefined behavior if not successfully cted or was moved-from.");
1128
1129 // Inverse of interrupt_impl(). Keeping comments light.
1130
1131 Pipe_reader* interrupt_detector_ptr;
1132 bool* interrupting_ptr;
1133 if constexpr(SND_ELSE_RCV)
1134 {
1135 interrupt_detector_ptr = &m_interrupt_detector_snd; interrupting_ptr = &m_interrupting_snd;
1136 }
1137 else
1138 {
1139 interrupt_detector_ptr = &m_interrupt_detector_rcv; interrupting_ptr = &m_interrupting_rcv;
1140 }
1141 auto& interrupt_detector = *interrupt_detector_ptr;
1142 auto& interrupting = *interrupting_ptr;
1143
1144 if (!interrupting)
1145 {
1146 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: Interrupt mode already OFF for "
1147 "snd_else_rcv [" << SND_ELSE_RCV << "]. Ignoring.");
1148 return false;
1149 }
1150 // else
1151
1152 interrupting = false;
1153 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Interrupt mode turning OFF for "
1154 "snd_else_rcv [" << SND_ELSE_RCV << "].");
1155
1156 util::pipe_consume(get_logger(), &interrupt_detector);
1157
1158 // Now any level-triggered poll-wait will detect that this mode is off.
1159 return true;
1160} // Posix_mq_handle::allow_impl()
1161
1163{
1164 return interrupt_impl<true>();
1165}
1166
1168{
1169 return allow_impl<true>();
1170}
1171
1173{
1174 return interrupt_impl<false>();
1175}
1176
1178{
1179 return allow_impl<false>();
1180}
1181
1182bool Posix_mq_handle::wait_impl(util::Fine_duration timeout_from_now_or_none, bool snd_else_rcv, Error_code* err_code)
1183{
1184 using util::Fine_time_pt;
1185 using util::Fine_duration;
1186 using flow::util::time_since_posix_epoch;
1187 using boost::chrono::round;
1188 using boost::chrono::milliseconds;
1189 using boost::system::system_category;
1190 using boost::array;
1191 using ::epoll_wait;
1192 using Epoll_event = ::epoll_event;
1193
1194 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, wait_impl, timeout_from_now_or_none, snd_else_rcv, _1);
1195 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
1196
1197 assert((!m_mq.null())
1198 && "As advertised: wait_impl() => undefined behavior if not successfully cted or was moved-from.");
1199
1200 /* By the way -- epoll_wait() takes # of milliseconds, so round up to that and no need to do crazy stuff like
1201 * timed_send() et al; just convert to milliseconds. */
1202 int epoll_timeout_from_now_ms;
1203 milliseconds epoll_timeout_from_now;
1204 if (timeout_from_now_or_none == Fine_duration::max())
1205 {
1206 epoll_timeout_from_now_ms = -1;
1207 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Infinite-await-unstarved for "
1208 "snd_else_rcv [" << snd_else_rcv << "]. Will perform an epoll_wait().");
1209 }
1210 else
1211 {
1212 epoll_timeout_from_now = round<milliseconds>(timeout_from_now_or_none);
1213 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-await/poll-unstarved for "
1214 "snd_else_rcv [" << snd_else_rcv << "]; timeout ~[" << epoll_timeout_from_now << "] -- "
1215 "if 0 then poll. Will perform an epoll_wait().");
1216 epoll_timeout_from_now_ms = int(epoll_timeout_from_now.count());
1217 }
1218
1219 array<Epoll_event, 2> evs; // Only one possible event (we choose 1 of 2 event sets).
1220 const auto epoll_result
1221 = epoll_wait((snd_else_rcv ? m_epoll_hndl_snd : m_epoll_hndl_rcv)
1222 .m_native_handle,
1223 evs.begin(), 1, epoll_timeout_from_now_ms);
1224 if (epoll_result == -1)
1225 {
1226 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: epoll_wait() yielded error. Details follow.");
1227
1228 const auto& sys_err_code = *err_code = Error_code(errno, system_category());
1229 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1230 return false;
1231 }
1232 // else
1233
1234 assert(epoll_result <= 2);
1235 if ((epoll_result == 2) // Max of 2 events; if interrupted then as promised disregard it being also transmissible.
1236 ||
1237 ((epoll_result == 1) // 1 event: need to check whether it's the interruptor as opposed to the actual queue.
1238 && (evs[0].data.fd != m_mq.m_native_handle)))
1239 {
1240 if (timeout_from_now_or_none == Fine_duration::max())
1241 {
1242 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Infinite-await-unstarved for "
1243 "snd_else_rcv [" << snd_else_rcv << "]: interrupted.");
1244 }
1245 else
1246 {
1247 FLOW_LOG_INFO("Posix_mq_handle [" << *this << "]: Blocking-await/poll-unstarved for "
1248 "snd_else_rcv [" << snd_else_rcv << "]; timeout ~[" << epoll_timeout_from_now << "] -- "
1249 "if 0 then poll: interrupted.");
1250 }
1251 *err_code = error::Code::S_INTERRUPTED;
1252 return false;
1253 }
1254 // else if (epoll_result <= 1): Not interrupted.
1255
1256 const bool success = epoll_result == 1;
1257 if (timeout_from_now_or_none == Fine_duration::max())
1258 {
1259 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Infinite-await-unstarved for "
1260 "snd_else_rcv [" << snd_else_rcv << "]: succeeded? = [" << success << "].");
1261 }
1262 else
1263 {
1264 FLOW_LOG_TRACE("Posix_mq_handle [" << *this << "]: Blocking-await/poll-unstarved for "
1265 "snd_else_rcv [" << snd_else_rcv << "]; timeout ~[" << epoll_timeout_from_now << "] -- "
1266 "if 0 then poll: succeeded? = [" << success << "].");
1267 }
1268
1269 err_code->clear();
1270 return success;
1271} // Posix_mq_handle::wait_impl()
1272
1274{
1275 return wait_impl(util::Fine_duration::zero(), true, err_code);
1276}
1277
1279{
1280 wait_impl(util::Fine_duration::max(), true, err_code);
1281}
1282
1284{
1285 return wait_impl(timeout_from_now, true, err_code);
1286}
1287
1289{
1290 return wait_impl(util::Fine_duration::zero(), false, err_code);
1291}
1292
1294{
1295 wait_impl(util::Fine_duration::max(), false, err_code);
1296}
1297
1299{
1300 return wait_impl(timeout_from_now, false, err_code);
1301}
1302
1304{
1305 return m_mq;
1306}
1307
1308void Posix_mq_handle::remove_persistent(flow::log::Logger* logger_ptr, // Static.
1309 const Shared_name& absolute_name, Error_code* err_code)
1310{
1311 using boost::system::system_category;
1312 using ::mq_unlink;
1313
1314 if (flow::error::exec_void_and_throw_on_error
1315 ([&](Error_code* actual_err_code)
1316 { remove_persistent(logger_ptr, absolute_name, actual_err_code); },
1317 err_code, "Posix_mq_handle::remove_persistent()"))
1318 {
1319 return;
1320 }
1321 // else
1322
1323 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
1324
1325 FLOW_LOG_INFO("Posix_mq @ Shared_name[" << absolute_name << "]: Removing persistent MQ if possible.");
1326 if (mq_unlink(shared_name_to_mq_name(absolute_name).c_str()) == 0)
1327 {
1328 err_code->clear();
1329 return;
1330 }
1331 // else
1332
1333 FLOW_LOG_WARNING("Posix_mq @ Shared_name[" << absolute_name << "]: While removing persistent MQ:"
1334 "mq_unlink() yielded error. Details follow.");
1335 const auto& sys_err_code = *err_code = Error_code(errno, system_category());
1336 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1337} // Posix_mq_handle::remove_persistent()
1338
1340{
1341 using boost::system::system_category;
1342
1343 if (result == 0)
1344 {
1345 err_code->clear();
1346 return true;
1347 }
1348 // else
1349
1350 FLOW_LOG_WARNING("Posix_mq_handle [" << *this << "]: mq_*() yielded error; context = [" << context << "]. "
1351 "Details follow.");
1352 const auto& sys_err_code = *err_code
1353 = (errno == EMSGSIZE)
1354 ? error::Code::S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW // By contract must emit this specific code for this.
1355 : Error_code(errno, system_category()); // Otherwise whatever it was.
1356 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1357
1358 return false;
1359} // Posix_mq_handle::handle_mq_api_result()
1360
1362{
1363 return m_absolute_name;
1364}
1365
1366std::ostream& operator<<(std::ostream& os, const Posix_mq_handle& val)
1367{
1368 os << '@' << &val << ": sh_name[" << val.absolute_name() << "] native_handle[";
1369 const auto native_handle = val.native_handle();
1370 if (native_handle.null())
1371 {
1372 return os << "null]";
1373 }
1374 // else
1375 return os << native_handle << ']';
1376}
1377
1378namespace
1379{
1380
1381std::string shared_name_to_mq_name(const Shared_name& name)
1382{
1383 // Pre-pend slash. See `man mq_overview`.
1384 std::string mq_name("/");
1385 return mq_name += name.str();
1386}
1387
1388} // namespace (anon)
1389
1390} // namespace ipc::transport
Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see man mq_overv...
bool allow_sends()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-sends an...
bool interrupt_receives()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-receives ...
bool interrupt_sends()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-sends and...
util::Pipe_reader Pipe_reader
Short-hand for anonymous pipe read end.
void wait_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like receive() but without the actual popping of a message.
static const Shared_name S_RESOURCE_TYPE_ID
Implements concept API.
bool handle_mq_api_result(int result, Error_code *err_code, util::String_view context) const
Helper that handles the result of an mq_*() call by logging WARNING(s) on error; setting *err_code on...
Pipe_reader m_interrupt_detector_snd
A byte is read from this end by allow_sends() to make it not-readable for the poll-wait in wait_impl(...
Native_handle m_epoll_hndl_snd
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool is_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_receive() but without the actual popping of a message.
bool allow_receives()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-receives...
bool is_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_send() but without the actual pushing of a message.
size_t max_msg_size() const
Implements Persistent_mq_handle API: Returns the max message size of the underlying queue.
Pipe_reader m_interrupt_detector_rcv
Other-direction counterpart to m_interrupt_detector_snd.
Error_code epoll_setup()
Ctor helper that sets up m_epoll_hndl_snd and m_epoll_hndl_rcv.
util::Pipe_writer Pipe_writer
Short-hand for anonymous pipe write end.
size_t max_n_msgs() const
Implements Persistent_mq_handle API: Returns the max message count of the underlying queue.
bool try_send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking send: pushes copy of message to queue and returns t...
Pipe_writer m_interrupter_rcv
Other-direction counterpart to m_interrupter_snd.
Native_handle native_handle() const
Implements Persistent_mq_handle API: Returns the stored native MQ handle; null if not open.
bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_receive() but without the actual popping of a message...
bool wait_impl(util::Fine_duration timeout_from_now_or_none, bool snd_else_rcv, Error_code *err_code)
Impl body for *_sendable() and *_receivable().
friend void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &name, Error_code *err_code=0)
Implements Persistent_mq_handle API: Removes the named persistent MQ.
void receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking receive: pops copy of message from queue into buffer; i...
void send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking send: pushes copy of message to queue; if queue is full...
Posix_mq_handle()
Implements Persistent_mq_handle API: Construct null handle.
bool m_interrupting_snd
Starting at false, this is made true via interrupt_sends(), and back by allow_sends(); acts as a guar...
const Shared_name & absolute_name() const
Implements Persistent_mq_handle API: Returns name equal to absolute_name passed to ctor.
Native_handle m_epoll_hndl_rcv
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool allow_impl()
Impl body for allow_*().
bool interrupt_impl()
Impl body for interrupt_*().
bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_send() but without the actual pushing of a message.
bool timed_receive(util::Blob_mutable *blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed receive: pops copy of message from queue into buf...
~Posix_mq_handle()
Implements Persistent_mq_handle API: Destroys this handle (or no-op if no handle was successfully con...
bool set_non_blocking(bool nb, Error_code *err_code)
Sets m_mq to blocking or non-blocking and returns true on success and clears *err_code; otherwise ret...
Shared_name m_absolute_name
See absolute_name().
bool timed_send(const util::Blob_const &blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed send: pushes copy of message to queue; if queue i...
bool try_receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking receive: pops copy of message from queue into buffe...
Error_code pipe_setup()
Ctor helper that sets up m_interrupt* pipe items.
void wait_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like send() but without the actual pushing of a message.
bool m_interrupting_rcv
Other-direction counterpart to m_interrupting_snd.
Pipe_writer m_interrupter_snd
A byte is written to this end by interrupt_sends() to make it readable for the poll-wait in wait_impl...
Native_handle m_mq
Underlying MQ handle.
flow::util::Task_engine m_nb_task_engine
Never used for .run() or .async() – just so we can construct Pipe_reader, Pipe_writer.
Posix_mq_handle & operator=(Posix_mq_handle &&src)
Implements Persistent_mq_handle API: Replaces handle with the source handle while making the latter i...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
void clear()
Makes it so empty() == true.
const std::string & str() const
Returns (sans copying) ref to immutable entire wrapped name string, suitable to pass into sys calls w...
@ S_INTERRUPTED
A blocking operation was intentionally interrupted or preemptively canceled.
@ S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW
Low-level message queue send-op buffer overflow (> max size) or receive-op buffer underflow (< max si...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
void swap(Bipc_mq_handle &val1, Bipc_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
bipc::permissions Permissions
Short-hand for Unix (POSIX) permissions class.
Definition: util_fwd.hpp:161
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
Definition: util.cpp:158
bipc::open_only_t Open_only
Tag type indicating an ideally-atomic open-if-exists-else-fail operation.
Definition: util_fwd.hpp:155
void pipe_produce(flow::log::Logger *logger_ptr, Pipe_writer *pipe)
Writes a byte to the given pipe writer.
Definition: util.cpp:67
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
Definition: util.cpp:30
boost::asio::mutable_buffer Blob_mutable
Short-hand for an mutable blob somewhere in memory, stored as exactly a void* and a size_t.
Definition: util_fwd.hpp:140
void pipe_consume(flow::log::Logger *logger_ptr, Pipe_reader *pipe)
Reads a byte via the given pipe reader.
Definition: util.cpp:96
boost::asio::writable_pipe Pipe_writer
Short-hand for anonymous pipe write end.
Definition: util_fwd.hpp:32
bipc::open_or_create_t Open_or_create
Tag type indicating an atomic open-if-exists-else-create operation.
Definition: util_fwd.hpp:152
bipc::create_only_t Create_only
Tag type indicating a create-unless-exists-else-fail operation.
Definition: util_fwd.hpp:158
flow::Fine_time_pt Fine_time_pt
Short-hand for Flow's Fine_time_pt.
Definition: util_fwd.hpp:119
boost::asio::readable_pipe Pipe_reader
Short-hand for anonymous pipe read end.
Definition: util_fwd.hpp:35
void set_resource_permissions(flow::log::Logger *logger_ptr, const fs::path &path, const Permissions &perms, Error_code *err_code)
Utility that sets the permissions of the given resource (at the supplied file system path) to specifi...
Definition: util.cpp:46
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:117
const Create_only CREATE_ONLY
Tag value indicating an atomic create-unless-exists-else-fail operation.
Definition: util.cpp:32
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:134
flow::util::String_view String_view
Short-hand for Flow's String_view.
Definition: util_fwd.hpp:115
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
Definition: common.hpp:323
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:298
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.
handle_t m_native_handle
The native handle (possibly equal to S_NULL_HANDLE), the exact payload of this Native_handle.