22#include <flow/error/error.hpp> 
   23#include <boost/chrono/floor.hpp> 
   24#include <boost/array.hpp> 
   38std::string shared_name_to_mq_name(
const Shared_name& name);
 
   49  m_interrupting_snd(false), 
 
   50  m_interrupting_rcv(false), 
 
   51  m_interrupter_snd(m_nb_task_engine),
 
   52  m_interrupt_detector_snd(m_nb_task_engine),
 
   53  m_interrupter_rcv(m_nb_task_engine),
 
   54  m_interrupt_detector_rcv(m_nb_task_engine)
 
   59template<
typename Mode_tag>
 
   61                                 size_t max_n_msg, 
size_t max_msg_sz,
 
   63  flow::log::Log_context(logger_ptr, 
Log_component::S_TRANSPORT),
 
   64  m_absolute_name(absolute_name_arg),
 
   65  m_interrupting_snd(false),
 
   66  m_interrupting_rcv(false),
 
   67  m_interrupter_snd(m_nb_task_engine),
 
   68  m_interrupt_detector_snd(m_nb_task_engine),
 
   69  m_interrupter_rcv(m_nb_task_engine),
 
   70  m_interrupt_detector_rcv(m_nb_task_engine)
 
   73  using flow::error::Runtime_error;
 
   74  using boost::io::ios_all_saver;
 
   75  using boost::system::system_category;
 
   83  assert(max_n_msg >= 1);
 
   84  assert(max_msg_sz >= 1);
 
   86  static_assert(std::is_same_v<Mode_tag, util::Create_only> || std::is_same_v<Mode_tag, util::Open_or_create>,
 
   87                "Can only delegate to this ctor with Mode_tag = Create_only or Open_or_create.");
 
   88  constexpr bool CREATE_ONLY_ELSE_MAYBE = std::is_same_v<Mode_tag, util::Create_only>;
 
   89  constexpr char const * MODE_STR = CREATE_ONLY_ELSE_MAYBE ? 
"create-only" : 
"open-or-create";
 
   92    ios_all_saver saver(*(get_logger()->this_thread_ostream())); 
 
   95      (
"Posix_mq_handle [" << *
this << 
"]: Constructing MQ handle to MQ at name [" << 
absolute_name() << 
"] in " 
   96       "[" << MODE_STR << 
"] mode; max msg size [" << max_msg_sz << 
"] x [" << max_n_msg << 
"] msgs; " 
   97       "perms = [" << std::setfill(
'0') << std::setw(4) << std::oct << perms.get_permissions() << 
"].");
 
  135    const auto mq_name = shared_name_to_mq_name(
absolute_name());
 
  136    const auto do_mq_open_func = [&](
bool create_else_open) -> 
bool  
  141      if (create_else_open)
 
  144        attr.mq_maxmsg = max_n_msg;
 
  145        attr.mq_msgsize = max_msg_sz;
 
  147        raw = mq_open(mq_name.c_str(),
 
  148                      O_RDWR | O_CREAT | O_EXCL, 
 
  149                      perms.get_permissions(),
 
  154        raw = mq_open(mq_name.c_str(), O_RDWR);
 
  163        sys_err_code = 
Error_code(errno, system_category());
 
  170    if (CREATE_ONLY_ELSE_MAYBE)
 
  173      if (do_mq_open_func(
true))
 
  183      bool success = 
false; 
 
  186        if (do_mq_open_func(
true))
 
  190          success = !sys_err_code;
 
  194        if (sys_err_code != boost::system::errc::file_exists)
 
  201        if (do_mq_open_func(
false))
 
  208        if (sys_err_code != boost::system::errc::no_such_file_or_directory)
 
  217          ios_all_saver saver(*(get_logger()->this_thread_ostream())); 
 
  219            (
"Posix_mq_handle [" << *
this << 
"]: Create-or-open algorithm encountered the rare concurrency: " 
  220             "MQ at name [" << 
absolute_name() << 
"] existed during the create-only mq_open() but disappeared " 
  221             "before we were able to complete open-only mq_open().  Retrying in spin-lock fashion.  " 
  222             "Details: max msg size [" << max_msg_sz << 
"] x [" << max_n_msg << 
"] msgs; " 
  223             "perms = [" << std::setfill(
'0') << std::setw(4) << std::oct << perms.get_permissions() << 
"].");
 
  226        sys_err_code.clear(); 
 
  228      while ((!success) && (!sys_err_code));
 
  233        sys_err_code.clear();
 
  243      assert(sys_err_code);
 
  261      ios_all_saver saver(*(get_logger()->this_thread_ostream())); 
 
  264        (
"Posix_mq_handle [" << *
this << 
"]: mq_open() or set_resource_permissions() error (if the latter, details " 
  265         "above and repeated below; otherwise error details only follow) while " 
  266         "constructing MQ handle to MQ at name [" << 
absolute_name() << 
"] in " 
  267         "create-only mode; max msg size [" << max_msg_sz << 
"] x [" << max_n_msg << 
"] msgs; " 
  268         "perms = [" << std::setfill(
'0') << std::setw(4) << std::oct << perms.get_permissions() << 
"].");
 
  270    FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
  274      *err_code = sys_err_code;
 
  278      throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
 
  302  flow::log::Log_context(logger_ptr, 
Log_component::S_TRANSPORT),
 
  303  m_absolute_name(absolute_name_arg),
 
  304  m_interrupting_snd(false),
 
  305  m_interrupting_rcv(false),
 
  306  m_interrupter_snd(m_nb_task_engine),
 
  307  m_interrupt_detector_snd(m_nb_task_engine),
 
  308  m_interrupter_rcv(m_nb_task_engine),
 
  309  m_interrupt_detector_rcv(m_nb_task_engine)
 
  311  using flow::log::Sev;
 
  312  using flow::error::Runtime_error;
 
  313  using boost::system::system_category;
 
  318    (
"Posix_mq_handle [" << *
this << 
"]: Constructing MQ handle to MQ at name [" << 
absolute_name() << 
"] in " 
  328    const auto raw = mq_open(shared_name_to_mq_name(
absolute_name()).c_str(), O_RDWR);
 
  337        (
"Posix_mq_handle [" << *
this << 
"]: mq_open() error (error details follow) while " 
  338         "constructing MQ handle to MQ at name [" << 
absolute_name() << 
"] in open-only mode.");
 
  339      sys_err_code = 
Error_code(errno, system_category());
 
  346        assert(sys_err_code);
 
  363    FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
  367      *err_code = sys_err_code;
 
  371      throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
 
  379  using boost::asio::connect_pipe;
 
  391      (
"Posix_mq_handle [" << *
this << 
"]: Constructing MQ handle to MQ at name [" << 
absolute_name() << 
"]: " 
  392       "connect-pipe failed.  Details follow.");
 
  393    FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
  406  using boost::system::system_category;
 
  407  using ::epoll_create1;
 
  411  using Epoll_event = ::epoll_event;
 
  426  const auto setup = [&](
Native_handle* epoll_hndl_ptr, 
bool snd_else_rcv)
 
  428    auto& epoll_hndl = *epoll_hndl_ptr;
 
  432    if (epoll_hndl.m_native_handle == -1)
 
  434      FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this << 
"]: Created MQ handle fine, but epoll_create1() failed; " 
  436      sys_err_code = 
Error_code(errno, system_category());
 
  456    Epoll_event event_of_interest1;
 
  457    event_of_interest1.events = snd_else_rcv ? EPOLLOUT : EPOLLIN;
 
  459    Epoll_event event_of_interest2;
 
  460    event_of_interest2.events = EPOLLIN;
 
  461    event_of_interest2.data.fd = interrupt_detector.native_handle();
 
  462    if ((epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest1.data.fd, &event_of_interest1) == -1) ||
 
  463        (epoll_ctl(epoll_hndl.m_native_handle, EPOLL_CTL_ADD, event_of_interest2.data.fd, &event_of_interest2) == -1))
 
  465      FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this << 
"]: Created MQ handle fine, but an epoll_ctl() failed; " 
  466                       "snd_else_rcv = [" << snd_else_rcv << 
"]; details follow.");
 
  467      sys_err_code = 
Error_code(errno, system_category());
 
  470      close(epoll_hndl.m_native_handle);
 
  508    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: No MQ handle to close in destructor.");
 
  514    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Closing MQ handle (and epoll set).");
 
  549  using flow::log::Log_context;
 
  555  swap(
static_cast<Log_context&
>(val1), 
static_cast<Log_context&
>(val2));
 
  567    array<Native_handle, 4> fds1;
 
  568    array<Native_handle, 4> fds2;
 
  612    assert(
false && 
"Horrible exception."); std::abort();
 
  622         && 
"As advertised: max_msg_size() => undefined behavior if not successfully cted or was moved-from.");
 
  633    assert(
false && 
"mq_getattr() failed (details logged); this is too bizarre.");
 
  638  return size_t(attr.mq_msgsize);
 
  648         && 
"As advertised: max_n_msgs() => undefined behavior if not successfully cted or was moved-from.");
 
  654    assert(
false && 
"mq_getattr() failed (details logged); this is too bizarre.");
 
  657  return size_t(attr.mq_maxmsg);
 
  662  using boost::system::system_category;
 
  673  attr.mq_flags = nb ? O_NONBLOCK : 0;
 
  675                              err_code, 
"Posix_mq_handle::set_non_blocking(): mq_setattr()");
 
  680  using flow::util::buffers_dump_string;
 
  684  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, 
try_send, blob, _1);
 
  688         && 
"As advertised: try_send() => undefined behavior if not successfully cted or was moved-from.");
 
  691  FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Nb-push of blob @[" << 
blob_data << 
"], " 
  692                 "size [" << blob.size() << 
"].");
 
  693  if (blob.size() == 0)
 
  703    FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob, 
"  ") << 
"].");
 
  708              blob.size(), 0) == 0)
 
  716    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Nb-push of blob @[" << 
blob_data << 
"], " 
  717                  "size [" << blob.size() << 
"]: would-block.");
 
  730  using flow::util::buffers_dump_string;
 
  734  if (flow::error::exec_void_and_throw_on_error
 
  735        ([&](
Error_code* actual_err_code) { 
send(blob, actual_err_code); },
 
  736         err_code, 
"Posix_mq_handle::send()"))
 
  743         && 
"As advertised: send() => undefined behavior if not successfully cted or was moved-from.");
 
  746  FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Blocking-push of blob @[" << 
blob_data << 
"], " 
  747                 "size [" << blob.size() << 
"].  Trying nb-push first; if it succeeds -- great.  " 
  748                 "Else will wait/retry/wait/retry/....");
 
  749  if (blob.size() == 0)
 
  756    FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob, 
"  ") << 
"].");
 
  767                blob.size(), 0) == 0)
 
  781    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Nb-push of blob @[" << 
blob_data << 
"], " 
  782                   "size [" << blob.size() << 
"]: would-block.  Executing blocking-wait.");
 
  792    FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility.  Retrying.");
 
  799  using flow::util::time_since_posix_epoch;
 
  800  using flow::util::buffers_dump_string;
 
  801  using flow::Fine_clock;
 
  802  using boost::chrono::floor;
 
  803  using boost::chrono::round;
 
  804  using boost::chrono::seconds;
 
  805  using boost::chrono::microseconds;
 
  806  using boost::chrono::nanoseconds;
 
  810  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, 
timed_send, blob, timeout_from_now, _1);
 
  816         && 
"As advertised: timed_send() => undefined behavior if not successfully cted or was moved-from.");
 
  819  FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Blocking-timed-push of blob @[" << 
blob_data << 
"], " 
  820                 "size [" << blob.size() << 
"]; timeout ~[" << round<microseconds>(timeout_from_now) << 
"].  " 
  821                 "Trying nb-push first; if it succeeds -- great.  Else will wait/retry/wait/retry/....");
 
  822  if (blob.size() == 0)
 
  829    FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob, 
"  ") << 
"].");
 
  832  auto now = Fine_clock::now();
 
  839                blob.size(), 0) == 0)
 
  853    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Nb-push of blob @[" << 
blob_data << 
"], " 
  854                   "size [" << blob.size() << 
"]: would-block.  Executing blocking-wait.");
 
  856    timeout_from_now -= (after - now); 
 
  867      FLOW_LOG_TRACE(
"Did not finish before timeout.");
 
  871    FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility.  Retrying.");
 
  873    after = Fine_clock::now();
 
  874    assert((after >= now) && 
"Fine_clock is supposed to never go backwards.");
 
  883  using flow::util::buffers_dump_string;
 
  887  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, 
try_receive, blob, _1);
 
  891         && 
"As advertised: try_receive() => undefined behavior if not successfully cted or was moved-from.");
 
  893  FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Nb-pop to blob @[" << blob->data() << 
"], " 
  894                 "max-size [" << blob->size() << 
"].");
 
  897  unsigned int pri_ignored;
 
  899                           static_cast<char*
>(blob->data()),
 
  900                           blob->size(), &pri_ignored)) >= 0)
 
  903    FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd << 
"].");
 
  904    if (blob->size() != 0)
 
  906      FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob, 
"  ") << 
"].");
 
  914    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Nb-pop to blob @[" << blob->data() << 
"], " 
  915                   "max-size [" << blob->size() << 
"]: would-block.");
 
  929  using flow::util::buffers_dump_string;
 
  933  if (flow::error::exec_void_and_throw_on_error
 
  935         err_code, 
"Posix_mq_handle::receive()"))
 
  942         && 
"As advertised: receive() => undefined behavior if not successfully cted or was moved-from.");
 
  944  FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Blocking-pop to blob @[" << blob->data() << 
"], " 
  945                 "max-size [" << blob->size() << 
"].  Trying nb-pop first; if it succeeds -- great.  " 
  946                 "Else will wait/retry/wait/retry/....");
 
  953  unsigned int pri_ignored;
 
  958                             static_cast<char*
>(blob->data()),
 
  959                             blob->size(), &pri_ignored)) >= 0)
 
  962      FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd << 
"].");
 
  963      if (blob->size() != 0)
 
  965        FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob, 
"  ") << 
"].");
 
  979    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Nb-pop to blob @[" << blob->data() << 
"], " 
  980                   "max-size [" << blob->size() << 
"]: would-block.  Executing blocking-wait.");
 
  990    FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility.  Retrying.");
 
  998  using flow::Fine_clock;
 
  999  using flow::util::time_since_posix_epoch;
 
 1000  using flow::util::buffers_dump_string;
 
 1001  using boost::chrono::floor;
 
 1002  using boost::chrono::round;
 
 1003  using boost::chrono::seconds;
 
 1004  using boost::chrono::microseconds;
 
 1005  using boost::chrono::nanoseconds;
 
 1009  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, 
timed_receive, blob, timeout_from_now, _1);
 
 1015         && 
"As advertised: timed_receive() => undefined behavior if not successfully cted or was moved-from.");
 
 1017  FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Blocking-timed-pop to blob @[" << blob->data() << 
"], " 
 1018                 "max-size [" << blob->size() << 
"]; timeout ~[" << round<microseconds>(timeout_from_now) << 
"].  " 
 1019                 "Trying nb-pop first; if it succeeds -- great.  Else will wait/retry/wait/retry/....");
 
 1022  unsigned int pri_ignored;
 
 1024  auto now = Fine_clock::now();
 
 1030                             static_cast<char*
>(blob->data()),
 
 1031                             blob->size(), &pri_ignored)) >= 0)
 
 1034      FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd << 
"].");
 
 1035      if (blob->size() != 0)
 
 1037        FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob, 
"  ") << 
"].");
 
 1043    if (errno != EAGAIN)
 
 1051    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Nb-pop to blob @[" << blob->data() << 
"], " 
 1052                   "max-size [" << blob->size() << 
"]: would-block.  Executing blocking-wait.");
 
 1054    timeout_from_now -= (after - now); 
 
 1065      FLOW_LOG_TRACE(
"Did not finish before timeout.");
 
 1069    FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility.  Retrying.");
 
 1071    after = Fine_clock::now();
 
 1072    assert((after >= now) && 
"Fine_clock is supposed to never go backwards.");
 
 1078template<
bool SND_ELSE_RCV>
 
 1084         && 
"As advertised: interrupt_impl() => undefined behavior if not successfully cted or was moved-from.");
 
 1087  bool* interrupting_ptr;
 
 1088  if constexpr(SND_ELSE_RCV)
 
 1096  auto& interrupter = *interrupter_ptr;
 
 1097  auto& interrupting = *interrupting_ptr;
 
 1101    FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this << 
"]: Interrupt mode already ON for " 
 1102                     "snd_else_rcv [" << SND_ELSE_RCV << 
"].  Ignoring.");
 
 1107  interrupting = 
true; 
 
 1108  FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this << 
"]: Interrupt mode turning ON for " 
 1109                "snd_else_rcv [" << SND_ELSE_RCV << 
"].");
 
 1117template<
bool SND_ELSE_RCV>
 
 1123         && 
"As advertised: allow_impl() => undefined behavior if not successfully cted or was moved-from.");
 
 1128  bool* interrupting_ptr;
 
 1129  if constexpr(SND_ELSE_RCV)
 
 1137  auto& interrupt_detector = *interrupt_detector_ptr;
 
 1138  auto& interrupting = *interrupting_ptr;
 
 1142    FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this << 
"]: Interrupt mode already OFF for " 
 1143                     "snd_else_rcv [" << SND_ELSE_RCV << 
"].  Ignoring.");
 
 1148  interrupting = 
false;
 
 1149  FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this << 
"]: Interrupt mode turning OFF for " 
 1150                "snd_else_rcv [" << SND_ELSE_RCV << 
"].");
 
 1160  return interrupt_impl<true>();
 
 1165  return allow_impl<true>();
 
 1170  return interrupt_impl<false>();
 
 1175  return allow_impl<false>();
 
 1182  using flow::util::time_since_posix_epoch;
 
 1183  using boost::chrono::round;
 
 1184  using boost::chrono::milliseconds;
 
 1185  using boost::system::system_category;
 
 1188  using Epoll_event = ::epoll_event;
 
 1190  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, 
wait_impl, timeout_from_now_or_none, snd_else_rcv, _1);
 
 1194         && 
"As advertised: wait_impl() => undefined behavior if not successfully cted or was moved-from.");
 
 1198  int epoll_timeout_from_now_ms;
 
 1199  milliseconds epoll_timeout_from_now;
 
 1200  if (timeout_from_now_or_none == Fine_duration::max())
 
 1202    epoll_timeout_from_now_ms = -1;
 
 1203    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Infinite-await-unstarved for " 
 1204                   "snd_else_rcv [" << snd_else_rcv << 
"].  Will perform an epoll_wait().");
 
 1208    epoll_timeout_from_now = round<milliseconds>(timeout_from_now_or_none);
 
 1209    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Blocking-await/poll-unstarved for " 
 1210                   "snd_else_rcv [" << snd_else_rcv << 
"]; timeout ~[" << epoll_timeout_from_now << 
"] -- " 
 1211                   "if 0 then poll.  Will perform an epoll_wait().");
 
 1212    epoll_timeout_from_now_ms = int(epoll_timeout_from_now.count());
 
 1215  array<Epoll_event, 2> evs; 
 
 1216  const auto epoll_result
 
 1219                 evs.begin(), 1, epoll_timeout_from_now_ms);
 
 1220  if (epoll_result == -1)
 
 1222    FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this << 
"]: epoll_wait() yielded error.  Details follow.");
 
 1224    const auto& sys_err_code = *err_code = 
Error_code(errno, system_category());
 
 1225    FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
 1230  assert(epoll_result <= 2);
 
 1231  if ((epoll_result == 2) 
 
 1233      ((epoll_result == 1) 
 
 1236    if (timeout_from_now_or_none == Fine_duration::max())
 
 1238      FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this << 
"]: Infinite-await-unstarved for " 
 1239                    "snd_else_rcv [" << snd_else_rcv << 
"]: interrupted.");
 
 1243      FLOW_LOG_INFO(
"Posix_mq_handle [" << *
this << 
"]: Blocking-await/poll-unstarved for " 
 1244                    "snd_else_rcv [" << snd_else_rcv << 
"]; timeout ~[" << epoll_timeout_from_now << 
"] -- " 
 1245                    "if 0 then poll: interrupted.");
 
 1252  const bool success = epoll_result == 1;
 
 1253  if (timeout_from_now_or_none == Fine_duration::max())
 
 1255    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Infinite-await-unstarved for " 
 1256                   "snd_else_rcv [" << snd_else_rcv << 
"]: succeeded? = [" << success << 
"].");
 
 1260    FLOW_LOG_TRACE(
"Posix_mq_handle [" << *
this << 
"]: Blocking-await/poll-unstarved for " 
 1261                   "snd_else_rcv [" << snd_else_rcv << 
"]; timeout ~[" << epoll_timeout_from_now << 
"] -- " 
 1262                   "if 0 then poll: succeeded? = [" << success << 
"].");
 
 1271  return wait_impl(util::Fine_duration::zero(), 
true, err_code);
 
 1276  wait_impl(util::Fine_duration::max(), 
true, err_code);
 
 1281  return wait_impl(timeout_from_now, 
true, err_code);
 
 1286  return wait_impl(util::Fine_duration::zero(), 
false, err_code);
 
 1291  wait_impl(util::Fine_duration::max(), 
false, err_code);
 
 1296  return wait_impl(timeout_from_now, 
false, err_code);
 
 1307  using boost::system::system_category;
 
 1310  if (flow::error::exec_void_and_throw_on_error
 
 1313         err_code, 
"Posix_mq_handle::remove_persistent()"))
 
 1319  FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
 
 1321  FLOW_LOG_INFO(
"Posix_mq @ Shared_name[" << 
absolute_name << 
"]: Removing persistent MQ if possible.");
 
 1322  if (mq_unlink(shared_name_to_mq_name(
absolute_name).c_str()) == 0)
 
 1329  FLOW_LOG_WARNING(
"Posix_mq @ Shared_name[" << 
absolute_name << 
"]: While removing persistent MQ:" 
 1330                   "mq_unlink() yielded error.  Details follow.");
 
 1331  const auto& sys_err_code = *err_code = 
Error_code(errno, system_category());
 
 1332  FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
 1337  using boost::system::system_category;
 
 1346  FLOW_LOG_WARNING(
"Posix_mq_handle [" << *
this << 
"]: mq_*() yielded error; context = [" << context << 
"].  " 
 1348  const auto& sys_err_code = *err_code
 
 1349    = (errno == EMSGSIZE)
 
 1352  FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
 1364  os << 
'@' << &val << 
": sh_name[" << val.
absolute_name() << 
"] native_handle[";
 
 1366  if (native_handle.null())
 
 1368    return os << 
"null]";
 
 1371  return os << native_handle << 
']';
 
 1377std::string shared_name_to_mq_name(
const Shared_name& name)
 
 1380  std::string mq_name(
"/");
 
 1381  return mq_name += name.
str();
 
Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see man mq_overv...
bool allow_sends()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-sends an...
bool interrupt_receives()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-receives ...
bool interrupt_sends()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-sends and...
util::Pipe_reader Pipe_reader
Short-hand for anonymous pipe read end.
void wait_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like receive() but without the actual popping of a message.
static const Shared_name S_RESOURCE_TYPE_ID
Implements concept API.
bool handle_mq_api_result(int result, Error_code *err_code, util::String_view context) const
Helper that handles the result of an mq_*() call by logging WARNING(s) on error; setting *err_code on...
Pipe_reader m_interrupt_detector_snd
A byte is read from this end by allow_sends() to make it not-readable for the poll-wait in wait_impl(...
Native_handle m_epoll_hndl_snd
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool is_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_receive() but without the actual popping of a message.
bool allow_receives()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-receives...
bool is_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_send() but without the actual pushing of a message.
size_t max_msg_size() const
Implements Persistent_mq_handle API: Returns the max message size of the underlying queue.
Pipe_reader m_interrupt_detector_rcv
Other-direction counterpart to m_interrupt_detector_snd.
Error_code epoll_setup()
Ctor helper that sets up m_epoll_hndl_snd and m_epoll_hndl_rcv.
util::Pipe_writer Pipe_writer
Short-hand for anonymous pipe write end.
size_t max_n_msgs() const
Implements Persistent_mq_handle API: Returns the max message count of the underlying queue.
bool try_send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking send: pushes copy of message to queue and returns t...
Pipe_writer m_interrupter_rcv
Other-direction counterpart to m_interrupter_snd.
Native_handle native_handle() const
Implements Persistent_mq_handle API: Returns the stored native MQ handle; null if not open.
bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_receive() but without the actual popping of a message...
bool wait_impl(util::Fine_duration timeout_from_now_or_none, bool snd_else_rcv, Error_code *err_code)
Impl body for *_sendable() and *_receivable().
friend void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &name, Error_code *err_code=0)
Implements Persistent_mq_handle API: Removes the named persistent MQ.
void receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking receive: pops copy of message from queue into buffer; i...
void send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking send: pushes copy of message to queue; if queue is full...
Posix_mq_handle()
Implements Persistent_mq_handle API: Construct null handle.
bool m_interrupting_snd
Starting at false, this is made true via interrupt_sends(), and back by allow_sends(); acts as a guar...
const Shared_name & absolute_name() const
Implements Persistent_mq_handle API: Returns name equal to absolute_name passed to ctor.
Native_handle m_epoll_hndl_rcv
epoll_*() handle (.null() if and only if m_mq is null) that is level-triggered to be active (with onl...
bool allow_impl()
Impl body for allow_*().
bool interrupt_impl()
Impl body for interrupt_*().
bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_send() but without the actual pushing of a message.
bool timed_receive(util::Blob_mutable *blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed receive: pops copy of message from queue into buf...
~Posix_mq_handle()
Implements Persistent_mq_handle API: Destroys this handle (or no-op if no handle was successfully con...
bool set_non_blocking(bool nb, Error_code *err_code)
Sets m_mq to blocking or non-blocking and returns true on success and clears *err_code; otherwise ret...
Shared_name m_absolute_name
See absolute_name().
bool timed_send(const util::Blob_const &blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed send: pushes copy of message to queue; if queue i...
bool try_receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking receive: pops copy of message from queue into buffe...
Error_code pipe_setup()
Ctor helper that sets up m_interrupt* pipe items.
void wait_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like send() but without the actual pushing of a message.
bool m_interrupting_rcv
Other-direction counterpart to m_interrupting_snd.
Pipe_writer m_interrupter_snd
A byte is written to this end by interrupt_sends() to make it readable for the poll-wait in wait_impl...
Native_handle m_mq
Underlying MQ handle.
flow::util::Task_engine m_nb_task_engine
Never used for .run() or .async() – just so we can construct Pipe_reader, Pipe_writer.
Posix_mq_handle & operator=(Posix_mq_handle &&src)
Implements Persistent_mq_handle API: Replaces handle with the source handle while making the latter i...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
void clear()
Makes it so empty() == true.
const std::string & str() const
Returns (sans copying) ref to immutable entire wrapped name string, suitable to pass into sys calls w...
@ S_INTERRUPTED
A blocking operation was intentionally interrupted or preemptively canceled.
@ S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW
Low-level message queue send-op buffer overflow (> max size) or receive-op buffer underflow (< max si...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
void swap(Posix_mq_handle &val1, Posix_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
void swap(Bipc_mq_handle &val1, Bipc_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
bipc::permissions Permissions
Short-hand for Unix (POSIX) permissions class.
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
bipc::open_only_t Open_only
Tag type indicating an ideally-atomic open-if-exists-else-fail operation.
void pipe_produce(flow::log::Logger *logger_ptr, Pipe_writer *pipe)
Writes a byte to the given pipe writer.
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
void pipe_consume(flow::log::Logger *logger_ptr, Pipe_reader *pipe)
Reads a byte via the given pipe reader.
boost::asio::writable_pipe Pipe_writer
Short-hand for anonymous pipe write end.
bipc::open_or_create_t Open_or_create
Tag type indicating an atomic open-if-exists-else-create operation.
bipc::create_only_t Create_only
Tag type indicating a create-unless-exists-else-fail operation.
flow::Fine_time_pt Fine_time_pt
Short-hand for Flow's Fine_time_pt.
boost::asio::readable_pipe Pipe_reader
Short-hand for anonymous pipe read end.
void set_resource_permissions(flow::log::Logger *logger_ptr, const fs::path &path, const Permissions &perms, Error_code *err_code)
Utility that sets the permissions of the given resource (at the supplied file system path) to specifi...
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
const Create_only CREATE_ONLY
Tag value indicating an atomic create-unless-exists-else-fail operation.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.
handle_t m_native_handle
The native handle (possibly equal to S_NULL_HANDLE), the exact payload of this Native_handle.