Flow-IPC 2.0.0
Flow-IPC project: Full implementation reference.
bipc_mq_handle.cpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
22#include <flow/error/error.hpp>
23#include <flow/common.hpp>
24#include <boost/interprocess/ipc/message_queue.hpp>
25#include <boost/move/make_unique.hpp>
26
27namespace ipc::transport
28{
29
30// Initializers.
31
33
34// Implementations.
35
37
38template<typename Mode_tag>
39Bipc_mq_handle::Bipc_mq_handle(Mode_tag mode_tag, flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
40 size_t max_n_msg, size_t max_msg_sz,
41 const util::Permissions& perms,
42 Error_code* err_code) :
43 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
44 m_absolute_name(absolute_name_arg),
45 m_interrupting_snd(false),
46 m_interrupting_rcv(false)
47{
48 using flow::log::Sev;
49 using boost::io::ios_all_saver;
50 using boost::movelib::make_unique;
51 using bipc::message_queue;
52
53 assert(max_n_msg >= 1);
54 assert(max_msg_sz >= 1);
55
56 static_assert(std::is_same_v<Mode_tag, util::Create_only> || std::is_same_v<Mode_tag, util::Open_or_create>,
57 "Can only delegate to this ctor with Mode_tag = Create_only or Open_or_create.");
58 constexpr char const * MODE_STR = std::is_same_v<Mode_tag, util::Create_only>
59 ? "create-only" : "open-or-create";
60
61 if (get_logger()->should_log(Sev::S_TRACE, get_log_component()))
62 {
63 ios_all_saver saver(*(get_logger()->this_thread_ostream())); // Revert std::oct/etc. soon.
64 FLOW_LOG_TRACE_WITHOUT_CHECKING
65 ("Bipc_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name [" << absolute_name() << "] in "
66 "[" << MODE_STR << "] mode; max msg size [" << max_msg_sz << "] x [" << max_n_msg << "] msgs; "
67 "perms = [" << std::setfill('0') << std::setw(4) << std::oct << perms.get_permissions() << "].");
68 }
69
70 /* m_mq is null. Try to create/create-or-open it; it may throw exception; this will do the right thing including
71 * leaving m_mq at null, as promised, on any error. Note we might throw exception because of this call. */
72 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle(): bipc::message_queue()",
73 [&]()
74 {
75 m_mq = make_unique<message_queue>(mode_tag, absolute_name().native_str(), max_n_msg, max_msg_sz, perms);
76 /* Bonus: All bipc CREATE/OPEN_OR_CREATE guys take care to ensure permissions are set regardless of umask,
77 * so no need for us to set_resource_permissions() here. */
78 });
79} // Bipc_mq_handle::Bipc_mq_handle(Mode_tag)
80
81Bipc_mq_handle::Bipc_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
82 util::Create_only, size_t max_n_msg, size_t max_msg_sz,
83 const util::Permissions& perms, Error_code* err_code) :
84 Bipc_mq_handle(util::CREATE_ONLY, logger_ptr, absolute_name_arg, max_n_msg, max_msg_sz, perms, err_code)
85{
86 // Cool.
87}
88
89Bipc_mq_handle::Bipc_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
90 util::Open_or_create, size_t max_n_msg, size_t max_msg_sz,
91 const util::Permissions& perms, Error_code* err_code) :
92 Bipc_mq_handle(util::OPEN_OR_CREATE, logger_ptr, absolute_name_arg, max_n_msg, max_msg_sz, perms, err_code)
93{
94 // Cool.
95}
96
97Bipc_mq_handle::Bipc_mq_handle(flow::log::Logger* logger_ptr, const Shared_name& absolute_name_arg,
98 util::Open_only, Error_code* err_code) :
99 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
100 m_absolute_name(absolute_name_arg),
101 m_interrupting_snd(false),
102 m_interrupting_rcv(false)
103{
104 using boost::movelib::make_unique;
105 using bipc::message_queue;
106
107 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Constructing MQ handle to MQ at name "
108 "[" << absolute_name() << "] in open-only mode.");
109
110 /* m_mq is null. Try to create it; it may throw exception; this will do the right thing including
111 * leaving m_mq at null, as promised, on any error. Note we might throw exception because of this call. */
112 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle(): bipc::message_queue(OPEN_ONLY)",
113 [&]()
114 {
115 m_mq = make_unique<message_queue>(util::OPEN_ONLY, absolute_name().native_str());
116 });
117} // Bipc_mq_handle::Bipc_mq_handle(Open_only)
118
119// Just move-construct m_mq, m_absolute_name, and Log_holder.
121
123{
 // Presumably the destructor body (its signature line is not visible here).  It only emits a TRACE message
 // reporting whether m_mq is already null; no explicit clean-up call is made in this body.
124 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Closing MQ handle (already null? = [" << (!m_mq) << "]).");
125}
126
127// Just do m_mq = std::move(src.m_mq) and same with m_absolute_name and Log_holder.
129
131{
 // Exchanges the state of val1 and val2 piece by piece: the Log_context base sub-objects first, then m_mq.
 // NOTE(review): only those two exchanges are visible here; confirm against the full file that the remaining
 // data members (e.g., m_absolute_name) are exchanged as well.
132 using flow::log::Log_context;
133 using std::swap;
134
135 // This is a bit faster than un-specialized std::swap() which would require a move ction + 2 move assignments.
136
137 swap(static_cast<Log_context&>(val1), static_cast<Log_context&>(val2));
138 swap(val1.m_mq, val2.m_mq);
140}
141
143{
 // Returns the underlying bipc MQ's get_max_msg_size(); m_mq must be non-null (asserted).
144 assert(m_mq && "As advertised: max_msg_size() => undefined behavior if not successfully cted or was moved-from.");
145 return m_mq->get_max_msg_size();
146} // Bipc_mq_handle::max_msg_size()
147
149{
 // Returns the underlying bipc MQ's get_max_msg(); m_mq must be non-null (asserted).
150 assert(m_mq && "As advertised: max_n_msgs() => undefined behavior if not successfully cted or was moved-from.");
151 return m_mq->get_max_msg();
152} // Bipc_mq_handle::max_n_msgs()
153
155{
 /* Non-blocking push of `blob` via bipc::message_queue::try_send() (priority argument 0).
  * Returns true if the message was enqueued; false if the queue would block or if *err_code was set truthy by
  * the exception wrapper. */
156 using flow::util::buffers_dump_string;
157
158 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, try_send, blob, _1);
159 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
160
161 assert(m_mq && "As advertised: try_send() => undefined behavior if not successfully cted or was moved-from.");
162
163 bool not_blocked;
164 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::try_send(): bipc::message_queue::try_send()",
165 [&]()
166 {
167 auto blob_data = blob.data();
168 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
169 "size [" << blob.size() << "].");
170 if (blob.size() == 0)
171 {
172 /* bipc::message_queue::try_send() invokes memcpy(X, nullptr, N), even when N == 0;
173 * which is (1) empirically speaking harmless but (2) technically in violation of arg 2's non-null decl
174 * hence (3) causes a clang UBSAN sanitizer error. So waste a couple cycles by feeding it this dummy
175 * non-null value. */
176 blob_data = static_cast<const void*>(&blob_data);
177 }
178 else // if (blob.size() != 0)
179 {
180 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
181 }
182
183 not_blocked
184 = m_mq->try_send(blob_data, blob.size(), 0); // Throws <=> error wrapper sets truthy *err_code.
185 if (!not_blocked)
186 {
187 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
188 "size [" << blob.size() << "]: would-block.");
189 }
190 }); // op_with_possible_bipc_mq_exception()
191 if (*err_code) // It logged if truthy.
192 {
193 return false; // not_blocked is garbage.
194 }
195 // else
196
197 return not_blocked; // Logged about it already.
198} // Bipc_mq_handle::try_send()
199
201{
 /* Blocking push of `blob`: loops { non-blocking try_send(); if would-block then wait_sendable() } until the
  * push succeeds or *err_code becomes truthy (true error, or interruption reported by wait_sendable()). */
202 using flow::util::buffers_dump_string;
203
204 if (flow::error::exec_void_and_throw_on_error
205 ([&](Error_code* actual_err_code) { send(blob, actual_err_code); },
206 err_code, "Bipc_mq_handle::send()"))
207 {
208 return;
209 }
210 // else
211 err_code->clear();
212
213 assert(m_mq && "As advertised: send() => undefined behavior if not successfully cted or was moved-from.");
214
215 auto blob_data = blob.data();
216 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-push of blob @[" << blob_data << "], "
217 "size [" << blob.size() << "]. Trying nb-push first; if it succeeds -- great. "
218 "Else will wait/retry/wait/retry/....");
219 if (blob.size() == 0)
220 {
221 // See similarly-placed comment in try_send() which explains this.
222 blob_data = static_cast<const void*>(&blob_data);
223 }
224 else // if (blob.size() != 0)
225 {
226 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
227 }
228
229 /* We could just invoke blocking m_mq->send(), but to get the promised logging we go a tiny bit fancier as follows.
230 * Update: Now that we have to be interrupt_*()ible, also reuse wait_*() instead of using native
231 * m_mq->*(). */
232
233 bool ok;
234 while (true)
235 {
236 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::send(): bipc::message_queue::try_send()",
237 [&]()
238 {
239 ok = m_mq->try_send(blob_data, blob.size(), 0); // Throws <=> error wrapper sets truthy *err_code.
240 });
241 if (*err_code || ok)
242 {
243 // Threw => true error => *err_code set; get out. Didn't throw and returned true => success; get out.
244 return;
245 }
246 // else if (would-block): log it (at TRACE level) and wait.
247
248 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
249 "size [" << blob.size() << "]: would-block. Executing blocking-wait.");
250
251 wait_sendable(err_code);
252 if (*err_code)
253 {
254 // Whether interrupted or true error, we're done.
255 return;
256 }
257 // else
258 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
259 } // while (true)
260} // Bipc_mq_handle::send()
261
263 Error_code* err_code)
264{
265 using flow::util::time_since_posix_epoch;
266 using flow::util::buffers_dump_string;
267 using flow::Fine_clock;
268 using boost::chrono::round;
269 using boost::chrono::microseconds;
270
271 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, timed_send, blob, timeout_from_now, _1);
272 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
273
274 // Similar to a combo of (blocking) send() and (non-blocking) try_send() -- keeping comments light where redundant.
275
276 assert(m_mq && "As advertised: timed_send() => undefined behavior if not successfully cted or was moved-from.");
277
278 auto blob_data = blob.data();
279 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-timed-push of blob @[" << blob_data << "], "
280 "size [" << blob.size() << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]. "
281 "Trying nb-push first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
282 if (blob.size() == 0)
283 {
284 // See similarly-placed comment in try_send() which explains this.
285 blob_data = static_cast<const void*>(&blob_data);
286 }
287 else // if (blob.size() != 0)
288 {
289 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(blob, " ") << "].");
290 }
291
292 auto now = Fine_clock::now();
293 auto after = now;
294 bool ok;
295
296 while (true)
297 {
298 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::timed_send(): bipc::message_queue::try_send()",
299 [&]()
300 {
301 ok = m_mq->try_send(blob_data, blob.size(), 0); // Throws <=> error wrapper sets truthy *err_code.
302 });
303 if (*err_code)
304 {
305 // Threw => true error => *err_code set; get out.
306 return false;
307 }
308 // else if (would-block or success):
309 if (ok)
310 {
311 break; // Instant success.
312 }
313 // else if (would-block): as promised, INFO logs.
314
315 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-push of blob @[" << blob_data << "], "
316 "size [" << blob.size() << "]: would-block. Executing blocking-wait.");
317
318 timeout_from_now -= (after - now); // No-op the first time; after that reduces time left.
319 const bool ready = timed_wait_sendable(timeout_from_now, err_code);
320 if (*err_code)
321 {
322 // Whether interrupted or true error, we're done.
323 return false;
324 }
325 // else:
326
327 if (!ready) // I.e., if (timed out).
328 {
329 FLOW_LOG_TRACE("Did not finish before timeout.");
330 return false;
331 }
332 // else: successful wait for transmissibility. Try nb-transmitting again.
333 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
334
335 after = Fine_clock::now();
336 assert((after >= now) && "Fine_clock is supposed to never go backwards.");
337 } // while (true)
338
339 return true;
340} // Bipc_mq_handle::timed_send()
341
343{
 /* Non-blocking pop into the buffer `*blob` via bipc::message_queue::try_receive().
  * On success `*blob` is re-pointed at the same start address with the received size, and true is returned.
  * Returns false if the queue would block or if *err_code was set truthy by the exception wrapper. */
344 using flow::util::buffers_dump_string;
345 using util::Blob_mutable;
346
347 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, try_receive, blob, _1);
348 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
349
350 assert(m_mq && "As advertised: try_receive() => undefined behavior if not successfully cted or was moved-from.");
351
352 bool not_blocked;
353 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::try_receive(): bipc::message_queue::try_receive()",
354 [&]()
355 {
356 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
357 "max-size [" << blob->size() << "].");
358
359 size_t n_rcvd = {};
360 /* (^-- Initializer not needed algorithmically, but gcc-13 gives maybe-uninitialized warning;
361 * while at least various modern clangs and gcc-9 are fine. It's OK; we can afford it.) */
362 unsigned int pri_ignored;
363 not_blocked // Throws <=> error wrapper sets truthy *err_code. --v
364 = m_mq->try_receive(blob->data(), blob->size(), n_rcvd, pri_ignored);
365 if (not_blocked)
366 {
367 *blob = Blob_mutable(blob->data(), n_rcvd);
368 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
369 if (blob->size() != 0)
370 {
371 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
372 }
373 }
374 else // if (!not_blocked)
375 {
376 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
377 "max-size [" << blob->size() << "]: would-block.");
378 }
379 }); // op_with_possible_bipc_mq_exception()
380 if (*err_code) // It logged if truthy.
381 {
382 return false; // not_blocked is garbage; *blob is untouched.
383 }
384 // else
385
386 return not_blocked; // Logged about it already.
387} // Bipc_mq_handle::try_receive()
388
390{
 /* Blocking pop into the buffer `*blob`: loops { non-blocking try_receive(); if would-block then
  * wait_receivable() } until a message arrives or *err_code becomes truthy (true error, or interruption
  * reported by wait_receivable()).  On success `*blob` is re-pointed at the same start address with the
  * received size. */
391 using flow::util::buffers_dump_string;
392 using util::Blob_mutable;
393
394 if (flow::error::exec_void_and_throw_on_error
395 ([&](Error_code* actual_err_code) { receive(blob, actual_err_code); },
396 err_code, "Bipc_mq_handle::receive()"))
397 {
398 return;
399 }
400 // else
401 err_code->clear();
402
403 assert(m_mq && "As advertised: receive() => undefined behavior if not successfully cted or was moved-from.");
404
405 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-pop to blob @[" << blob->data() << "], "
406 "max-size [" << blob->size() << "]. Trying nb-pop first; if it succeeds -- great. "
407 "Else will wait/retry/wait/retry/....");
408
409 /* We could just invoke blocking m_mq->receive(), but to get the promised logging we go a tiny bit fancier as follows.
410 * Update: Now that we have to be interrupt_*()ible, also reuse wait_*() instead of using native
411 * m_mq->*(). */
412
413 size_t n_rcvd = {}; // (Why initialize? See comment near first such initializer for explanation.)
414 unsigned int pri_ignored;
415 bool ok;
416
417 while (true)
418 {
419 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::receive(): bipc::message_queue::try_receive()",
420 [&]()
421 {
422 ok = m_mq->try_receive(blob->data(), blob->size(), // Throws <=> error wrapper sets truthy *err_code.
423 n_rcvd, pri_ignored);
424 });
425 if (*err_code)
426 {
427 // Threw => true error => *err_code set; get out.
428 return;
429 }
430 // else if (would-block or success):
431 if (ok)
432 {
433 *blob = Blob_mutable(blob->data(), n_rcvd);
434 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
435 if (blob->size() != 0)
436 {
437 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
438 }
439 return; // Instant success.
440 }
441 // else if (would-block): log it (at TRACE level) and wait.
442
443 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
444 "max-size [" << blob->size() << "]: would-block. Executing blocking-pop.");
445
446 wait_receivable(err_code);
447 if (*err_code)
448 {
449 // Whether interrupted or true error, we're done.
450 return;
451 }
452 // else
453 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
454 } // while (true)
455} // Bipc_mq_handle::receive()
456
458{
459 using util::Blob_mutable;
460 using flow::util::time_since_posix_epoch;
461 using flow::util::buffers_dump_string;
462 using flow::Fine_clock;
463 using boost::chrono::round;
464 using boost::chrono::microseconds;
465
466 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, timed_receive, blob, timeout_from_now, _1);
467 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
468
469 // Similar to a combo of (blocking) receive() and (non-blocking) try_receive() -- keeping comments light.
470
471 assert(m_mq && "As advertised: timed_receive() => undefined behavior if not successfully cted or was moved-from.");
472
473 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-timed-pop to blob @[" << blob->data() << "], "
474 "max-size [" << blob->size() << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]. "
475 "Trying nb-pop first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
476
477 size_t n_rcvd = {}; // (Why initialize? See comment near first such initializer for explanation.)
478 unsigned int pri_ignored;
479
480 auto now = Fine_clock::now();
481 auto after = now;
482 bool ok;
483
484 while (true)
485 {
486 op_with_possible_bipc_mq_exception(err_code, "Bipc_mq_handle::timed_send(): bipc::message_queue::try_send()",
487 [&]()
488 {
489 ok = m_mq->try_receive(blob->data(), blob->size(), // Throws <=> error wrapper sets truthy *err_code.
490 n_rcvd, pri_ignored);
491 });
492 if (*err_code)
493 {
494 // Threw => true error => *err_code set; get out.
495 return false;
496 }
497 // else if (would-block or success):
498 if (ok)
499 {
500 *blob = Blob_mutable(blob->data(), n_rcvd);
501 FLOW_LOG_TRACE("Received message sized [" << n_rcvd << "].");
502 if (blob->size() != 0)
503 {
504 FLOW_LOG_DATA("Blob contents: [\n" << buffers_dump_string(*blob, " ") << "].");
505 }
506 break; // Instant success.
507 }
508 // else if (would-block): as promised, INFO logs.
509
510 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Nb-pop to blob @[" << blob->data() << "], "
511 "max-size [" << blob->size() << "]: would-block. Executing blocking-wait.");
512
513 timeout_from_now -= (after - now); // No-op the first time; after that reduces time left.
514 const bool ready = timed_wait_receivable(timeout_from_now, err_code);
515 if (*err_code)
516 {
517 // Whether interrupted or true error, we're done.
518 return false;
519 }
520 // else:
521
522 if (!ready) // I.e., if (timed out).
523 {
524 FLOW_LOG_TRACE("Did not finish before timeout.");
525 return false;
526 }
527 // else: successful wait for transmissibility. Try nb-transmitting again.
528 FLOW_LOG_TRACE("Blocking-wait reported transmissibility. Retrying.");
529
530 after = Fine_clock::now();
531 assert((after >= now) && "Fine_clock is supposed to never go backwards.");
532 } // while (true)
533
534 return true;
535} // Bipc_mq_handle::timed_receive()
536
/* Switches interrupt mode on or off for one direction of this handle.
 * SND_ELSE_RCV selects the pair (m_interrupting_snd, m_cond_send) when true, else (m_interrupting_rcv, m_cond_recv).
 * ON_ELSE_OFF is the value to store in the selected flag.
 * While holding the bipc MQ header mutex: if the flag already equals ON_ELSE_OFF, logs a WARNING and returns false;
 * otherwise stores the new value, logs at INFO, and -- only when switching on -- notify_all()s the selected
 * condition variable.  Returns true in that case. */
537template<bool SND_ELSE_RCV, bool ON_ELSE_OFF>
539{
540 using Classic_shm_area = bipc::ipcdetail::managed_open_or_create_impl<bipc::shared_memory_object, 0, true, false>;
541 using Bipc_mq = bipc::message_queue;
542 using Bipc_mq_hdr = bipc::ipcdetail::mq_hdr_t<Bipc_mq::void_pointer>;
543 using Bipc_mq_mtx = bipc::interprocess_mutex;
544 using Bipc_mq_lock = bipc::scoped_lock<Bipc_mq_mtx>;
545
546 assert(m_mq
547 && "As advertised: interrupt_allow_impl() => undefined behavior if not successfully cted or was moved-from.");
548
549 /* First see m_interrupting_snd doc header. Then check out wait_impl() carefully. Then the below will
550 * probably make sense. */
551
552 bool* interrupting_ptr;
 // Reach the MQ header (mutex, condition variables) by viewing the message_queue object as its SHM-area type.
553 auto& shm_area = reinterpret_cast<Classic_shm_area&>(*m_mq);
554 auto* const mq_hdr = static_cast<Bipc_mq_hdr*>(shm_area.get_user_address());
555 decltype(mq_hdr->m_cond_recv)* cond_ptr;
556 if constexpr(SND_ELSE_RCV)
557 {
558 interrupting_ptr = &m_interrupting_snd;
559 cond_ptr = &mq_hdr->m_cond_send;
560 }
561 else
562 {
563 interrupting_ptr = &m_interrupting_rcv;
564 cond_ptr = &mq_hdr->m_cond_recv;
565 }
566 auto& interrupting = *interrupting_ptr;
567 auto& cond = *cond_ptr;
568
569 {
570 Bipc_mq_lock lock(mq_hdr->m_mutex);
571
572 if (interrupting == ON_ELSE_OFF)
573 {
574 FLOW_LOG_WARNING("Bipc_mq_handle [" << *this << "]: Interrupt mode already set for "
575 "snd_else_rcv [" << SND_ELSE_RCV << "], on_else_off [" << ON_ELSE_OFF << "]. Ignoring.");
576 return false;
577 }
578 // else
579
580 interrupting = ON_ELSE_OFF;
581 FLOW_LOG_INFO("Bipc_mq_handle [" << *this << "]: Interrupt mode set for "
582 "snd_else_rcv [" << SND_ELSE_RCV << "], on_else_off [" << ON_ELSE_OFF << "]. If on -- we "
583 "shall now ping the associated condition variable to wake up any ongoing waits.");
584
585 if constexpr(ON_ELSE_OFF)
586 {
587 /* *All* ongoing waits shall be woken up. Each one will auto-re-lock mq_hdr->m_mutex and query
588 * their local m_interrupting_*. (Note: If `*this` wakes up, it'll see it's `true`. If another
589 * one wakes up, it will probably see `false` and re-enter the wait. If by some coincidence that non-`*this`
590 * had just set *that* m_interrupting_*, then -- well, cool -- presumably we won a race against
591 * their own interrupt_allow_impl() doing the same thing. */
592 cond.notify_all(); // By the docs, and even by the source code as of Boost-1.81, this does not throw.
593
594 /* (bipc::message_queue code does .notify_one() in send/receive impl, which makes sense since one
595 * message/one empty space would be used by at most one blocked dude. It also does it outside the mutex lock,
596 * citing performance at the cost of occasional spurious wakeups. Neither thing applies to us, so let's
597 * .notify_all() and keep it simple inside the locked section. */
598 } // if constexpr(ON_ELSE_OFF)
599 /* else if constexpr(!ON_ELSE_OFF)
600 * { We're good! No need to wake anyone up to tell them... not to stop. } */
601 } // Bipc_mq_lock lock(mq_hdr->m_mutex);
602
603 return true;
604} // Bipc_mq_handle::interrupt_allow_impl()
605
607{
 // Send direction (SND_ELSE_RCV = true), interrupt mode switched on (ON_ELSE_OFF = true).
608 return interrupt_allow_impl<true, true>();
609}
610
612{
 // Send direction (SND_ELSE_RCV = true), interrupt mode switched off (ON_ELSE_OFF = false).
613 return interrupt_allow_impl<true, false>();
614}
615
617{
 // Receive direction (SND_ELSE_RCV = false), interrupt mode switched on (ON_ELSE_OFF = true).
618 return interrupt_allow_impl<false, true>();
619}
620
622{
 // Receive direction (SND_ELSE_RCV = false), interrupt mode switched off (ON_ELSE_OFF = false).
623 return interrupt_allow_impl<false, false>();
624}
625
/* Polls or waits until the queue is "unstarved" in one direction, without actually pushing or popping.
 * WAIT_TYPE: S_POLL checks once and never blocks; S_WAIT blocks until unstarved; S_TIMED_WAIT blocks until
 * unstarved or until `timeout_from_now` has passed.
 * SND_ELSE_RCV: true => "starved" means m_cur_num_msg == m_max_num_msg (full); false => m_cur_num_msg == 0 (empty).
 * timeout_from_now: read only when WAIT_TYPE is S_TIMED_WAIT.
 * err_code: if null, the FLOW_ERROR_EXEC_AND_THROW_ON_ERROR() line re-invokes this method (see comment there).
 * Otherwise it is set to error::Code::S_INTERRUPTED if the matching m_interrupting_* flag is seen set, either
 * before waiting or upon waking up.
 * Returns false whenever *err_code is truthy; else true if unstarved, false if still starved (poll failed or
 * timeout reached). */
626template<Bipc_mq_handle::Wait_type WAIT_TYPE, bool SND_ELSE_RCV>
627bool Bipc_mq_handle::wait_impl([[maybe_unused]] util::Fine_duration timeout_from_now, Error_code* err_code)
628{
629 using util::Fine_time_pt;
630 using flow::util::time_since_posix_epoch;
631 using boost::chrono::round;
632 using boost::chrono::microseconds;
633 using Classic_shm_area = bipc::ipcdetail::managed_open_or_create_impl<bipc::shared_memory_object, 0, true, false>;
634 using Bipc_mq = bipc::message_queue;
635 using Bipc_mq_hdr = bipc::ipcdetail::mq_hdr_t<Bipc_mq::void_pointer>;
636 using Bipc_mq_mtx = bipc::interprocess_mutex;
637 using Bipc_mq_lock = bipc::scoped_lock<Bipc_mq_mtx>;
638
639 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, (wait_impl<WAIT_TYPE, SND_ELSE_RCV>), timeout_from_now, _1);
640 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
641
642 assert(m_mq
643 && "As advertised: wait_impl() => undefined behavior if not successfully cted or was moved-from.");
644
 // Absolute deadline handed to cond.timed_wait() below; computed only for the timed variant.
645 [[maybe_unused]] Fine_time_pt timeout_since_epoch;
646 if constexpr(WAIT_TYPE == Wait_type::S_TIMED_WAIT)
647 {
648 timeout_since_epoch = Fine_time_pt(time_since_posix_epoch() + timeout_from_now);
649 }
650
651 /* Most likely the below code will elicit a "WTF." The background is that bipc::message_queue lacks anything like
652 * wait_impl(). It seems to mimic the mq_*() API. However mq_*() does have it, in Linux, because mq_t is an FD,
653 * and it can be used with epoll_*(), so that is what Posix_mq_handle uses. To get the same thing here I had
654 * to hack it as seen below. What it does is it mimics *send() and *receive() internal Boost source code, but
655 * it stops short of actually performing the read or write once the queue becomes pushable/poppable.
656 * To get it to work I (ygoldfel) operate directly on their internal data structures, same as those methods do.
657 * Normally this would be beyond the pale; however in this case a couple of points make it reasonable-enough.
658 *
659 * Firstly, we can even do it in the first place, because the data structures -- all of which are directly in
660 * SHM -- `public`ly expose their data members (though, interestingly, not the methods; however the methods
661 * are very basic wrappers around public things like condition variables and mutexes). This fact is suggestive
662 * of this being intentionally possible.
663 *
664 * Secondly, and this is the main reason I consider this reasonably maintainable, is the fact that the code
665 * itself -- including a comment above the BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX define -- says that
666 * their goal was to make version A of bipc interoperable (this is IPC after all) with version B>A of bipc.
667 * That means that if they *do* change this stuff, it will be protected by #define(s) like
668 * BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX, so as to make any change possible to roll-back via a compile
669 * flag. They *could* rename a data member without changing the physical structures, in which case this
670 * would stop building, but that seems unlikely, as Boost updates are not made willy-nilly.
671 *
672 * Is there risk of this breaking with a newer Boost version? Yes, but the above evidence shows the risk is
673 * manageably low. Note that this wait_impl() feature, particularly since I've made it interruptible, is quite
674 * useful. Without it one must use timed_*(), and even with that -- suppose we want to stop work on
675 * a queue from another thread -- we have to put up a deinit time equal to the fine-grainedness of the timeout
676 * one would have to use. Plus even that aside, it is annoying to have to break up an operation into smaller
677 * ones. */
678
679 if constexpr(WAIT_TYPE == Wait_type::S_POLL)
680 {
681 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Poll-unstarved for snd_else_rcv [" << SND_ELSE_RCV << "].");
682 }
683 else if constexpr(WAIT_TYPE == Wait_type::S_TIMED_WAIT)
684 {
685 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-timed-await-unstarved for "
686 "snd_else_rcv [" << SND_ELSE_RCV << "]; "
687 "timeout ~[" << round<microseconds>(timeout_from_now) << "].");
688 }
689 else
690 {
691 static_assert(WAIT_TYPE == Wait_type::S_WAIT, "What!");
692 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-await-unstarved for snd_else_rcv "
693 "[" << SND_ELSE_RCV << "].");
694 }
695
 // Select the direction-specific pieces: blocked-waiter counter, condition variable, and our interrupt flag.
696 auto& shm_area = reinterpret_cast<Classic_shm_area&>(*m_mq);
697 auto* const mq_hdr = static_cast<Bipc_mq_hdr*>(shm_area.get_user_address());
698 size_t* blocked_dudes_ptr;
699 bool* interrupting_ptr;
700 decltype(mq_hdr->m_cond_recv)* cond_ptr;
701 if constexpr(SND_ELSE_RCV)
702 {
703 blocked_dudes_ptr = &mq_hdr->m_blocked_senders;
704 cond_ptr = &mq_hdr->m_cond_send;
705 interrupting_ptr = &m_interrupting_snd;
706 }
707 else
708 {
709 blocked_dudes_ptr = &mq_hdr->m_blocked_receivers;
710 cond_ptr = &mq_hdr->m_cond_recv;
711 interrupting_ptr = &m_interrupting_rcv;
712 }
713 auto& blocked_dudes = *blocked_dudes_ptr;
714 auto& cond = *cond_ptr;
715 bool& interrupting = *interrupting_ptr;
716
 // Sender is starved when the queue is full; receiver is starved when the queue is empty.
717 const auto is_starved_func = [&]() -> bool
718 {
719 if constexpr(SND_ELSE_RCV)
720 {
721 return mq_hdr->m_cur_num_msg == mq_hdr->m_max_num_msg;
722 }
723 else
724 {
725 return mq_hdr->m_cur_num_msg == 0;
726 }
727 }; // const auto is_starved_func =
728
729 bool interrupted = false;
730 bool not_starved;
731 /* Technically [timed_]wait() can throw, I guess if bool(mutex) is false. Their internal code has a try{}
732 * so why not. */
 // NOTE(review): the line opening the call whose arguments follow is not visible here; judging by the closing
 // line further below it is op_with_possible_bipc_mq_exception() -- confirm against the full file.
734 "Bipc_mq_handle::wait_impl(): "
735 "bipc::interprocess_condition::[timed_]wait()",
736 [&]()
737 {
738#ifndef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
739 static_assert(false,
740 "bipc comments show this shall be true as of many Boost versions ago, unless unset which "
741 "would decrease performance; and we do not do that. Our code for simplicity assumes "
742 "it and does not support the lower-perf bipc MQ algorithm.");
743#endif
744
745 {
746 Bipc_mq_lock lock(mq_hdr->m_mutex);
747
748 // See interrupt_allow_impl() to understand this check (also below on cond.*wait()).
749 if (interrupting)
750 {
751 FLOW_LOG_TRACE("Interrupted before wait/poll started (preemptively).");
752 interrupted = true;
753 return;
754 }
755 // else
756
757 if (is_starved_func())
758 {
759 // timed_receive() would INFO-log here, but let's not slow things down while the bipc MQ mutex is locked.
760
761 if constexpr(WAIT_TYPE == Wait_type::S_POLL)
762 {
763 FLOW_LOG_TRACE("Not immediatelly unstarved. Poll = done.");
764 not_starved = false;
765 return;
766 }
767 else // if constexpr(WAIT_TYPE == Wait_type::S_[TIMED_]WAIT)
768 {
769 FLOW_LOG_TRACE("Not immediatelly unstarved. Awaiting unstarvedness or timeout.");
770
 // The counter is incremented for the duration of the wait and decremented again on every exit path below.
771 ++blocked_dudes;
772 try
773 {
774 do
775 {
776 if constexpr(WAIT_TYPE == Wait_type::S_WAIT)
777 {
778 cond.wait(lock);
779
780 // See interrupt_allow_impl() to understand this check (also above).
781 if (interrupting)
782 {
783 FLOW_LOG_TRACE("Interruption detected upon waking up from wait (interrupted concurrently).");
784 --blocked_dudes;
785 interrupted = true;
786 return;
787 }
788 // else: Another loop iteration.
789 }
790 else // if (WAIT_TYPE==TIMED_WAIT)
791 {
792 static_assert(WAIT_TYPE == Wait_type::S_TIMED_WAIT, "The hell?");
793
794 const bool wait_result = cond.timed_wait(lock, timeout_since_epoch); // Lock unlocked throughout wait.
795
796 // See interrupt_allow_impl() to understand this check (also above).
797 if (interrupting)
798 {
799 FLOW_LOG_TRACE("Interruption detected upon waking up from wait (interrupted concurrently).");
800 --blocked_dudes;
801 interrupted = true;
802 return;
803 }
804 // else
805
806 if (!wait_result)
807 {
808 // Timeout reached.
809 if (is_starved_func())
810 {
811 // Timeout reached; still starved. Done (exit algo).
812 --blocked_dudes;
813 not_starved = false;
814 return;
815 }
816 // else: Timeout reached; *is* unstarved. Done (exit loop -- no need to recheck is_starved_func()).
817 break;
818 } // if (!wait_result)
819 // else: Timeout not reached; probably *is* unstarved; but check it as loop exit condition.
820 } // else if (WAIT_TYPE==TIMED_WAIT)
821 }
822 while (is_starved_func());
823 } // try
824 catch (...)
825 {
826 --blocked_dudes;
827 throw;
828 }
829 --blocked_dudes;
830 } // else if constexpr(WAIT_TYPE == Wait_type::S_[TIMED_]WAIT)
831
832 // Will INFO-log shortly (outside lock).
833 not_starved = true;
834 } // if (is_starved_func())
835 else
836 {
837 FLOW_LOG_TRACE("Immediately unstarved.");
838 not_starved = true;
839 }
840 } // Bipc_mq_lock lock(mq_hdr->m_mutex);
841 }); // op_with_possible_bipc_mq_exception()
842
 // Interruption is reported only if the wrapper did not already record a different error.
843 if ((!*err_code) && interrupted)
844 {
845 FLOW_LOG_INFO("Bipc_mq_handle [" << *this << "]: Poll/wait/timed-wait-unstarved for "
846 "snd_else_rcv [" << SND_ELSE_RCV << "]: interrupted (TRACE message -- if visible -- "
847 "indicates whether preemptively or concurrently).");
848 *err_code = error::Code::S_INTERRUPTED;
849 }
850
851 if (*err_code) // It logged if truthy.
852 {
853 return false; // not_starved is garbage.
854 }
855 // else: log the outcome (at TRACE level).
856
857 if constexpr(WAIT_TYPE == Wait_type::S_POLL)
858 {
859 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Poll-unstarved for snd_else_rcv [" << SND_ELSE_RCV << "]: "
860 "succeeded? = [" << not_starved << "].");
861 }
862 else if constexpr(WAIT_TYPE == Wait_type::S_TIMED_WAIT)
863 {
864 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-timed-await-unstarved for "
865 "snd_else_rcv [" << SND_ELSE_RCV << "]; timeout ~[" << round<microseconds>(timeout_from_now) << "]: "
866 "succeeded? = [" << not_starved << "]. "
867 "Either was immediately unstarved, or was not but waited until success or timeout+failure. "
868 "If success: TRACE message (if visible) above indicates which occurred.");
869 }
870 else
871 {
872 assert(not_starved);
873 FLOW_LOG_TRACE("Bipc_mq_handle [" << *this << "]: Blocking-await-unstarved for "
874 "snd_else_rcv [" << SND_ELSE_RCV << "]: succeeded eventually. "
875 "Either was immediately unstarved, or was not but waited it out. "
876 "TRACE message (if visible) above indicates which occurred.");
877 }
878
879 return not_starved; // Logged about it already.
880} // Bipc_mq_handle::wait_impl()
881
883{
884 return wait_impl<Wait_type::S_POLL, true>(util::Fine_duration(), err_code);
885}
886
888{
889 wait_impl<Wait_type::S_WAIT, true>(util::Fine_duration(), err_code);
890}
891
893{
894 return wait_impl<Wait_type::S_TIMED_WAIT, true>(timeout_from_now, err_code);
895}
896
898{
899 return wait_impl<Wait_type::S_POLL, false>(util::Fine_duration(), err_code);
900}
901
903{
904 wait_impl<Wait_type::S_WAIT, false>(util::Fine_duration(), err_code);
905}
906
908{
909 return wait_impl<Wait_type::S_TIMED_WAIT, false>(timeout_from_now, err_code);
910}
911
912void Bipc_mq_handle::remove_persistent(flow::log::Logger* logger_ptr, // Static.
913 const Shared_name& absolute_name, Error_code* err_code)
914{
915 using bipc::message_queue;
916 using boost::system::system_category;
917
918 if (flow::error::exec_void_and_throw_on_error
919 ([&](Error_code* actual_err_code)
920 { remove_persistent(logger_ptr, absolute_name, actual_err_code); },
921 err_code, "Bipc_mq_handle::remove_persistent()"))
922 {
923 return;
924 }
925 // else
926
927 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
928
929 FLOW_LOG_INFO("Bipc_mq @ Shared_name[" << absolute_name << "]: Removing persistent MQ if possible.");
930 const bool ok = message_queue::remove(absolute_name.native_str()); // Does not throw.
931
932 if (ok)
933 {
934 err_code->clear();
935 return;
936 }
937 /* message_queue::remove() is strangely gimped -- though I believe so are the other kernel-persistent remove()s
938 * throughout bipc -- it does not throw and merely returns true or false and no code. Odd, since there can
939 * be at least a couple of reasons one would fail to delete.... However, in POSIX, the Boost 1.78 source code
940 * shows that mq::remove() calls shared_memory_object::remove() which calls some internal
941 * ipcdetail::delete_file() which calls... drumroll... freakin' ::unlink(const char*). Hence we haxor: */
942#ifndef FLOW_OS_LINUX // @todo Should maybe check Boost version or something too?
943 static_assert(false,
944 "Code in Bipc_mq_handle::remove_persistent() relies on Boost invoking Linux unlink() with errno.");
945#endif
946 const auto& sys_err_code = *err_code = Error_code(errno, system_category());
947 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
948} // Bipc_mq_handle::remove_persistent()
949
950template<typename Func>
952 const Func& func)
953{
954 using flow::error::Runtime_error;
955 using bipc::interprocess_exception;
956 using boost::system::system_category;
957
958 if (flow::error::exec_void_and_throw_on_error
959 ([&](Error_code* actual_err_code) { op_with_possible_bipc_mq_exception(actual_err_code, context, func); },
960 err_code, context))
961 {
962 return;
963 }
964 // else
965
966 try
967 {
968 func();
969 }
970 catch (const interprocess_exception& exc)
971 {
972 /* They appear to always throw this guy with some interesting semantics. We want to yield our consistent
973 * Flow-style semantics, namely produce a truthy *err_code (which, if actual original err_code was null will
974 * be instead thrown after being wrapped in a flow::Runtime_error, which is really a boost::system::system_error
975 * with a cleaner what() message). More details inline below.
976 *
977 * So interprocess_exception is not a system_error even but rather an exception with a particular custom API.
978 * To ensure the information is logged *somewhere* for sure and not lost do log all of this.
979 * After that normalize to the *err_code semantics, even if some information is lost -- though we'll try
980 * to lose nothing even so. */
981 const auto native_code_raw = exc.get_native_error();
982 const auto bipc_err_code_enum = exc.get_error_code();
983 const bool is_size_error = bipc_err_code_enum == bipc::size_error;
984 FLOW_LOG_WARNING("bipc threw interprocess_exception; will emit some hopefully suitable Flow-IPC Error_code; "
985 "but here are all the details of the original exception: native code int "
986 "[" << native_code_raw << "]; bipc error_code_t enum->int "
987 "[" << int(bipc_err_code_enum) << "]; latter==size_error? = [" << is_size_error << "]; "
988 "message = [" << exc.what() << "]; context = [" << context << "].");
989
990 /* Special case: size_error is thrown by message_queue on a receive with underflow or send with overflow.
991 * Our API contract (Persistent_mq_handle concept doc header) is to emit this particular code then. */
992 if (is_size_error)
993 {
994 // Sufficient. (Check of Boost 1.78 source code confirms there's no native code in this case anyway.)
996 return;
997 }
998 // else
999 if (native_code_raw != 0)
1000 {
1001 /* At least in POSIX, interprocess_exception only does the following in this case:
1002 * - strerror(native_code_raw) => the message. But that's standard boost.system errno handling already;
1003 * so if just emit a system_category() Error_code, all will be equally well message-wise in what().
1004 * - They have a table that maps certain native_code_raw values to one of a few (not super-many but not
1005 * a handful) bipc_err_code_enum enum values. Technically the following will lose that information;
1006 * but (1) it is logged above; and (2) so what? */
1007 const auto& sys_err_code = *err_code = Error_code(native_code_raw, system_category());
1008 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1009 return;
1010 }
1011 // else
1012
1013 /* All we have is bipc_err_code_enum. We've already handled the known likely issue, size_error. Beyond that
1014 * there does not seem to be much we can do; really it's a matter of either emitting one catch-all code or
1015 * having our own equivalents of their enum. For now, at least, the former is OK. @todo Revisit. */
1016 *err_code = error::Code::S_MQ_BIPC_MISC_LIBRARY_ERROR; // The earlier WARNING is good enough.
1017 return;
1018 } // catch ()
1019 // Got here: all good.
1020 err_code->clear();
1021} // Bipc_mq_handle::op_with_possible_bipc_mq_exception()
1022
1024{
1025 return m_absolute_name;
1026}
1027
1028std::ostream& operator<<(std::ostream& os, const Bipc_mq_handle& val)
1029{
1030 return os << '@' << &val << ": sh_name[" << val.absolute_name() << ']';
1031}
1032
1033} // namespace ipc::transport
Implements the Persistent_mq_handle concept by thinly wrapping bipc::message_queue,...
Bipc_mq_handle()
Implements Persistent_mq_handle API: Construct null handle.
bool allow_receives()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-receives...
boost::movelib::unique_ptr< bipc::message_queue > m_mq
Underlying MQ handle.
bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_send() but without the actual pushing of a message.
Bipc_mq_handle & operator=(Bipc_mq_handle &&src)
Implements Persistent_mq_handle API: Replaces handle with the source handle while making the latter i...
bool m_interrupting_rcv
Other-direction counterpart to m_interrupting_snd.
bool interrupt_sends()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-sends and...
void send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking send: pushes copy of message to queue; if queue is full...
bool try_send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking send: pushes copy of message to queue and returns t...
bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_receive() but without the actual popping of a message...
bool timed_receive(util::Blob_mutable *blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed receive: pops copy of message from queue into buf...
const Shared_name & absolute_name() const
Implements Persistent_mq_handle API: Returns name equal to absolute_name passed to ctor.
static const Shared_name S_RESOURCE_TYPE_ID
Implements concept API.
bool allow_sends()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-sends an...
bool try_receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking receive: pops copy of message from queue into buffe...
@ S_TIMED_WAIT
Timed-wait-type (blocking until timeout).
@ S_WAIT
Wait-type (blocking indefinitely).
@ S_POLL
Poll-type (non-blocking).
void wait_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like receive() but without the actual popping of a message.
bool interrupt_receives()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-receives ...
bool m_interrupting_snd
Starting at false, this is made true via interrupt_sends(), and back by allow_sends(); when true wait...
bool timed_send(const util::Blob_const &blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed send: pushes copy of message to queue; if queue i...
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code=0)
Implements Persistent_mq_handle API: Removes the named persistent MQ.
void op_with_possible_bipc_mq_exception(Error_code *err_code, util::String_view context, const Func &func)
Error helper: Run func() which will perform a bipc::message_queue API that might throw; if it does em...
void receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking receive: pops copy of message from queue into buffer; i...
size_t max_n_msgs() const
Implements Persistent_mq_handle API: Returns the max message count of the underlying queue.
~Bipc_mq_handle()
Implements Persistent_mq_handle API: Destroys this handle (or no-op if no handle was successfully con...
bool interrupt_allow_impl()
Impl body for interrupt_*() and allow_*().
void wait_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like send() but without the actual pushing of a message.
size_t max_msg_size() const
Implements Persistent_mq_handle API: Returns the max message size of the underlying queue.
bool wait_impl(util::Fine_duration timeout_from_now, Error_code *err_code)
Impl body for *_sendable() and *_receivable().
bool is_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_receive() but without the actual popping of a message.
Shared_name m_absolute_name
See absolute_name().
bool is_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_send() but without the actual pushing of a message.
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
const char * native_str() const
Returns (sans copying) pointer to NUL-terminated wrapped name string, suitable to pass into sys calls...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
@ S_INTERRUPTED
A blocking operation was intentionally interrupted or preemptively canceled.
@ S_MQ_BIPC_MISC_LIBRARY_ERROR
Low-level message queue: boost.interprocess emitted miscellaneous library exception sans a system cod...
@ S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW
Low-level message queue send-op buffer overflow (> max size) or receive-op buffer underflow (< max si...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
void swap(Bipc_mq_handle &val1, Bipc_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
bipc::permissions Permissions
Short-hand for Unix (POSIX) permissions class.
Definition: util_fwd.hpp:161
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
Definition: util.cpp:158
bipc::open_only_t Open_only
Tag type indicating an ideally-atomic open-if-exists-else-fail operation.
Definition: util_fwd.hpp:155
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
Definition: util.cpp:30
const Open_only OPEN_ONLY
Tag value indicating an atomic open-if-exists-else-fail operation.
Definition: util.cpp:31
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
Definition: util_fwd.hpp:140
bipc::open_or_create_t Open_or_create
Tag type indicating an atomic open-if-exists-else-create operation.
Definition: util_fwd.hpp:152
bipc::create_only_t Create_only
Tag type indicating a create-unless-exists-else-fail operation.
Definition: util_fwd.hpp:158
flow::Fine_time_pt Fine_time_pt
Short-hand for Flow's Fine_time_pt.
Definition: util_fwd.hpp:119
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:117
const Create_only CREATE_ONLY
Tag value indicating an atomic create-unless-exists-else-fail operation.
Definition: util.cpp:32
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:134
flow::util::String_view String_view
Short-hand for Flow's String_view.
Definition: util_fwd.hpp:115
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
Definition: common.hpp:323
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:298