22#include <flow/error/error.hpp>
23#include <boost/chrono/floor.hpp>
24#include <boost/array.hpp>
38std::string shared_name_to_mq_name(
const Shared_name& name);
49 m_interrupting_snd(false),
50 m_interrupting_rcv(false),
51 m_interrupter_snd(m_nb_task_engine),
52 m_interrupt_detector_snd(m_nb_task_engine),
53 m_interrupter_rcv(m_nb_task_engine),
54 m_interrupt_detector_rcv(m_nb_task_engine)
59template<
typename Mode_tag>
61 size_t max_n_msg,
size_t max_msg_sz,
63 flow::log::Log_context(logger_ptr,
Log_component::S_TRANSPORT),
64 m_absolute_name(absolute_name_arg),
65 m_interrupting_snd(false),
66 m_interrupting_rcv(false),
67 m_interrupter_snd(m_nb_task_engine),
68 m_interrupt_detector_snd(m_nb_task_engine),
69 m_interrupter_rcv(m_nb_task_engine),
70 m_interrupt_detector_rcv(m_nb_task_engine)
73 using flow::error::Runtime_error;
74 using boost::io::ios_all_saver;
75 using boost::system::system_category;
83 assert(max_n_msg >= 1);
84 assert(max_msg_sz >= 1);
86 static_assert(std::is_same_v<Mode_tag, util::Create_only> || std::is_same_v<Mode_tag, util::Open_or_create>,
87 "Can only delegate to this ctor with Mode_tag = Create_only or Open_or_create.");
88 constexpr bool CREATE_ONLY_ELSE_MAYBE = std::is_same_v<Mode_tag, util::Create_only>;
89 constexpr char const * MODE_STR = CREATE_ONLY_ELSE_MAYBE ?
"create-only" :
"open-or-create";
92 ios_all_saver saver(*(get_logger()->this_thread_ostream()));
95 (
"Posix_mq_handle [" << *
this <<
"]: Constructing MQ handle to MQ at name [" <<
absolute_name() <<
"] in "
96 "[" << MODE_STR <<
"] mode; max msg size [" << max_msg_sz <<
"] x [" << max_n_msg <<
"] msgs; "
97 "perms = [" << std::setfill(
'0') << std::setw(4) << std::oct << perms.get_permissions() <<
"].");
135 const auto mq_name = shared_name_to_mq_name(
absolute_name());
136 const auto do_mq_open_func = [&](
bool create_else_open) ->
bool
141 if (create_else_open)
144 attr.mq_maxmsg = max_n_msg;
145 attr.mq_msgsize = max_msg_sz;
147 raw = mq_open(mq_name.c_str(),
148 O_RDWR | O_CREAT | O_EXCL,
149 perms.get_permissions(),
154 raw = mq_open(mq_name.c_str(), O_RDWR);
163 sys_err_code =
Error_code(errno, system_category());
170 if (CREATE_ONLY_ELSE_MAYBE)
173 if (do_mq_open_func(
true))
183 bool success =
false;
186 if (do_mq_open_func(
true))
190 success = !sys_err_code;
194 if (sys_err_code != boost::system::errc::file_exists)
201 if (do_mq_open_func(
false))
208 if (sys_err_code != boost::system::errc::no_such_file_or_directory)
217 ios_all_saver saver(*(get_logger()->this_thread_ostream()));
219 (
"Posix_mq_handle [" << *
this <<
"]: Create-or-open algorithm encountered the rare concurrency: "
220 "MQ at name [" <<
absolute_name() <<
"] existed during the create-only mq_open() but disappeared "
221 "before we were able to complete open-only mq_open(). Retrying in spin-lock fashion. "
222 "Details: max msg size [" << max_msg_sz <<
"] x [" << max_n_msg <<
"] msgs; "
223 "perms = [" << std::setfill(
'0') << std::setw(4) << std::oct << perms.get_permissions() <<
"].");
226 sys_err_code.clear();
228 while ((!success) && (!sys_err_code));
233 sys_err_code.clear();
243 assert(sys_err_code);
261 ios_all_saver saver(*(get_logger()->this_thread_ostream()));
264 (
"Posix_mq_handle [" << *
this <<
"]: mq_open() or set_resource_permissions() error (if the latter, details "
265 "above and repeated below; otherwise error details only follow) while "
266 "constructing MQ handle to MQ at name [" <<
absolute_name() <<
"] in "
267 "create-only mode; max msg size [" << max_msg_sz <<
"] x [" << max_n_msg <<
"] msgs; "
268 "perms = [" << std::setfill(
'0') << std::setw(4) << std::oct << perms.get_permissions() <<
"].");
270 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
274 *err_code = sys_err_code;
278 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
302 flow::log::Log_context(logger_ptr,
Log_component::S_TRANSPORT),
303 m_absolute_name(absolute_name_arg),
304 m_interrupting_snd(false),
305 m_interrupting_rcv(false),
306 m_interrupter_snd(m_nb_task_engine),
307 m_interrupt_detector_snd(m_nb_task_engine),
308 m_interrupter_rcv(m_nb_task_engine),
309 m_interrupt_detector_rcv(m_nb_task_engine)
311 using flow::log::Sev;
312 using flow::error::Runtime_error;
313 using boost::system::system_category;
318 (
"Posix_mq_handle [" << *
this <<
"]: Constructing MQ handle to MQ at name [" <<
absolute_name() <<
"] in "
328 const auto raw = mq_open(shared_name_to_mq_name(
absolute_name()).c_str(), O_RDWR);
337 (
"Posix_mq_handle [" << *
this <<
"]: mq_open() error (error details follow) while "
338 "constructing MQ handle to MQ at name [" <<
absolute_name() <<
"] in open-only mode.");
339 sys_err_code =
Error_code(errno, system_category());
346 assert(sys_err_code);
363 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
367 *err_code = sys_err_code;
371 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
379 using boost::asio::connect_pipe;
391 (
"Posix_mq_handle [" << *
this <<
"]: Constructing MQ handle to MQ at name [" <<
absolute_name() <<
"]: "
392 "connect-pipe failed. Details follow.");
393 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
406 using boost::system::system_category;
407 using ::epoll_create1;
411 using Epoll_event = ::epoll_event;
426 const auto setup = [&](
Native_handle* epoll_hndl_ptr,
bool snd_else_rcv)
428 auto& epoll_hndl = *epoll_hndl_ptr;
432 if (epoll_hndl.m_native_handle == -1)
434 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: Created MQ handle fine, but epoll_create1() failed; "
436 sys_err_code =
Error_code(errno, system_category());
456 Epoll_event event_of_interest1;
457 event_of_interest1.events = snd_else_rcv ? EPOLLOUT : EPOLLIN;
459 Epoll_event event_of_interest2;
460 event_of_interest2.events = EPOLLIN;
461 event_of_interest2.data.fd = interrupt_detector.native_handle();
462 if ((epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest1.data.fd, &event_of_interest1) == -1) ||
463 (epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest2.data.fd, &event_of_interest2) == -1))
465 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: Created MQ handle fine, but an epoll_ctl() failed; "
466 "snd_else_rcv = [" << snd_else_rcv <<
"]; details follow.");
467 sys_err_code =
Error_code(errno, system_category());
470 close(epoll_hndl.m_native_handle);
508 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: No MQ handle to close in destructor.");
514 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Closing MQ handle (and epoll set).");
549 using flow::log::Log_context;
555 swap(
static_cast<Log_context&
>(val1),
static_cast<Log_context&
>(val2));
567 array<Native_handle, 4> fds1;
568 array<Native_handle, 4> fds2;
612 assert(
false &&
"Horrible exception."); std::abort();
622 &&
"As advertised: max_msg_size() => undefined behavior if not successfully cted or was moved-from.");
633 assert(
false &&
"mq_getattr() failed (details logged); this is too bizarre.");
638 return size_t(attr.mq_msgsize);
648 &&
"As advertised: max_n_msgs() => undefined behavior if not successfully cted or was moved-from.");
654 assert(
false &&
"mq_getattr() failed (details logged); this is too bizarre.");
657 return size_t(attr.mq_maxmsg);
662 using boost::system::system_category;
673 attr.mq_flags = nb ? O_NONBLOCK : 0;
675 err_code,
"Posix_mq_handle::set_non_blocking(): mq_setattr()");
680 using flow::util::buffers_dump_string;
688 &&
"As advertised: try_send() => undefined behavior if not successfully cted or was moved-from.");
691 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-push of blob @[" <<
blob_data <<
"], "
692 "size [" << blob.size() <<
"].");
693 if (blob.size() == 0)
703 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob,
" ") <<
"].");
708 blob.size(), 0) == 0)
716 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-push of blob @[" <<
blob_data <<
"], "
717 "size [" << blob.size() <<
"]: would-block.");
730 using flow::util::buffers_dump_string;
734 if (flow::error::exec_void_and_throw_on_error
735 ([&](
Error_code* actual_err_code) {
send(blob, actual_err_code); },
736 err_code,
"Posix_mq_handle::send()"))
743 &&
"As advertised: send() => undefined behavior if not successfully cted or was moved-from.");
746 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-push of blob @[" <<
blob_data <<
"], "
747 "size [" << blob.size() <<
"]. Trying nb-push first; if it succeeds -- great. "
748 "Else will wait/retry/wait/retry/....");
749 if (blob.size() == 0)
756 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob,
" ") <<
"].");
767 blob.size(), 0) == 0)
781 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-push of blob @[" <<
blob_data <<
"], "
782 "size [" << blob.size() <<
"]: would-block. Executing blocking-wait.");
792 FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility. Retrying.");
799 using flow::util::time_since_posix_epoch;
800 using flow::util::buffers_dump_string;
801 using flow::Fine_clock;
802 using boost::chrono::floor;
803 using boost::chrono::round;
804 using boost::chrono::seconds;
805 using boost::chrono::microseconds;
806 using boost::chrono::nanoseconds;
811 flow::util::bind_ns::cref(blob), timeout_from_now, _1);
817 &&
"As advertised: timed_send() => undefined behavior if not successfully cted or was moved-from.");
820 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-timed-push of blob @[" <<
blob_data <<
"], "
821 "size [" << blob.size() <<
"]; timeout ~[" << round<microseconds>(timeout_from_now) <<
"]. "
822 "Trying nb-push first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
823 if (blob.size() == 0)
830 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob,
" ") <<
"].");
833 auto now = Fine_clock::now();
840 blob.size(), 0) == 0)
854 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-push of blob @[" <<
blob_data <<
"], "
855 "size [" << blob.size() <<
"]: would-block. Executing blocking-wait.");
857 timeout_from_now -= (after - now);
868 FLOW_LOG_TRACE(
"Did not finish before timeout.");
872 FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility. Retrying.");
874 after = Fine_clock::now();
875 assert((after >= now) &&
"Fine_clock is supposed to never go backwards.");
884 using flow::util::buffers_dump_string;
892 &&
"As advertised: try_receive() => undefined behavior if not successfully cted or was moved-from.");
894 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-pop to blob @[" << blob->data() <<
"], "
895 "max-size [" << blob->size() <<
"].");
898 unsigned int pri_ignored;
900 static_cast<char*
>(blob->data()),
901 blob->size(), &pri_ignored)) >= 0)
904 FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd <<
"].");
905 if (blob->size() != 0)
907 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob,
" ") <<
"].");
915 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-pop to blob @[" << blob->data() <<
"], "
916 "max-size [" << blob->size() <<
"]: would-block.");
930 using flow::util::buffers_dump_string;
934 if (flow::error::exec_void_and_throw_on_error
936 err_code,
"Posix_mq_handle::receive()"))
943 &&
"As advertised: receive() => undefined behavior if not successfully cted or was moved-from.");
945 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-pop to blob @[" << blob->data() <<
"], "
946 "max-size [" << blob->size() <<
"]. Trying nb-pop first; if it succeeds -- great. "
947 "Else will wait/retry/wait/retry/....");
954 unsigned int pri_ignored;
959 static_cast<char*
>(blob->data()),
960 blob->size(), &pri_ignored)) >= 0)
963 FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd <<
"].");
964 if (blob->size() != 0)
966 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob,
" ") <<
"].");
980 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-pop to blob @[" << blob->data() <<
"], "
981 "max-size [" << blob->size() <<
"]: would-block. Executing blocking-wait.");
991 FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility. Retrying.");
999 using flow::Fine_clock;
1000 using flow::util::time_since_posix_epoch;
1001 using flow::util::buffers_dump_string;
1002 using boost::chrono::floor;
1003 using boost::chrono::round;
1004 using boost::chrono::seconds;
1005 using boost::chrono::microseconds;
1006 using boost::chrono::nanoseconds;
1011 blob, timeout_from_now, _1);
1017 &&
"As advertised: timed_receive() => undefined behavior if not successfully cted or was moved-from.");
1019 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-timed-pop to blob @[" << blob->data() <<
"], "
1020 "max-size [" << blob->size() <<
"]; timeout ~[" << round<microseconds>(timeout_from_now) <<
"]. "
1021 "Trying nb-pop first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
1024 unsigned int pri_ignored;
1026 auto now = Fine_clock::now();
1032 static_cast<char*
>(blob->data()),
1033 blob->size(), &pri_ignored)) >= 0)
1036 FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd <<
"].");
1037 if (blob->size() != 0)
1039 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob,
" ") <<
"].");
1045 if (errno != EAGAIN)
1053 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-pop to blob @[" << blob->data() <<
"], "
1054 "max-size [" << blob->size() <<
"]: would-block. Executing blocking-wait.");
1056 timeout_from_now -= (after - now);
1067 FLOW_LOG_TRACE(
"Did not finish before timeout.");
1071 FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility. Retrying.");
1073 after = Fine_clock::now();
1074 assert((after >= now) &&
"Fine_clock is supposed to never go backwards.");
1080template<
bool SND_ELSE_RCV>
1086 &&
"As advertised: interrupt_impl() => undefined behavior if not successfully cted or was moved-from.");
1089 bool* interrupting_ptr;
1090 if constexpr(SND_ELSE_RCV)
1098 auto& interrupter = *interrupter_ptr;
1099 auto& interrupting = *interrupting_ptr;
1103 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: Interrupt mode already ON for "
1104 "snd_else_rcv [" << SND_ELSE_RCV <<
"]. Ignoring.");
1109 interrupting =
true;
1110 FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this <<
"]: Interrupt mode turning ON for "
1111 "snd_else_rcv [" << SND_ELSE_RCV <<
"].");
1119template<
bool SND_ELSE_RCV>
1125 &&
"As advertised: allow_impl() => undefined behavior if not successfully cted or was moved-from.");
1130 bool* interrupting_ptr;
1131 if constexpr(SND_ELSE_RCV)
1139 auto& interrupt_detector = *interrupt_detector_ptr;
1140 auto& interrupting = *interrupting_ptr;
1144 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: Interrupt mode already OFF for "
1145 "snd_else_rcv [" << SND_ELSE_RCV <<
"]. Ignoring.");
1150 interrupting =
false;
1151 FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this <<
"]: Interrupt mode turning OFF for "
1152 "snd_else_rcv [" << SND_ELSE_RCV <<
"].");
1162 return interrupt_impl<true>();
1167 return allow_impl<true>();
1172 return interrupt_impl<false>();
1177 return allow_impl<false>();
1184 using flow::util::time_since_posix_epoch;
1185 using boost::chrono::round;
1186 using boost::chrono::milliseconds;
1187 using boost::system::system_category;
1190 using Epoll_event = ::epoll_event;
1196 &&
"As advertised: wait_impl() => undefined behavior if not successfully cted or was moved-from.");
1200 int epoll_timeout_from_now_ms;
1201 milliseconds epoll_timeout_from_now;
1202 if (timeout_from_now_or_none == Fine_duration::max())
1204 epoll_timeout_from_now_ms = -1;
1205 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Infinite-await-unstarved for "
1206 "snd_else_rcv [" << snd_else_rcv <<
"]. Will perform an epoll_wait().");
1210 epoll_timeout_from_now = round<milliseconds>(timeout_from_now_or_none);
1211 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-await/poll-unstarved for "
1212 "snd_else_rcv [" << snd_else_rcv <<
"]; timeout ~[" << epoll_timeout_from_now <<
"] -- "
1213 "if 0 then poll. Will perform an epoll_wait().");
1214 epoll_timeout_from_now_ms = int(epoll_timeout_from_now.count());
1217 array<Epoll_event, 2> evs;
1218 const auto epoll_result
1221 evs.begin(), 1, epoll_timeout_from_now_ms);
1222 if (epoll_result == -1)
1224 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: epoll_wait() yielded error. Details follow.");
1226 const auto& sys_err_code = *err_code =
Error_code(errno, system_category());
1227 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1232 assert(epoll_result <= 2);
1233 if ((epoll_result == 2)
1235 ((epoll_result == 1)
1238 if (timeout_from_now_or_none == Fine_duration::max())
1240 FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this <<
"]: Infinite-await-unstarved for "
1241 "snd_else_rcv [" << snd_else_rcv <<
"]: interrupted.");
1245 FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this <<
"]: Blocking-await/poll-unstarved for "
1246 "snd_else_rcv [" << snd_else_rcv <<
"]; timeout ~[" << epoll_timeout_from_now <<
"] -- "
1247 "if 0 then poll: interrupted.");
1254 const bool success = epoll_result == 1;
1255 if (timeout_from_now_or_none == Fine_duration::max())
1257 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Infinite-await-unstarved for "
1258 "snd_else_rcv [" << snd_else_rcv <<
"]: succeeded? = [" << success <<
"].");
1262 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-await/poll-unstarved for "
1263 "snd_else_rcv [" << snd_else_rcv <<
"]; timeout ~[" << epoll_timeout_from_now <<
"] -- "
1264 "if 0 then poll: succeeded? = [" << success <<
"].");
1273 return wait_impl(util::Fine_duration::zero(),
true, err_code);
1278 wait_impl(util::Fine_duration::max(),
true, err_code);
1283 return wait_impl(timeout_from_now,
true, err_code);
1288 return wait_impl(util::Fine_duration::zero(),
false, err_code);
1293 wait_impl(util::Fine_duration::max(),
false, err_code);
1298 return wait_impl(timeout_from_now,
false, err_code);
1309 using boost::system::system_category;
1312 if (flow::error::exec_void_and_throw_on_error
1315 err_code,
"Posix_mq_handle::remove_persistent()"))
1321 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
1323 FLOW_LOG_INFO(
"Posix_mq @ Shared_name[" <<
absolute_name <<
"]: Removing persistent MQ if possible.");
1324 if (mq_unlink(shared_name_to_mq_name(
absolute_name).c_str()) == 0)
1331 FLOW_LOG_WARNING(
"Posix_mq @ Shared_name[" <<
absolute_name <<
"]: While removing persistent MQ:"
1332 "mq_unlink() yielded error. Details follow.");
1333 const auto& sys_err_code = *err_code =
Error_code(errno, system_category());
1334 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1339 using boost::system::system_category;
1348 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: mq_*() yielded error; context = [" << context <<
"]. "
1350 const auto& sys_err_code = *err_code
1351 = (errno == EMSGSIZE)
1354 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1366 os <<
'@' << &val <<
": sh_name[" << val.
absolute_name() <<
"] native_handle[";
1368 if (native_handle.null())
1370 return os <<
"null]";
1373 return os << native_handle <<
']';
1379std::string shared_name_to_mq_name(
const Shared_name& name)
1382 std::string mq_name(
"/");
1383 return mq_name += name.
str();
Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see man mq_overv...
bool allow_sends()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-sends an...
bool interrupt_receives()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-receives ...
bool interrupt_sends()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-sends and...
util::Pipe_reader Pipe_reader
Short-hand for anonymous pipe read end.
void wait_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like receive() but without the actual popping of a message.
static const Shared_name S_RESOURCE_TYPE_ID
Implements concept API.
bool handle_mq_api_result(int result, Error_code *err_code, util::String_view context) const
Helper that handles the result of an mq_*() call by logging WARNING(s) on error; setting *err_code on...
Pipe_reader m_interrupt_detector_snd
A byte is read from this end by allow_sends() to make it not-readable for the poll-wait in wait_impl(...
Native_handle m_epoll_hndl_snd
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool is_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_receive() but without the actual popping of a message.
bool allow_receives()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-receives...
bool is_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_send() but without the actual pushing of a message.
size_t max_msg_size() const
Implements Persistent_mq_handle API: Returns the max message size of the underlying queue.
Pipe_reader m_interrupt_detector_rcv
Other-direction counterpart to m_interrupt_detector_snd.
Error_code epoll_setup()
Ctor helper that sets up m_epoll_hndl_snd and m_epoll_hndl_rcv.
util::Pipe_writer Pipe_writer
Short-hand for anonymous pipe write end.
size_t max_n_msgs() const
Implements Persistent_mq_handle API: Returns the max message count of the underlying queue.
bool try_send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking send: pushes copy of message to queue and returns t...
Pipe_writer m_interrupter_rcv
Other-direction counterpart to m_interrupter_snd.
Native_handle native_handle() const
Implements Persistent_mq_handle API: Returns the stored native MQ handle; null if not open.
bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_receive() but without the actual popping of a message...
bool wait_impl(util::Fine_duration timeout_from_now_or_none, bool snd_else_rcv, Error_code *err_code)
Impl body for *_sendable() and *_receivable().
friend void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &name, Error_code *err_code=0)
Implements Persistent_mq_handle API: Removes the named persistent MQ.
void receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking receive: pops copy of message from queue into buffer; i...
void send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking send: pushes copy of message to queue; if queue is full...
Posix_mq_handle()
Implements Persistent_mq_handle API: Construct null handle.
bool m_interrupting_snd
Starting at false, this is made true via interrupt_sends(), and back by allow_sends(); acts as a guar...
const Shared_name & absolute_name() const
Implements Persistent_mq_handle API: Returns name equal to absolute_name passed to ctor.
Native_handle m_epoll_hndl_rcv
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool allow_impl()
Impl body for allow_*().
bool interrupt_impl()
Impl body for interrupt_*().
bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_send() but without the actual pushing of a message.
bool timed_receive(util::Blob_mutable *blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed receive: pops copy of message from queue into buf...
~Posix_mq_handle()
Implements Persistent_mq_handle API: Destroys this handle (or no-op if no handle was successfully con...
bool set_non_blocking(bool nb, Error_code *err_code)
Sets m_mq to blocking or non-blocking and returns true on success and clears *err_code; otherwise ret...
Shared_name m_absolute_name
See absolute_name().
bool timed_send(const util::Blob_const &blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed send: pushes copy of message to queue; if queue i...
bool try_receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking receive: pops copy of message from queue into buffe...
Error_code pipe_setup()
Ctor helper that sets up m_interrupt* pipe items.
void wait_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like send() but without the actual pushing of a message.
bool m_interrupting_rcv
Other-direction counterpart to m_interrupting_snd.
Pipe_writer m_interrupter_snd
A byte is written to this end by interrupt_sends() to make it readable for the poll-wait in wait_impl...
Native_handle m_mq
Underlying MQ handle.
flow::util::Task_engine m_nb_task_engine
Never used for .run() or .async() – just so we can construct Pipe_reader, Pipe_writer.
Posix_mq_handle & operator=(Posix_mq_handle &&src)
Implements Persistent_mq_handle API: Replaces handle with the source handle while making the latter i...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
void clear()
Makes it so empty() == true.
const std::string & str() const
Returns (sans copying) ref to immutable entire wrapped name string, suitable to pass into sys calls w...
@ S_INTERRUPTED
A blocking operation was intentionally interrupted or preemptively canceled.
@ S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW
Low-level message queue send-op buffer overflow (> max size) or receive-op buffer underflow (< max si...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
void swap(Bipc_mq_handle &val1, Bipc_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
bipc::permissions Permissions
Short-hand for Unix (POSIX) permissions class.
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
bipc::open_only_t Open_only
Tag type indicating an ideally-atomic open-if-exists-else-fail operation.
void pipe_produce(flow::log::Logger *logger_ptr, Pipe_writer *pipe)
Writes a byte to the given pipe writer.
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
void pipe_consume(flow::log::Logger *logger_ptr, Pipe_reader *pipe)
Reads a byte via the given pipe reader.
boost::asio::writable_pipe Pipe_writer
Short-hand for anonymous pipe write end.
bipc::open_or_create_t Open_or_create
Tag type indicating an atomic open-if-exists-else-create operation.
bipc::create_only_t Create_only
Tag type indicating a create-unless-exists-else-fail operation.
flow::Fine_time_pt Fine_time_pt
Short-hand for Flow's Fine_time_pt.
boost::asio::readable_pipe Pipe_reader
Short-hand for anonymous pipe read end.
void set_resource_permissions(flow::log::Logger *logger_ptr, const fs::path &path, const Permissions &perms, Error_code *err_code)
Utility that sets the permissions of the given resource (at the supplied file system path) to specifi...
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
const Create_only CREATE_ONLY
Tag value indicating an atomic create-unless-exists-else-fail operation.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.
handle_t m_native_handle
The native handle (possibly equal to S_NULL_HANDLE), the exact payload of this Native_handle.