Flow-IPC 1.0.0
Flow-IPC project: Full implementation reference.
bipc_mq_handle.cpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
22#include <flow/error/error.hpp>
23#include <flow/common.hpp>
24#include <boost/interprocess/ipc/message_queue.hpp>
25#include <boost/move/make_unique.hpp>
26
27namespace ipc::transport
28{
29
30// Initializers.
31
33
34// Implementations.
35
37
38template<typename Mode_tag>
39Bipc_mq_handle::Bipc_mq_handle(Mode_tag mode_tag, flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
40 size_t max_n_msg, size_t max_msg_sz,
41 const util::Permissions& perms,
42 Error_code* err_code) :
43 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
44 m_absolute_name(absolute_name_arg),
45 m_interrupting_snd(false),
46 m_interrupting_rcv(false)
47{
48 using flow::log::Sev;
49 using boost::io::ios_all_saver;
50 using boost::movelib::make_unique;
51 using bipc::message_queue;
52
53 assert(max_n_msg >= 1);
54 assert(max_msg_sz >= 1);
55
56 static_assert(std::is_same_v<Mode_tag, util::Create_only> || std::is_same_v<Mode_tag, util::Open_or_create>,
57 "Can only delegate to this ctor with Mode_tag = Create_only or Open_or_create.");
58 constexpr char const * MODE_STR = std::is_same_v<Mode_tag, util::Create_only>
59 ? "create-only" : "open-or-create";
60
61 if (get_logger()->should_log(Sev::S_TRACE, get_log_component()))
62 {
63 ios_all_saver saver(*(get_logger()->this_thread_ostream())); // Revert std::oct/etc. soon.
64 FLOW_LOG_TRACE_WITHOUT_CHECKING
65 ("Bipc_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name [" << absolute_name() << "] in "
66 "[" << MODE_STR << "] mode; max msg size [" << max_msg_sz << "] x [" << max_n_msg << "] msgs; "
67 "perms = [" << std::setfill('0') << std::setw(4) << std::oct << perms.get_permissions() << "].");
68 }
69
70 /* m_mq is null. Try to create/create-or-open it; it may throw exception; this will do the right thing including
71 * leaving m_mq at null, as promised, on any error. Note we might throw exception because of this call. */
72 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle(): bipc::message_queue()",
73 [&]()
74 {
75 m_mq = make_unique<message_queue>(mode_tag, absolute_name().native_str(), max_n_msg, max_msg_sz, perms);
76 /* Bonus: All bipc CREATE/OPEN_OR_CREATE guys take care to ensure permissions are set regardless of umask,
77 * so no need for us to set_resource_permissions() here. */
78 });
79} // Bipc_mq_handle::Bipc_mq_handle(Mode_tag)
80
81Bipc_mq_handle::Bipc_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
82 util::Create_only, size_t max_n_msg, size_t max_msg_sz,
83 const util::Permissions& perms, Error_code* err_code) :
84 Bipc_mq_handle(util::CREATE_ONLY, logger_ptr, absolute_name_arg, max_n_msg, max_msg_sz, perms, err_code)
85{
86 // Cool.
87}
88
89Bipc_mq_handle::Bipc_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
90 util::Open_or_create, size_t max_n_msg, size_t max_msg_sz,
91 const util::Permissions& perms, Error_code* err_code) :
92 Bipc_mq_handle(util::OPEN_OR_CREATE, logger_ptr, absolute_name_arg, max_n_msg, max_msg_sz, perms, err_code)
93{
94 // Cool.
95}
96
97Bipc_mq_handle::Bipc_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
98 util::Open_only, Error_code* err_code) :
99 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
100 m_absolute_name(absolute_name_arg),
101 m_interrupting_snd(false),
102 m_interrupting_rcv(false)
103{
104 using boost::movelib::make_unique;
105 using bipc::message_queue;
106
107 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name "
108 "[" << absolute_name() << "] in open-only mode.");
109
110 /* m_mq is null. Try to create it; it may throw exception; this will do the right thing including
111 * leaving m_mq at null, as promised, on any error. Note we might throw exception because of this call. */
112 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle(): bipc::message_queue(OPEN_ONLY)",
113 [&]()
114 {
115 m_mq = make_unique<message_queue>(util::OPEN_ONLY, absolute_name().native_str());
116 });
117} // Bipc_mq_handle::Bipc_mq_handle(Open_only)
118
119// Just move-construct m_mq, m_absolute_name, and Log_holder.
121
123{
124 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Closing MQ handle (already null? = [" << (!m_mq) << "]).");
125}
126
127// Just do m_mq = std::move(src.m_mq) and same with m_absolute_name and Log_holder.
129
131{
132 using flow::log::Log_context;
133 using std::swap;
134
135 // This is a bit faster than un-specialized std::swap() which would require a move ction + 2 move assignments.
136
137 swap(static_cast<Log_context&>(val1), static_cast<Log_context&>(val2));
138 swap(val1.m_mq, val2.m_mq);
140}
141
143{
144 assert(m_mq && "As advertised: max_msg_size() => undefined behavior if not successfully cted or was moved-from.");
145 return m_mq->get_max_msg_size();
146} // Bipc_mq_handle::max_msg_size()
147
149{
150 assert(m_mq && "As advertised: max_n_msgs() => undefined behavior if not successfully cted or was moved-from.");
151 return m_mq->get_max_msg();
152} // Bipc_mq_handle::max_msg_size()
153
155{
156 using flow::util::buffers_dump_string;
157
158 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Bipc_mq_handle::try_send, flow::util::bind_ns::cref(blob), _1);
159 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
160
161 assert(m_mq && "As advertised: try_send() => undefined behavior if not successfully cted or was moved-from.");
162
163 bool not_blocked;
164 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::try_send(): bipc::message_queue::try_send()",
165 [&]()
166 {
167 auto blob_data = blob.data();
168 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
169 "size [" << blob.size() << "].");
170 if (blob.size() == 0)
171 {
172 /* bipc::message_queue::try_send() invokes memcpy(X, nullptr, N), even when N == 0;
173 * which is (1) empirically speaking harmless but (2) technically in violation of arg 2's non-null decl
174 * hence (3) causes a clang UBSAN sanitizer error. So waste a couple cycles by feeding it this dummy
175 * non-null value. */
176 blob_data = static_cast<const void*>(&blob_data);
177 }
178 else // if (blob.size() != 0)
179 {
180 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
181 }
182
183 not_blocked
184 = m_mq->try_send(blob_data, blob.size(), 0); // Throws <=> error wrapper sets truthy *err_code.
185 if (!not_blocked)
186 {
187 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
188 "size [" << blob.size() << "]: would-block.");
189 }
190 }); // op_with_possible_bipc_mq_exception()
191 if (*err_code) // It logged if truthy.
192 {
193 return false; // not_blocked is garbage.
194 }
195 // else
196
197 return not_blocked; // Logged about it already.
198} // Bipc_mq_handle::try_send()
199
201{
202 using flow::util::buffers_dump_string;
203
204 if (flow::error::exec_void_and_throw_on_error
205 ([&](Error_code* actual_err_code) { send(blob, actual_err_code); },
206 err_code, "Bipc_mq_handle::send()"))
207 {
208 return;
209 }
210 // else
211 err_code->clear();
212
213 assert(m_mq && "As advertised: send() => undefined behavior if not successfully cted or was moved-from.");
214
215 auto blob_data = blob.data();
216 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-push of blob @[" << blob_data << "], "
217 "size [" << blob.size() << "]. Trying nb-push first; if it succeeds -- great. "
218 "Else will wait/retry/wait/retry/....");
219 if (blob.size() == 0)
220 {
221 // See similarly-placed comment in try_send() which explains this.
222 blob_data = static_cast<const void*>(&blob_data);
223 }
224 else // if (blob.size() != 0)
225 {
226 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
227 }
228
229 /* We could just invoke blocking m_mq->send(), but to get the promised logging we go a tiny bit fancier as follows.
230 * Update: Now that we have to be interrupt_*()ible, also reuse wait_*() instead of using native
231 * m_mq->*(). */
232
233 bool ok;
234 while (true)
235 {
236 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::send(): bipc::message_queue::try_send()",
237 [&]()
238 {
239 ok = m_mq->try_send(blob_data, blob.size(), 0); // Throws <=> error wrapper sets truthy *err_code.
240 });
241 if (*err_code || ok)
242 {
243 // Threw => true error => *err_code set; get out. Didn't throw and returned true => success; get out.
244 return;
245 }
246 // else if (would-block): as promised, INFO logs.
247
248 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
249 "size [" << blob.size() << "]: would-block. Executing blocking-wait.");
250
251 wait_sendable(err_code);
252 if (*err_code)
253 {
254 // Whether interrupted or true error, we're done.
255 return;
256 }
257 // else
258 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
259 } // while (true)
260} // Bipc_mq_handle::send()
261
263 Error_code* err_code)
264{
265 using flow::util::time_since_posix_epoch;
266 using flow::util::buffers_dump_string;
267 using flow::Fine_clock;
268 using boost::chrono::round;
269 using boost::chrono::microseconds;
270
271 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Bipc_mq_handle::timed_send,
272 flow::util::bind_ns::cref(blob), timeout_from_now, _1);
273 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
274
275 // Similar to a combo of (blocking) send() and (non-blocking) try_send() -- keeping comments light where redundant.
276
277 assert(m_mq && "As advertised: timed_send() => undefined behavior if not successfully cted or was moved-from.");
278
279 auto blob_data = blob.data();
280 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-timed-push of blob @[" << blob_data << "], "
281 "size [" << blob.size() << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]. "
282 "Trying nb-push first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
283 if (blob.size() == 0)
284 {
285 // See similarly-placed comment in try_send() which explains this.
286 blob_data = static_cast<const void*>(&blob_data);
287 }
288 else // if (blob.size() != 0)
289 {
290 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
291 }
292
293 auto now = Fine_clock::now();
294 auto after = now;
295 bool ok;
296
297 while (true)
298 {
299 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::timed_send(): bipc::message_queue::try_send()",
300 [&]()
301 {
302 ok = m_mq->try_send(blob_data, blob.size(), 0); // Throws <=> error wrapper sets truthy *err_code.
303 });
304 if (*err_code)
305 {
306 // Threw => true error => *err_code set; get out.
307 return false;
308 }
309 // else if (would-block or success):
310 if (ok)
311 {
312 break; // Instant success.
313 }
314 // else if (would-block): as promised, INFO logs.
315
316 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
317 "size [" << blob.size() << "]: would-block. Executing blocking-wait.");
318
319 timeout_from_now -= (after - now); // No-op the first time; after that reduces time left.
320 const bool ready = timed_wait_sendable(timeout_from_now, err_code);
321 if (*err_code)
322 {
323 // Whether interrupted or true error, we're done.
324 return false;
325 }
326 // else:
327
328 if (!ready) // I.e., if (timed out).
329 {
330 FLOW_LOG_TRACE("Did not finish before timeout.");
331 return false;
332 }
333 // else: successful wait for transmissibility. Try nb-transmitting again.
334 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
335
336 after = Fine_clock::now();
337 assert((after >= now) && "Fine_clock is supposed to never go backwards.");
338 } // while (true)
339
340 return true;
341} // Bipc_mq_handle::timed_send()
342
344{
345 using flow::util::buffers_dump_string;
346 using util::Blob_mutable;
347
348 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Bipc_mq_handle::try_receive, blob, _1);
349 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
350
351 assert(m_mq && "As advertised: try_receive() => undefined behavior if not successfully cted or was moved-from.");
352
353 bool not_blocked;
354 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::try_receive(): bipc::message_queue::try_receive()",
355 [&]()
356 {
357 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
358 "max-size [" << blob->size() << "].");
359
360 size_t n_rcvd = {};
361 /* (^-- Initializer not needed algorithmically, but gcc-13 gives maybe-uninitialized warning;
362 * while at least various modern clangs and gcc-9 are fine. It's OK; we can afford it.) */
363 unsigned int pri_ignored;
364 not_blocked // Throws <=> error wrapper sets truthy *err_code. --v
365 = m_mq->try_receive(blob->data(), blob->size(), n_rcvd, pri_ignored);
366 if (not_blocked)
367 {
368 *blob = Blob_mutable(blob->data(), n_rcvd);
369 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
370 if (blob->size() != 0)
371 {
372 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
373 }
374 }
375 else // if (!not_blocked)
376 {
377 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
378 "max-size [" << blob->size() << "]: would-block.");
379 }
380 }); // op_with_possible_bipc_mq_exception()
381 if (*err_code) // It logged if truthy.
382 {
383 return false; // not_blocked is garbage; *blob is untouched.
384 }
385 // else
386
387 return not_blocked; // Logged about it already.
388} // Bipc_mq_handle::try_receive()
389
391{
392 using flow::util::buffers_dump_string;
393 using util::Blob_mutable;
394
395 if (flow::error::exec_void_and_throw_on_error
396 ([&](Error_code* actual_err_code) { receive(blob, actual_err_code); },
397 err_code, "Bipc_mq_handle::receive()"))
398 {
399 return;
400 }
401 // else
402 err_code->clear();
403
404 assert(m_mq && "As advertised: receive() => undefined behavior if not successfully cted or was moved-from.");
405
406 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-pop to blob @[" << blob->data() << "], "
407 "max-size [" << blob->size() << "]. Trying nb-pop first; if it succeeds -- great. "
408 "Else will wait/retry/wait/retry/....");
409
410 /* We could just invoke blocking m_mq->receive(), but to get the promised logging we go a tiny bit fancier as follows.
411 * Update: Now that we have to be interrupt_*()ible, also reuse wait_*() instead of using native
412 * m_mq->*(). */
413
414 size_t n_rcvd = {}; // (Why initialize? See comment near first such initializer for explanation.)
415 unsigned int pri_ignored;
416 bool ok;
417
418 while (true)
419 {
420 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::receive(): bipc::message_queue::try_receive()",
421 [&]()
422 {
423 ok = m_mq->try_receive(blob->data(), blob->size(), // Throws <=> error wrapper sets truthy *err_code.
424 n_rcvd, pri_ignored);
425 });
426 if (*err_code)
427 {
428 // Threw => true error => *err_code set; get out.
429 return;
430 }
431 // else if (would-block or success):
432 if (ok)
433 {
434 *blob = Blob_mutable(blob->data(), n_rcvd);
435 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
436 if (blob->size() != 0)
437 {
438 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
439 }
440 return; // Instant success.
441 }
442 // else if (would-block): as promised, INFO logs.
443
444 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
445 "max-size [" << blob->size() << "]: would-block. Executing blocking-pop.");
446
447 wait_receivable(err_code);
448 if (*err_code)
449 {
450 // Whether interrupted or true error, we're done.
451 return;
452 }
453 // else
454 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
455 } // while (true)
456} // Bipc_mq_handle::receive()
457
459{
460 using util::Blob_mutable;
461 using flow::util::time_since_posix_epoch;
462 using flow::util::buffers_dump_string;
463 using flow::Fine_clock;
464 using boost::chrono::round;
465 using boost::chrono::microseconds;
466
467 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Bipc_mq_handle::timed_receive, blob, timeout_from_now, _1);
468 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
469
470 // Similar to a combo of (blocking) receive() and (non-blocking) try_receive() -- keeping comments light.
471
472 assert(m_mq && "As advertised: timed_receive() => undefined behavior if not successfully cted or was moved-from.");
473
474 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-timed-pop to blob @[" << blob->data() << "], "
475 "max-size [" << blob->size() << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]. "
476 "Trying nb-pop first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
477
478 size_t n_rcvd = {}; // (Why initialize? See comment near first such initializer for explanation.)
479 unsigned int pri_ignored;
480
481 auto now = Fine_clock::now();
482 auto after = now;
483 bool ok;
484
485 while (true)
486 {
487 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::timed_send(): bipc::message_queue::try_send()",
488 [&]()
489 {
490 ok = m_mq->try_receive(blob->data(), blob->size(), // Throws <=> error wrapper sets truthy *err_code.
491 n_rcvd, pri_ignored);
492 });
493 if (*err_code)
494 {
495 // Threw => true error => *err_code set; get out.
496 return false;
497 }
498 // else if (would-block or success):
499 if (ok)
500 {
501 *blob = Blob_mutable(blob->data(), n_rcvd);
502 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
503 if (blob->size() != 0)
504 {
505 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
506 }
507 break; // Instant success.
508 }
509 // else if (would-block): as promised, INFO logs.
510
511 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
512 "max-size [" << blob->size() << "]: would-block. Executing blocking-wait.");
513
514 timeout_from_now -= (after - now); // No-op the first time; after that reduces time left.
515 const bool ready = timed_wait_receivable(timeout_from_now, err_code);
516 if (*err_code)
517 {
518 // Whether interrupted or true error, we're done.
519 return false;
520 }
521 // else:
522
523 if (!ready) // I.e., if (timed out).
524 {
525 FLOW_LOG_TRACE("Did not finish before timeout.");
526 return false;
527 }
528 // else: successful wait for transmissibility. Try nb-transmitting again.
529 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
530
531 after = Fine_clock::now();
532 assert((after >= now) && "Fine_clock is supposed to never go backwards.");
533 } // while (true)
534
535 return true;
536} // Bipc_mq_handle::timed_receive()
537
538template<bool SND_ELSE_RCV, bool ON_ELSE_OFF>
540{
541 using Classic_shm_area = bipc::ipcdetail::managed_open_or_create_impl<bipc::shared_memory_object, 0, true, false>;
542 using Bipc_mq = bipc::message_queue;
543 using Bipc_mq_hdr = bipc::ipcdetail::mq_hdr_t<Bipc_mq::void_pointer>;
544 using Bipc_mq_mtx = bipc::interprocess_mutex;
545 using Bipc_mq_lock = bipc::scoped_lock<Bipc_mq_mtx>;
546
547 assert(m_mq
548 && "As advertised: interrupt_allow_impl() => undefined behavior if not successfully cted or was moved-from.");
549
550 /* First see m_interrupting_snd doc header. Then check out wait_impl() carefully. Then the below will
551 * probably make sense. */
552
553 bool* interrupting_ptr;
554 auto& shm_area = reinterpret_cast<Classic_shm_area&>(*m_mq);
555 auto* const mq_hdr = static_cast<Bipc_mq_hdr*>(shm_area.get_user_address());
556 decltype(mq_hdr->m_cond_recv)* cond_ptr;
557 if constexpr(SND_ELSE_RCV)
558 {
559 interrupting_ptr = &m_interrupting_snd;
560 cond_ptr = &mq_hdr->m_cond_send;
561 }
562 else
563 {
564 interrupting_ptr = &m_interrupting_rcv;
565 cond_ptr = &mq_hdr->m_cond_recv;
566 }
567 auto& interrupting = *interrupting_ptr;
568 auto& cond = *cond_ptr;
569
570 {
571 Bipc_mq_lock lock(mq_hdr->m_mutex);
572
573 if (interrupting == ON_ELSE_OFF)
574 {
575 FLOW_LOG_WARNING("Bipc_mq_handle [" << *this << "]: Interrupt mode already set for "
576 "snd_else_rcv [" << SND_ELSE_RCV << "], on_else_off [" << ON_ELSE_OFF << "]. Ignoring.");
577 return false;
578 }
579 // else
580
581 interrupting = ON_ELSE_OFF;
582 FLOW_LOG_INFO("Bipc_mq_handle [" << *this << "]: Interrupt mode set for "
583 "snd_else_rcv [" << SND_ELSE_RCV << "], on_else_off [" << ON_ELSE_OFF << "]. If on -- we "
584 "shall now ping the associated condition variable to wake up any ongoing waits.");
585
586 if constexpr(ON_ELSE_OFF)
587 {
588 /* *All* ongoing waits shall be woken up. Each one will auto-re-lock mq_hdr->m_mutex and query
589 * their local m_interrupting_*. (Note: If `*this` wakes up, it'll see it's `true`. If another
590 * one wakes up, it will probably see `false` and re-enter the wait. If by some coincidence that non-`*this`
591 * had just set *that* m_interrupting_*, then -- well, cool -- presumably we won a race against
592 * their own interrupt_allow_impl() doing the same thing. */
593 cond.notify_all(); // By the docs, and even by the source code as of Boost-1.81, this does not throw.
594
595 /* (bipc::message_queue code does .notify_one() in send/receive impl, which makes sense since one
596 * message/one empty space would be used by at most one blocked dude. It also does it outside the mutex lock,
597 * citing performance at the cost of occasional spurious wakeups. Neither thing applies to us, so let's
598 * .notify_all() and keep it simple inside the locked section. */
599 } // if constexpr(ON_ELSE_OFF)
600 /* else if constexpr(!ON_ELSE_OFF)
601 * { We're good! No need to wake anyone up to tell them... not to stop. } */
602 } // Bipc_mq_lock lock(mq_hdr->m_mutex);
603
604 return true;
605} // Bipc_mq_handle::interrupt_allow_impl()
606
608{
609 return interrupt_allow_impl<true, true>();
610}
611
613{
614 return interrupt_allow_impl<true, false>();
615}
616
618{
619 return interrupt_allow_impl<false, true>();
620}
621
623{
624 return interrupt_allow_impl<false, false>();
625}
626
627template<Bipc_mq_handle::Wait_type WAIT_TYPE, bool SND_ELSE_RCV>
628bool Bipc_mq_handle::wait_impl([[maybe_unused]] util::Fine_duration timeout_from_now, Error_code* err_code)
629{
630 using util::Fine_time_pt;
631 using flow::util::time_since_posix_epoch;
632 using boost::chrono::round;
633 using boost::chrono::microseconds;
634 using Classic_shm_area = bipc::ipcdetail::managed_open_or_create_impl<bipc::shared_memory_object, 0, true, false>;
635 using Bipc_mq = bipc::message_queue;
636 using Bipc_mq_hdr = bipc::ipcdetail::mq_hdr_t<Bipc_mq::void_pointer>;
637 using Bipc_mq_mtx = bipc::interprocess_mutex;
638 using Bipc_mq_lock = bipc::scoped_lock<Bipc_mq_mtx>;
639
640 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Bipc_mq_handle::timed_wait_receivable, timeout_from_now, _1);
641 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
642
643 assert(m_mq
644 && "As advertised: wait_impl() => undefined behavior if not successfully cted or was moved-from.");
645
646 [[maybe_unused]] Fine_time_pt timeout_since_epoch;
647 if constexpr(WAIT_TYPE == Wait_type::S_TIMED_WAIT)
648 {
649 timeout_since_epoch = Fine_time_pt(time_since_posix_epoch() + timeout_from_now);
650 }
651
652 /* Most likely the below code will elicit a "WTF." The background is that bipc::message_queue lacks anything like
653 * wait_impl(). It seems to mimic the mq_*() API. However mq_*() does have it, in Linux, because mq_t is an FD,
654 * and it can be used with epoll_*(), so that is what Posix_mq_handle uses. To get the same thing here I had
655 * to hack it as seen below. What it does is it mimics *send() and *receive() internal Boost source code, but
656 * it stops short of actually performing the read or write once the queue becomes pushable/poppable.
657 * To get it work I (ygoldfel) operate directly on their internal data structures, same as those methods do.
658 * Normally this would be beyond the pale; however in this case a couple of points make it reasonable-enough.
659 *
660 * Firstly, we can even do it in the first place, because the data structures -- all of which are directly in
661 * SHM -- `public`ly expose their data members (though, interestingly, not the methods; however the methods
662 * are very basic wrappers around public things like condition variables and mutexes). This fact is suggestive
663 * of this being intentionally possible.
664 *
665 * Secondly, and this is the main reason I consider this reasonably maintainable, is the fact that the code
666 * itself -- including a comment above the BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX define -- says that
667 * their goal was to make version A of bipc interoperable (this is IPC after all) if version B>A of bipc.
668 * That means that if they *do* change this stuff, it will be protected by #define(s) like
669 * BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX, so as to make any change possible to roll-back via a compile
670 * flag. They *could* rename a data member without changing the physical structures, in which case this
671 * would stop building, but that seems unlikely, as Boost updates are not made willy-nilly.
672 *
673 * Is there risk of this breaking with a newer Boost version? Yes, but the above evidence shows the risk is
674 * manageably low. Note that this wait_impl() feature, particularly since I've made it interruptible, is quite
675 * useful. Without it one must used timed_*(), and even with that -- suppose we want to stop work on
676 * a queue from another thread -- we have to put up a deinit time equal to the fine-grainedness of the timeout
677 * one would have to use. Plus even that aside, it is annoying to have to break up an operation into smaller
678 * ones. */
679
680 if constexpr(WAIT_TYPE == Wait_type::S_POLL)
681 {
682 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Poll-unstarved for snd_else_rcv [" << SND_ELSE_RCV << "].");
683 }
684 else if constexpr(WAIT_TYPE == Wait_type::S_TIMED_WAIT)
685 {
686 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-timed-await-unstarved for "
687 "snd_else_rcv [" << SND_ELSE_RCV << "]; "
688 "timeout ~[" << round<microseconds>(timeout_from_now) << "].");
689 }
690 else
691 {
692 static_assert(WAIT_TYPE == Wait_type::S_WAIT, "What!");
693 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-await-unstarved for snd_else_rcv "
694 "[" << SND_ELSE_RCV << "].");
695 }
696
697 auto& shm_area = reinterpret_cast<Classic_shm_area&>(*m_mq);
698 auto* const mq_hdr = static_cast<Bipc_mq_hdr*>(shm_area.get_user_address());
699 size_t* blocked_dudes_ptr;
700 bool* interrupting_ptr;
701 decltype(mq_hdr->m_cond_recv)* cond_ptr;
702 if constexpr(SND_ELSE_RCV)
703 {
704 blocked_dudes_ptr = &mq_hdr->m_blocked_senders;
705 cond_ptr = &mq_hdr->m_cond_send;
706 interrupting_ptr = &m_interrupting_snd;
707 }
708 else
709 {
710 blocked_dudes_ptr = &mq_hdr->m_blocked_receivers;
711 cond_ptr = &mq_hdr->m_cond_recv;
712 interrupting_ptr = &m_interrupting_rcv;
713 }
714 auto& blocked_dudes = *blocked_dudes_ptr;
715 auto& cond = *cond_ptr;
716 bool& interrupting = *interrupting_ptr;
717
718 const auto is_starved_func = [&]() -> bool
719 {
720 if constexpr(SND_ELSE_RCV)
721 {
722 return mq_hdr->m_cur_num_msg == mq_hdr->m_max_num_msg;
723 }
724 else
725 {
726 return mq_hdr->m_cur_num_msg == 0;
727 }
728 }; // const auto is_starved_func =
729
730 bool interrupted = false;
731 bool not_starved;
732 /* Technically [timed_]wait() can throw, I guess if bool(mutex) is false. Their internal code has a try{}
733 * so why not. */
735 "Bipc_mq_handle::wait_impl(): "
736 "bipc::interprocess_condition::[timed_]wait()",
737 [&]()
738 {
739#ifndef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
740 static_assert(false,
741 "bipc comments show this shall be true as of many Boost versions ago, unless unset which "
742 "would decrease performance; and we do not do that. Our code for simplicity assumes "
743 "it and does not support the lower-perf bipc MQ algorithm.");
744#endif
745
746 {
747 Bipc_mq_lock lock(mq_hdr->m_mutex);
748
749 // See interrupt_allow_impl() to understand this check (also below on cond.*wait()).
750 if (interrupting)
751 {
752 FLOW_LOG_TRACE("Interrupted before wait/poll started (preemptively).");
753 interrupted = true;
754 return;
755 }
756 // else
757
758 if (is_starved_func())
759 {
760 // timed_receive() would INFO-log here, but let's not slow things down while the bipc MQ mutex is locked.
761
762 if constexpr(WAIT_TYPE == Wait_type::S_POLL)
763 {
764 FLOW_LOG_TRACE("Not immediatelly unstarved. Poll = done.");
765 not_starved = false;
766 return;
767 }
768 else // if constexpr(WAIT_TYPE == Wait_type::S_[TIMED_]WAIT)
769 {
770 FLOW_LOG_TRACE("Not immediatelly unstarved. Awaiting unstarvedness or timeout.");
771
772 ++blocked_dudes;
773 try
774 {
775 do
776 {
777 if constexpr(WAIT_TYPE == Wait_type::S_WAIT)
778 {
779 cond.wait(lock);
780
781 // See interrupt_allow_impl() to understand this check (also above).
782 if (interrupting)
783 {
784 FLOW_LOG_TRACE("Interruption detected upon waking up from wait (interrupted concurrently).");
785 --blocked_dudes;
786 interrupted = true;
787 return;
788 }
789 // else: Another loop iteration.
790 }
791 else // if (WAIT_TYPE==TIMED_WAIT)
792 {
793 static_assert(WAIT_TYPE == Wait_type::S_TIMED_WAIT, "The hell?");
794
795 const bool wait_result = cond.timed_wait(lock, timeout_since_epoch); // Lock unlocked throughout wait.
796
797 // See interrupt_allow_impl() to understand this check (also above).
798 if (interrupting)
799 {
800 FLOW_LOG_TRACE("Interruption detected upon waking up from wait (interrupted concurrently).");
801 --blocked_dudes;
802 interrupted = true;
803 return;
804 }
805 // else
806
807 if (!wait_result)
808 {
809 // Timeout reached.
810 if (is_starved_func())
811 {
812 // Timeout reached; still starved. Done (exit algo).
813 --blocked_dudes;
814 not_starved = false;
815 return;
816 }
817 // else: Timeout reached; *is* unstarved. Done (exit loop -- no need to recheck is_starved_func()).
818 break;
819 } // if (!wait_result)
820 // else: Timeout not reached; probably *is* unstarved; but check it as loop exit condition.
821 } // else if (WAIT_TYPE==TIMED_WAIT)
822 }
823 while (is_starved_func());
824 } // try
825 catch (...)
826 {
827 --blocked_dudes;
828 throw;
829 }
830 --blocked_dudes;
831 } // else if constexpr(WAIT_TYPE == Wait_type::S_[TIMED_]WAIT)
832
833 // Will INFO-log shortly (outside lock).
834 not_starved = true;
835 } // if (is_starved_func())
836 else
837 {
838 FLOW_LOG_TRACE("Immediately unstarved.");
839 not_starved = true;
840 }
841 } // Bipc_mq_lock lock(mq_hdr->m_mutex);
842 }); // op_with_possible_bipc_mq_exception()
843
844 if ((!*err_code) && interrupted)
845 {
846 FLOW_LOG_INFO("Bipc_mq_handle [" << *this << "]: Poll/wait/timed-wait-unstarved for "
847 "snd_else_rcv [" << SND_ELSE_RCV << "]: interrupted (TRACE message -- if visible -- "
848 "indicates whether preemptively or concurrently).");
849 *err_code = error::Code::S_INTERRUPTED;
850 }
851
852 if (*err_code) // It logged if truthy.
853 {
854 return false; // not_starved is garbage.
855 }
856 // else: as promised, INFOx1 log.
857
858 if constexpr(WAIT_TYPE == Wait_type::S_POLL)
859 {
860 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Poll-unstarved for snd_else_rcv [" << SND_ELSE_RCV << "]: "
861 "succeeded? = [" << not_starved << "].");
862 }
863 else if constexpr(WAIT_TYPE == Wait_type::S_TIMED_WAIT)
864 {
865 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-timed-await-unstarved for "
866 "snd_else_rcv [" << SND_ELSE_RCV << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]: "
867 "succeeded? = [" << not_starved << "]. "
868 "Either was immediately unstarved, or was not but waited until success or timeout+failure. "
869 "If success: TRACE message (if visible) above indicates which occurred.");
870 }
871 else
872 {
873 assert(not_starved);
874 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-await-unstarved for "
875 "snd_else_rcv [" << SND_ELSE_RCV << "]: succeeded eventually. "
876 "Either was immediately unstarved, or was not but waited it out. "
877 "TRACE message (if visible) above indicates which occurred.");
878 }
879
880 return not_starved; // Logged about it already.
881} // Bipc_mq_handle::wait_impl()
882
884{
885 return wait_impl<Wait_type::S_POLL, true>(util::Fine_duration(), err_code);
886}
887
889{
890 wait_impl<Wait_type::S_WAIT, true>(util::Fine_duration(), err_code);
891}
892
894{
895 return wait_impl<Wait_type::S_TIMED_WAIT, true>(timeout_from_now, err_code);
896}
897
899{
900 return wait_impl<Wait_type::S_POLL, false>(util::Fine_duration(), err_code);
901}
902
904{
905 wait_impl<Wait_type::S_WAIT, false>(util::Fine_duration(), err_code);
906}
907
909{
910 return wait_impl<Wait_type::S_TIMED_WAIT, false>(timeout_from_now, err_code);
911}
912
913void Bipc_mq_handle::remove_persistent(flow::log::Logger* logger_ptr, // Static.
914 const Shared_name& absolute_name, Error_code* err_code)
915{
916 using bipc::message_queue;
917 using boost::system::system_category;
918
919 if (flow::error::exec_void_and_throw_on_error
920 ([&](Error_code* actual_err_code)
921 { remove_persistent(logger_ptr, absolute_name, actual_err_code); },
922 err_code, "Bipc_mq_handle::remove_persistent()"))
923 {
924 return;
925 }
926 // else
927
928 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
929
930 FLOW_LOG_INFO("Bipc_mq @ Shared_name[" << absolute_name << "]: Removing persistent MQ if possible.");
931 const bool ok = message_queue::remove(absolute_name.native_str()); // Does not throw.
932
933 if (ok)
934 {
935 err_code->clear();
936 return;
937 }
938 /* message_queue::remove() is strangely gimped -- though I believe so are the other kernel-persistent remove()s
939 * throughout bipc -- it does not throw and merely returns true or false and no code. Odd, since there can
940 * be at least a couple of reasons one would fail to delete.... However, in POSIX, the Boost 1.78 source code
941 * shows that mq::remove() calls shared_memory_object::remove() which calls some internal
942 * ipcdetail::delete_file() which calls... drumroll... freakin' ::unlink(const char*). Hence we haxor: */
943#ifndef FLOW_OS_LINUX // @todo Should maybe check Boost version or something too?
944# error "Code in Bipc_mq_handle::remove_persistent() relies on Boost invoking Linux unlink() with errno."
945#endif
946 const auto& sys_err_code = *err_code = Error_code(errno, system_category());
947 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
948} // Bipc_mq_handle::remove_persistent()
949
950template<typename Func>
952 const Func& func)
953{
954 using flow::error::Runtime_error;
955 using bipc::interprocess_exception;
956 using boost::system::system_category;
957
958 if (flow::error::exec_void_and_throw_on_error
959 ([&](Error_code* actual_err_code) { op_with_possible_bipc_mq_exception(actual_err_code, context, func); },
960 err_code, context))
961 {
962 return;
963 }
964 // else
965
966 try
967 {
968 func();
969 }
970 catch (const interprocess_exception& exc)
971 {
972 /* They appear to always throw this guy with some interesting semantics. We want to yield our consistent
973 * Flow-style semantics, namely produce a truthy *err_code (which, if actual original err_code was null will
974 * be instead thrown after being wrapped in a flow::Runtime_error, which is really a boost::system::system_error
975 * with a cleaner what() message). More details inline below.
976 *
977 * So interprocess_exception is not a system_error even but rather an exception with a particular custom API.
978 * To ensure the information is logged *somewhere* for sure and not lost do log all of this.
979 * After that normalize to the *err_code semantics, even if some information is lost -- though we'll try
980 * to lose nothing even so. */
981 const auto native_code_raw = exc.get_native_error();
982 const auto bipc_err_code_enum = exc.get_error_code();
983 const bool is_size_error = bipc_err_code_enum == bipc::size_error;
984 FLOW_LOG_WARNING("bipc threw interprocess_exception; will emit some hopefully suitable Flow-IPC Error_code; "
985 "but here are all the details of the original exception: native code int "
986 "[" << native_code_raw << "]; bipc error_code_t enum->int "
987 "[" << int(bipc_err_code_enum) << "]; latter==size_error? = [" << is_size_error << "]; "
988 "message = [" << exc.what() << "]; context = [" << context << "].");
989
990 /* Special case: size_error is thrown by message_queue on a receive with underflow or send with overflow.
991 * Our API contract (Persistent_mq_handle concept doc header) is to emit this particular code then. */
992 if (is_size_error)
993 {
994 // Sufficient. (Check of Boost 1.78 source code confirms there's no native code in this case anyway.)
996 return;
997 }
998 // else
999 if (native_code_raw != 0)
1000 {
1001 /* At least in POSIX, interprocess_exception only does the following in this case:
1002 * - strerror(native_code_raw) => the message. But that's standard boost.system errno handling already;
1003 * so if just emit a system_category() Error_code, all will be equally well message-wise in what().
1004 * - They have a table that maps certain native_code_raw values to one of a few (not super-many but not
1005 * a handful) bipc_err_code_enum enum values. Technically the following will lose that information;
1006 * but (1) it is logged above; and (2) so what? */
1007 const auto& sys_err_code = *err_code = Error_code(native_code_raw, system_category());
1008 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1009 return;
1010 }
1011 // else
1012
1013 /* All we have is bipc_err_code_enum. We've already handled the known likely issue, size_error. Beyond that
1014 * there does not seem to be much we can do; really it's a matter of either emitting one catch-all code or
1015 * having our own equivalents of their enum. For now, at least, the former is OK. @todo Revisit. */
1016 *err_code = error::Code::S_MQ_BIPC_MISC_LIBRARY_ERROR; // The earlier WARNING is good enough.
1017 return;
1018 } // catch ()
1019 // Got here: all good.
1020 err_code->clear();
1021} // Bipc_mq_handle::op_with_possible_bipc_mq_exception()
1022
1024{
1025 return m_absolute_name;
1026}
1027
1028std::ostream& operator<<(std::ostream& os, const Bipc_mq_handle& val)
1029{
1030 return os << '@' << &val << ": sh_name[" << val.absolute_name() << ']';
1031}
1032
1033} // namespace ipc::transport
Implements the Persistent_mq_handle concept by thinly wrapping bipc::message_queue,...
Bipc_mq_handle()
Implements Persistent_mq_handle API: Construct null handle.
bool allow_receives()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-receives...
boost::movelib::unique_ptr< bipc::message_queue > m_mq
Underlying MQ handle.
bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_send() but without the actual pushing of a message.
Bipc_mq_handle & operator=(Bipc_mq_handle &&src)
Implements Persistent_mq_handle API: Replaces handle with the source handle while making the latter i...
bool m_interrupting_rcv
Other-direction counterpart to m_interrupting_snd.
bool interrupt_sends()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-sends and...
void send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking send: pushes copy of message to queue; if queue is full...
bool try_send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking send: pushes copy of message to queue and returns t...
bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_receive() but without the actual popping of a message...
bool timed_receive(util::Blob_mutable *blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed receive: pops copy of message from queue into buf...
const Shared_name & absolute_name() const
Implements Persistent_mq_handle API: Returns name equal to absolute_name passed to ctor.
static const Shared_name S_RESOURCE_TYPE_ID
Implements concept API.
bool allow_sends()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-sends an...
bool try_receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking receive: pops copy of message from queue into buffe...
@ S_TIMED_WAIT
Timed-wait-type (blocking until timeout).
@ S_WAIT
Wait-type (blocking indefinitely).
@ S_POLL
Poll-type (non-blocking).
void wait_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like receive() but without the actual popping of a message.
bool interrupt_receives()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-receives ...
bool m_interrupting_snd
Starting at false, this is made true via interrupt_sends(), and back by allow_sends(); when true wait...
bool timed_send(const util::Blob_const &blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed send: pushes copy of message to queue; if queue i...
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code=0)
Implements Persistent_mq_handle API: Removes the named persistent MQ.
void op_with_possible_bipc_mq_exception(Error_code *err_code, util::String_view context, const Func &func)
Error helper: Run func() which will perform a bipc::message_queue API that might throw; if it does em...
void receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking receive: pops copy of message from queue into buffer; i...
size_t max_n_msgs() const
Implements Persistent_mq_handle API: Returns the max message count of the underlying queue.
~Bipc_mq_handle()
Implements Persistent_mq_handle API: Destroys this handle (or no-op if no handle was successfully con...
bool interrupt_allow_impl()
Impl body for interrupt_*() and allow_*().
void wait_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like send() but without the actual pushing of a message.
size_t max_msg_size() const
Implements Persistent_mq_handle API: Returns the max message size of the underlying queue.
bool wait_impl(util::Fine_duration timeout_from_now, Error_code *err_code)
Impl body for *_sendable() and *_receivable().
bool is_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_receive() but without the actual popping of a message.
Shared_name m_absolute_name
See absolute_name().
bool is_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_send() but without the actual pushing of a message.
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
const char * native_str() const
Returns (sans copying) pointer to NUL-terminated wrapped name string, suitable to pass into sys calls...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
@ S_INTERRUPTED
A blocking operation was intentionally interrupted or preemptively canceled.
@ S_MQ_BIPC_MISC_LIBRARY_ERROR
Low-level message queue: boost.interprocess emitted miscellaneous library exception sans a system cod...
@ S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW
Low-level message queue send-op buffer overflow (> max size) or receive-op buffer underflow (< max si...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
void swap(Bipc_mq_handle &val1, Bipc_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
bipc::permissions Permissions
Short-hand for Unix (POSIX) permissions class.
Definition: util_fwd.hpp:155
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
Definition: util.cpp:156
bipc::open_only_t Open_only
Tag type indicating an ideally-atomic open-if-exists-else-fail operation.
Definition: util_fwd.hpp:149
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
Definition: util.cpp:30
const Open_only OPEN_ONLY
Tag value indicating an atomic open-if-exists-else-fail operation.
Definition: util.cpp:31
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
Definition: util_fwd.hpp:134
bipc::open_or_create_t Open_or_create
Tag type indicating an atomic open-if-exists-else-create operation.
Definition: util_fwd.hpp:146
bipc::create_only_t Create_only
Tag type indicating a create-unless-exists-else-fail operation.
Definition: util_fwd.hpp:152
flow::Fine_time_pt Fine_time_pt
Short-hand for Flow's Fine_time_pt.
Definition: util_fwd.hpp:113
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:111
const Create_only CREATE_ONLY
Tag value indicating an atomic create-unless-exists-else-fail operation.
Definition: util.cpp:32
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:128
flow::util::String_view String_view
Short-hand for Flow's String_view.
Definition: util_fwd.hpp:109
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
Definition: common.hpp:322
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:297