22#include <flow/error/error.hpp>
23#include <boost/chrono/floor.hpp>
24#include <boost/array.hpp>
38std::string shared_name_to_mq_name(
const Shared_name& name);
49 m_interrupting_snd(false),
50 m_interrupting_rcv(false),
51 m_interrupter_snd(m_nb_task_engine),
52 m_interrupt_detector_snd(m_nb_task_engine),
53 m_interrupter_rcv(m_nb_task_engine),
54 m_interrupt_detector_rcv(m_nb_task_engine)
59template<
typename Mode_tag>
61 size_t max_n_msg,
size_t max_msg_sz,
63 flow::log::Log_context(logger_ptr,
Log_component::S_TRANSPORT),
64 m_absolute_name(absolute_name_arg),
65 m_interrupting_snd(false),
66 m_interrupting_rcv(false),
67 m_interrupter_snd(m_nb_task_engine),
68 m_interrupt_detector_snd(m_nb_task_engine),
69 m_interrupter_rcv(m_nb_task_engine),
70 m_interrupt_detector_rcv(m_nb_task_engine)
73 using flow::error::Runtime_error;
75 using boost::io::ios_all_saver;
76 using boost::system::system_category;
84 assert(max_n_msg >= 1);
85 assert(max_msg_sz >= 1);
87 static_assert(std::is_same_v<Mode_tag, util::Create_only> || std::is_same_v<Mode_tag, util::Open_or_create>,
88 "Can only delegate to this ctor with Mode_tag = Create_only or Open_or_create.");
89 constexpr bool CREATE_ONLY_ELSE_MAYBE = std::is_same_v<Mode_tag, util::Create_only>;
90 constexpr char const * MODE_STR = CREATE_ONLY_ELSE_MAYBE ?
"create-only" :
"open-or-create";
92 if (logger_ptr && logger_ptr->should_log(Sev::S_TRACE, get_log_component()))
94 ios_all_saver saver{*(logger_ptr->this_thread_ostream())};
96 FLOW_LOG_TRACE_WITHOUT_CHECKING
97 (
"Posix_mq_handle [" << *
this <<
"]: Constructing MQ handle to MQ at name [" <<
absolute_name() <<
"] in "
98 "[" << MODE_STR <<
"] mode; max msg size [" << max_msg_sz <<
"] x [" << max_n_msg <<
"] msgs; "
99 "perms = [" << std::setfill(
'0') << std::setw(4) << std::oct << perms.get_permissions() <<
"].");
137 const auto mq_name = shared_name_to_mq_name(
absolute_name());
138 const auto do_mq_open_func = [&](
bool create_else_open) ->
bool
143 if (create_else_open)
146 attr.mq_maxmsg = max_n_msg;
147 attr.mq_msgsize = max_msg_sz;
149 raw = mq_open(mq_name.c_str(),
150 O_RDWR | O_CREAT | O_EXCL,
151 perms.get_permissions(),
156 raw = mq_open(mq_name.c_str(), O_RDWR);
165 sys_err_code =
Error_code(errno, system_category());
172 if (CREATE_ONLY_ELSE_MAYBE)
175 if (do_mq_open_func(
true))
185 bool success =
false;
188 if (do_mq_open_func(
true))
192 success = !sys_err_code;
196 if (sys_err_code != boost::system::errc::file_exists)
203 if (do_mq_open_func(
false))
210 if (sys_err_code != boost::system::errc::no_such_file_or_directory)
218 if (logger_ptr && logger_ptr->should_log(Sev::S_INFO, get_log_component()))
220 ios_all_saver saver{*(logger_ptr->this_thread_ostream())};
221 FLOW_LOG_INFO_WITHOUT_CHECKING
222 (
"Posix_mq_handle [" << *
this <<
"]: Create-or-open algorithm encountered the rare concurrency: "
223 "MQ at name [" <<
absolute_name() <<
"] existed during the create-only mq_open() but disappeared "
224 "before we were able to complete open-only mq_open(). Retrying in spin-lock fashion. "
225 "Details: max msg size [" << max_msg_sz <<
"] x [" << max_n_msg <<
"] msgs; "
226 "perms = [" << std::setfill(
'0') << std::setw(4) << std::oct << perms.get_permissions() <<
"].");
229 sys_err_code.clear();
231 while ((!success) && (!sys_err_code));
236 sys_err_code.clear();
246 assert(sys_err_code);
263 if (logger_ptr && logger_ptr->should_log(Sev::S_WARNING, get_log_component()))
265 ios_all_saver saver{*(logger_ptr->this_thread_ostream())};
267 FLOW_LOG_WARNING_WITHOUT_CHECKING
268 (
"Posix_mq_handle [" << *
this <<
"]: mq_open() or set_resource_permissions() error (if the latter, details "
269 "above and repeated below; otherwise error details only follow) while "
270 "constructing MQ handle to MQ at name [" <<
absolute_name() <<
"] in "
271 "create-only mode; max msg size [" << max_msg_sz <<
"] x [" << max_n_msg <<
"] msgs; "
272 "perms = [" << std::setfill(
'0') << std::setw(4) << std::oct << perms.get_permissions() <<
"].");
274 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
278 *err_code = sys_err_code;
282 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
306 flow::log::Log_context(logger_ptr,
Log_component::S_TRANSPORT),
307 m_absolute_name(absolute_name_arg),
308 m_interrupting_snd(false),
309 m_interrupting_rcv(false),
310 m_interrupter_snd(m_nb_task_engine),
311 m_interrupt_detector_snd(m_nb_task_engine),
312 m_interrupter_rcv(m_nb_task_engine),
313 m_interrupt_detector_rcv(m_nb_task_engine)
315 using flow::log::Sev;
316 using flow::error::Runtime_error;
317 using boost::system::system_category;
322 (
"Posix_mq_handle [" << *
this <<
"]: Constructing MQ handle to MQ at name [" <<
absolute_name() <<
"] in "
332 const auto raw = mq_open(shared_name_to_mq_name(
absolute_name()).c_str(), O_RDWR);
341 (
"Posix_mq_handle [" << *
this <<
"]: mq_open() error (error details follow) while "
342 "constructing MQ handle to MQ at name [" <<
absolute_name() <<
"] in open-only mode.");
343 sys_err_code =
Error_code(errno, system_category());
350 assert(sys_err_code);
367 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
371 *err_code = sys_err_code;
375 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
383 using boost::asio::connect_pipe;
395 (
"Posix_mq_handle [" << *
this <<
"]: Constructing MQ handle to MQ at name [" <<
absolute_name() <<
"]: "
396 "connect-pipe failed. Details follow.");
397 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
410 using boost::system::system_category;
411 using ::epoll_create1;
415 using Epoll_event = ::epoll_event;
430 const auto setup = [&](
Native_handle* epoll_hndl_ptr,
bool snd_else_rcv)
432 auto& epoll_hndl = *epoll_hndl_ptr;
436 if (epoll_hndl.m_native_handle == -1)
438 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: Created MQ handle fine, but epoll_create1() failed; "
440 sys_err_code =
Error_code(errno, system_category());
460 Epoll_event event_of_interest1;
461 event_of_interest1.events = snd_else_rcv ? EPOLLOUT : EPOLLIN;
463 Epoll_event event_of_interest2;
464 event_of_interest2.events = EPOLLIN;
465 event_of_interest2.data.fd = interrupt_detector.native_handle();
466 if ((epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest1.data.fd, &event_of_interest1) == -1) ||
467 (epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest2.data.fd, &event_of_interest2) == -1))
469 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: Created MQ handle fine, but an epoll_ctl() failed; "
470 "snd_else_rcv = [" << snd_else_rcv <<
"]; details follow.");
471 sys_err_code =
Error_code(errno, system_category());
474 close(epoll_hndl.m_native_handle);
512 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: No MQ handle to close in destructor.");
518 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Closing MQ handle (and epoll set).");
553 using flow::log::Log_context;
559 swap(
static_cast<Log_context&
>(val1),
static_cast<Log_context&
>(val2));
571 array<Native_handle, 4> fds1;
572 array<Native_handle, 4> fds2;
616 assert(
false &&
"Horrible exception."); std::abort();
626 &&
"As advertised: max_msg_size() => undefined behavior if not successfully cted or was moved-from.");
637 assert(
false &&
"mq_getattr() failed (details logged); this is too bizarre.");
642 return size_t(attr.mq_msgsize);
652 &&
"As advertised: max_n_msgs() => undefined behavior if not successfully cted or was moved-from.");
658 assert(
false &&
"mq_getattr() failed (details logged); this is too bizarre.");
661 return size_t(attr.mq_maxmsg);
666 using boost::system::system_category;
677 attr.mq_flags = nb ? O_NONBLOCK : 0;
679 err_code,
"Posix_mq_handle::set_non_blocking(): mq_setattr()");
684 using flow::util::buffers_dump_string;
688 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool,
try_send, blob, _1);
692 &&
"As advertised: try_send() => undefined behavior if not successfully cted or was moved-from.");
695 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-push of blob @[" <<
blob_data <<
"], "
696 "size [" << blob.size() <<
"].");
697 if (blob.size() == 0)
707 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob,
" ") <<
"].");
712 blob.size(), 0) == 0)
720 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-push of blob @[" <<
blob_data <<
"], "
721 "size [" << blob.size() <<
"]: would-block.");
734 using flow::util::buffers_dump_string;
738 if (flow::error::exec_void_and_throw_on_error
739 ([&](
Error_code* actual_err_code) {
send(blob, actual_err_code); },
740 err_code,
"Posix_mq_handle::send()"))
747 &&
"As advertised: send() => undefined behavior if not successfully cted or was moved-from.");
750 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-push of blob @[" <<
blob_data <<
"], "
751 "size [" << blob.size() <<
"]. Trying nb-push first; if it succeeds -- great. "
752 "Else will wait/retry/wait/retry/....");
753 if (blob.size() == 0)
760 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob,
" ") <<
"].");
771 blob.size(), 0) == 0)
785 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-push of blob @[" <<
blob_data <<
"], "
786 "size [" << blob.size() <<
"]: would-block. Executing blocking-wait.");
796 FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility. Retrying.");
803 using flow::util::time_since_posix_epoch;
804 using flow::util::buffers_dump_string;
805 using flow::Fine_clock;
806 using boost::chrono::floor;
807 using boost::chrono::round;
808 using boost::chrono::seconds;
809 using boost::chrono::microseconds;
810 using boost::chrono::nanoseconds;
814 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool,
timed_send, blob, timeout_from_now, _1);
820 &&
"As advertised: timed_send() => undefined behavior if not successfully cted or was moved-from.");
823 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-timed-push of blob @[" <<
blob_data <<
"], "
824 "size [" << blob.size() <<
"]; timeout ~[" << round<microseconds>(timeout_from_now) <<
"]. "
825 "Trying nb-push first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
826 if (blob.size() == 0)
833 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob,
" ") <<
"].");
836 auto now = Fine_clock::now();
843 blob.size(), 0) == 0)
857 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-push of blob @[" <<
blob_data <<
"], "
858 "size [" << blob.size() <<
"]: would-block. Executing blocking-wait.");
860 timeout_from_now -= (after - now);
871 FLOW_LOG_TRACE(
"Did not finish before timeout.");
875 FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility. Retrying.");
877 after = Fine_clock::now();
878 assert((after >= now) &&
"Fine_clock is supposed to never go backwards.");
887 using flow::util::buffers_dump_string;
891 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool,
try_receive, blob, _1);
895 &&
"As advertised: try_receive() => undefined behavior if not successfully cted or was moved-from.");
897 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-pop to blob @[" << blob->data() <<
"], "
898 "max-size [" << blob->size() <<
"].");
901 unsigned int pri_ignored;
903 static_cast<char*
>(blob->data()),
904 blob->size(), &pri_ignored)) >= 0)
907 FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd <<
"].");
908 if (blob->size() != 0)
910 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob,
" ") <<
"].");
918 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-pop to blob @[" << blob->data() <<
"], "
919 "max-size [" << blob->size() <<
"]: would-block.");
933 using flow::util::buffers_dump_string;
937 if (flow::error::exec_void_and_throw_on_error
939 err_code,
"Posix_mq_handle::receive()"))
946 &&
"As advertised: receive() => undefined behavior if not successfully cted or was moved-from.");
948 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-pop to blob @[" << blob->data() <<
"], "
949 "max-size [" << blob->size() <<
"]. Trying nb-pop first; if it succeeds -- great. "
950 "Else will wait/retry/wait/retry/....");
957 unsigned int pri_ignored;
962 static_cast<char*
>(blob->data()),
963 blob->size(), &pri_ignored)) >= 0)
966 FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd <<
"].");
967 if (blob->size() != 0)
969 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob,
" ") <<
"].");
983 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-pop to blob @[" << blob->data() <<
"], "
984 "max-size [" << blob->size() <<
"]: would-block. Executing blocking-wait.");
994 FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility. Retrying.");
1002 using flow::Fine_clock;
1003 using flow::util::time_since_posix_epoch;
1004 using flow::util::buffers_dump_string;
1005 using boost::chrono::floor;
1006 using boost::chrono::round;
1007 using boost::chrono::seconds;
1008 using boost::chrono::microseconds;
1009 using boost::chrono::nanoseconds;
1013 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool,
timed_receive, blob, timeout_from_now, _1);
1019 &&
"As advertised: timed_receive() => undefined behavior if not successfully cted or was moved-from.");
1021 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-timed-pop to blob @[" << blob->data() <<
"], "
1022 "max-size [" << blob->size() <<
"]; timeout ~[" << round<microseconds>(timeout_from_now) <<
"]. "
1023 "Trying nb-pop first; if it succeeds -- great. Else will wait/retry/wait/retry/....");
1026 unsigned int pri_ignored;
1028 auto now = Fine_clock::now();
1034 static_cast<char*
>(blob->data()),
1035 blob->size(), &pri_ignored)) >= 0)
1038 FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd <<
"].");
1039 if (blob->size() != 0)
1041 FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob,
" ") <<
"].");
1047 if (errno != EAGAIN)
1055 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Nb-pop to blob @[" << blob->data() <<
"], "
1056 "max-size [" << blob->size() <<
"]: would-block. Executing blocking-wait.");
1058 timeout_from_now -= (after - now);
1069 FLOW_LOG_TRACE(
"Did not finish before timeout.");
1073 FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility. Retrying.");
1075 after = Fine_clock::now();
1076 assert((after >= now) &&
"Fine_clock is supposed to never go backwards.");
1082template<
bool SND_ELSE_RCV>
1088 &&
"As advertised: interrupt_impl() => undefined behavior if not successfully cted or was moved-from.");
1091 bool* interrupting_ptr;
1092 if constexpr(SND_ELSE_RCV)
1100 auto& interrupter = *interrupter_ptr;
1101 auto& interrupting = *interrupting_ptr;
1105 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: Interrupt mode already ON for "
1106 "snd_else_rcv [" << SND_ELSE_RCV <<
"]. Ignoring.");
1111 interrupting =
true;
1112 FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this <<
"]: Interrupt mode turning ON for "
1113 "snd_else_rcv [" << SND_ELSE_RCV <<
"].");
1121template<
bool SND_ELSE_RCV>
1127 &&
"As advertised: allow_impl() => undefined behavior if not successfully cted or was moved-from.");
1132 bool* interrupting_ptr;
1133 if constexpr(SND_ELSE_RCV)
1141 auto& interrupt_detector = *interrupt_detector_ptr;
1142 auto& interrupting = *interrupting_ptr;
1146 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: Interrupt mode already OFF for "
1147 "snd_else_rcv [" << SND_ELSE_RCV <<
"]. Ignoring.");
1152 interrupting =
false;
1153 FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this <<
"]: Interrupt mode turning OFF for "
1154 "snd_else_rcv [" << SND_ELSE_RCV <<
"].");
1164 return interrupt_impl<true>();
1169 return allow_impl<true>();
1174 return interrupt_impl<false>();
1179 return allow_impl<false>();
1186 using flow::util::time_since_posix_epoch;
1187 using boost::chrono::round;
1188 using boost::chrono::milliseconds;
1189 using boost::system::system_category;
1192 using Epoll_event = ::epoll_event;
1194 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool,
wait_impl, timeout_from_now_or_none, snd_else_rcv, _1);
1198 &&
"As advertised: wait_impl() => undefined behavior if not successfully cted or was moved-from.");
1202 int epoll_timeout_from_now_ms;
1203 milliseconds epoll_timeout_from_now;
1204 if (timeout_from_now_or_none == Fine_duration::max())
1206 epoll_timeout_from_now_ms = -1;
1207 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Infinite-await-unstarved for "
1208 "snd_else_rcv [" << snd_else_rcv <<
"]. Will perform an epoll_wait().");
1212 epoll_timeout_from_now = round<milliseconds>(timeout_from_now_or_none);
1213 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-await/poll-unstarved for "
1214 "snd_else_rcv [" << snd_else_rcv <<
"]; timeout ~[" << epoll_timeout_from_now <<
"] -- "
1215 "if 0 then poll. Will perform an epoll_wait().");
1216 epoll_timeout_from_now_ms = int(epoll_timeout_from_now.count());
1219 array<Epoll_event, 2> evs;
1220 const auto epoll_result
1223 evs.begin(), 1, epoll_timeout_from_now_ms);
1224 if (epoll_result == -1)
1226 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: epoll_wait() yielded error. Details follow.");
1228 const auto& sys_err_code = *err_code =
Error_code(errno, system_category());
1229 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1234 assert(epoll_result <= 2);
1235 if ((epoll_result == 2)
1237 ((epoll_result == 1)
1240 if (timeout_from_now_or_none == Fine_duration::max())
1242 FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this <<
"]: Infinite-await-unstarved for "
1243 "snd_else_rcv [" << snd_else_rcv <<
"]: interrupted.");
1247 FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this <<
"]: Blocking-await/poll-unstarved for "
1248 "snd_else_rcv [" << snd_else_rcv <<
"]; timeout ~[" << epoll_timeout_from_now <<
"] -- "
1249 "if 0 then poll: interrupted.");
1256 const bool success = epoll_result == 1;
1257 if (timeout_from_now_or_none == Fine_duration::max())
1259 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Infinite-await-unstarved for "
1260 "snd_else_rcv [" << snd_else_rcv <<
"]: succeeded? = [" << success <<
"].");
1264 FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this <<
"]: Blocking-await/poll-unstarved for "
1265 "snd_else_rcv [" << snd_else_rcv <<
"]; timeout ~[" << epoll_timeout_from_now <<
"] -- "
1266 "if 0 then poll: succeeded? = [" << success <<
"].");
1275 return wait_impl(util::Fine_duration::zero(),
true, err_code);
1280 wait_impl(util::Fine_duration::max(),
true, err_code);
1285 return wait_impl(timeout_from_now,
true, err_code);
1290 return wait_impl(util::Fine_duration::zero(),
false, err_code);
1295 wait_impl(util::Fine_duration::max(),
false, err_code);
1300 return wait_impl(timeout_from_now,
false, err_code);
1311 using boost::system::system_category;
1314 if (flow::error::exec_void_and_throw_on_error
1317 err_code,
"Posix_mq_handle::remove_persistent()"))
1323 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
1325 FLOW_LOG_INFO(
"Posix_mq @ Shared_name[" <<
absolute_name <<
"]: Removing persistent MQ if possible.");
1326 if (mq_unlink(shared_name_to_mq_name(
absolute_name).c_str()) == 0)
1333 FLOW_LOG_WARNING(
"Posix_mq @ Shared_name[" <<
absolute_name <<
"]: While removing persistent MQ:"
1334 "mq_unlink() yielded error. Details follow.");
1335 const auto& sys_err_code = *err_code =
Error_code(errno, system_category());
1336 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1341 using boost::system::system_category;
1350 FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this <<
"]: mq_*() yielded error; context = [" << context <<
"]. "
1352 const auto& sys_err_code = *err_code
1353 = (errno == EMSGSIZE)
1356 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1368 os <<
'@' << &val <<
": sh_name[" << val.
absolute_name() <<
"] native_handle[";
1370 if (native_handle.null())
1372 return os <<
"null]";
1375 return os << native_handle <<
']';
1381std::string shared_name_to_mq_name(
const Shared_name& name)
1384 std::string mq_name(
"/");
1385 return mq_name += name.
str();
Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see man mq_overv...
bool allow_sends()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-sends an...
bool interrupt_receives()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-receives ...
bool interrupt_sends()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-sends and...
util::Pipe_reader Pipe_reader
Short-hand for anonymous pipe read end.
void wait_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like receive() but without the actual popping of a message.
static const Shared_name S_RESOURCE_TYPE_ID
Implements concept API.
bool handle_mq_api_result(int result, Error_code *err_code, util::String_view context) const
Helper that handles the result of an mq_*() call by logging WARNING(s) on error; setting *err_code on...
Pipe_reader m_interrupt_detector_snd
A byte is read from this end by allow_sends() to make it not-readable for the poll-wait in wait_impl(...
Native_handle m_epoll_hndl_snd
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool is_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_receive() but without the actual popping of a message.
bool allow_receives()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-receives...
bool is_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_send() but without the actual pushing of a message.
size_t max_msg_size() const
Implements Persistent_mq_handle API: Returns the max message size of the underlying queue.
Pipe_reader m_interrupt_detector_rcv
Other-direction counterpart to m_interrupt_detector_snd.
Error_code epoll_setup()
Ctor helper that sets up m_epoll_hndl_snd and m_epoll_hndl_rcv.
util::Pipe_writer Pipe_writer
Short-hand for anonymous pipe write end.
size_t max_n_msgs() const
Implements Persistent_mq_handle API: Returns the max message count of the underlying queue.
bool try_send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking send: pushes copy of message to queue and returns t...
Pipe_writer m_interrupter_rcv
Other-direction counterpart to m_interrupter_snd.
Native_handle native_handle() const
Implements Persistent_mq_handle API: Returns the stored native MQ handle; null if not open.
bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_receive() but without the actual popping of a message...
bool wait_impl(util::Fine_duration timeout_from_now_or_none, bool snd_else_rcv, Error_code *err_code)
Impl body for *_sendable() and *_receivable().
friend void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &name, Error_code *err_code=0)
Implements Persistent_mq_handle API: Removes the named persistent MQ.
void receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking receive: pops copy of message from queue into buffer; i...
void send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking send: pushes copy of message to queue; if queue is full...
Posix_mq_handle()
Implements Persistent_mq_handle API: Construct null handle.
bool m_interrupting_snd
Starting at false, this is made true via interrupt_sends(), and back by allow_sends(); acts as a guar...
const Shared_name & absolute_name() const
Implements Persistent_mq_handle API: Returns name equal to absolute_name passed to ctor.
Native_handle m_epoll_hndl_rcv
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool allow_impl()
Impl body for allow_*().
bool interrupt_impl()
Impl body for interrupt_*().
bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_send() but without the actual pushing of a message.
bool timed_receive(util::Blob_mutable *blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed receive: pops copy of message from queue into buf...
~Posix_mq_handle()
Implements Persistent_mq_handle API: Destroys this handle (or no-op if no handle was successfully con...
bool set_non_blocking(bool nb, Error_code *err_code)
Sets m_mq to blocking or non-blocking and returns true on success and clears *err_code; otherwise ret...
Shared_name m_absolute_name
See absolute_name().
bool timed_send(const util::Blob_const &blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed send: pushes copy of message to queue; if queue i...
bool try_receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking receive: pops copy of message from queue into buffe...
Error_code pipe_setup()
Ctor helper that sets up m_interrupt* pipe items.
void wait_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like send() but without the actual pushing of a message.
bool m_interrupting_rcv
Other-direction counterpart to m_interrupting_snd.
Pipe_writer m_interrupter_snd
A byte is written to this end by interrupt_sends() to make it readable for the poll-wait in wait_impl...
Native_handle m_mq
Underlying MQ handle.
flow::util::Task_engine m_nb_task_engine
Never used for .run() or .async() – just so we can construct Pipe_reader, Pipe_writer.
Posix_mq_handle & operator=(Posix_mq_handle &&src)
Implements Persistent_mq_handle API: Replaces handle with the source handle while making the latter i...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
void clear()
Makes it so empty() == true.
const std::string & str() const
Returns (sans copying) ref to immutable entire wrapped name string, suitable to pass into sys calls w...
@ S_INTERRUPTED
A blocking operation was intentionally interrupted or preemptively canceled.
@ S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW
Low-level message queue send-op buffer overflow (> max size) or receive-op buffer underflow (< max si...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
void swap(Bipc_mq_handle &val1, Bipc_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
bipc::permissions Permissions
Short-hand for Unix (POSIX) permissions class.
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
bipc::open_only_t Open_only
Tag type indicating an ideally-atomic open-if-exists-else-fail operation.
void pipe_produce(flow::log::Logger *logger_ptr, Pipe_writer *pipe)
Writes a byte to the given pipe writer.
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
void pipe_consume(flow::log::Logger *logger_ptr, Pipe_reader *pipe)
Reads a byte via the given pipe reader.
boost::asio::writable_pipe Pipe_writer
Short-hand for anonymous pipe write end.
bipc::open_or_create_t Open_or_create
Tag type indicating an atomic open-if-exists-else-create operation.
bipc::create_only_t Create_only
Tag type indicating a create-unless-exists-else-fail operation.
flow::Fine_time_pt Fine_time_pt
Short-hand for Flow's Fine_time_pt.
boost::asio::readable_pipe Pipe_reader
Short-hand for anonymous pipe read end.
void set_resource_permissions(flow::log::Logger *logger_ptr, const fs::path &path, const Permissions &perms, Error_code *err_code)
Utility that sets the permissions of the given resource (at the supplied file system path) to specifi...
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
const Create_only CREATE_ONLY
Tag value indicating an atomic create-unless-exists-else-fail operation.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.
handle_t m_native_handle
The native handle (possibly equal to S_NULL_HANDLE), the exact payload of this Native_handle.