22#include <flow/error/error.hpp> 
   23#include <flow/common.hpp> 
   24#include <boost/interprocess/ipc/message_queue.hpp> 
   25#include <boost/move/make_unique.hpp> 
   38template<
typename Mode_tag>
 
   40                               size_t max_n_msg, 
size_t max_msg_sz,
 
   43  flow::log::Log_context(logger_ptr, 
Log_component::S_TRANSPORT),
 
   44  m_absolute_name(absolute_name_arg),
 
   45  m_interrupting_snd(false),
 
   46  m_interrupting_rcv(false)
 
   49  using boost::io::ios_all_saver;
 
   50  using boost::movelib::make_unique;
 
   51  using bipc::message_queue;
 
   53  assert(max_n_msg >= 1);
 
   54  assert(max_msg_sz >= 1);
 
   56  static_assert(std::is_same_v<Mode_tag, util::Create_only> || std::is_same_v<Mode_tag, util::Open_or_create>,
 
   57                "Can only delegate to this ctor with Mode_tag = Create_only or Open_or_create.");
 
   58  constexpr char const * MODE_STR = std::is_same_v<Mode_tag, util::Create_only>
 
   59                                      ? 
"create-only" : 
"open-or-create";
 
   61  if (get_logger()->should_log(Sev::S_TRACE, get_log_component()))
 
   63    ios_all_saver saver(*(get_logger()->this_thread_ostream())); 
 
   64    FLOW_LOG_TRACE_WITHOUT_CHECKING
 
   65      (
"Bipc_mq_handle [" << *
this << 
"]: Constructing MQ handle to MQ at name [" << 
absolute_name() << 
"] in " 
   66       "[" << MODE_STR << 
"] mode; max msg size [" << max_msg_sz << 
"] x [" << max_n_msg << 
"] msgs; " 
   67       "perms = [" << std::setfill(
'0') << std::setw(4) << std::oct << perms.get_permissions() << 
"].");
 
   75    m_mq = make_unique<message_queue>(mode_tag, 
absolute_name().native_str(), max_n_msg, max_msg_sz, perms);
 
   99  flow::log::Log_context(logger_ptr, 
Log_component::S_TRANSPORT),
 
  100  m_absolute_name(absolute_name_arg),
 
  101  m_interrupting_snd(false),
 
  102  m_interrupting_rcv(false)
 
  104  using boost::movelib::make_unique;
 
  105  using bipc::message_queue;
 
  107  FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Constructing MQ handle to MQ at name " 
  124  FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Closing MQ handle (already null? = [" << (!
m_mq) << 
"]).");
 
  132  using flow::log::Log_context;
 
  137  swap(
static_cast<Log_context&
>(val1), 
static_cast<Log_context&
>(val2));
 
  144  assert(
m_mq && 
"As advertised: max_msg_size() => undefined behavior if not successfully cted or was moved-from.");
 
  145  return m_mq->get_max_msg_size();
 
  150  assert(
m_mq && 
"As advertised: max_n_msgs() => undefined behavior if not successfully cted or was moved-from.");
 
  151  return m_mq->get_max_msg();
 
  156  using flow::util::buffers_dump_string;
 
  158  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, 
try_send, blob, _1);
 
  161  assert(
m_mq && 
"As advertised: try_send() => undefined behavior if not successfully cted or was moved-from.");
 
  168    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Nb-push of blob @[" << 
blob_data << 
"], " 
  169                   "size [" << blob.size() << 
"].");
 
  170    if (blob.size() == 0)
 
  180      FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob, 
"  ") << 
"].");
 
  187      FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Nb-push of blob @[" << 
blob_data << 
"], " 
  188                     "size [" << blob.size() << 
"]: would-block.");
 
  202  using flow::util::buffers_dump_string;
 
  204  if (flow::error::exec_void_and_throw_on_error
 
  205        ([&](
Error_code* actual_err_code) { 
send(blob, actual_err_code); },
 
  206         err_code, 
"Bipc_mq_handle::send()"))
 
  213  assert(
m_mq && 
"As advertised: send() => undefined behavior if not successfully cted or was moved-from.");
 
  216  FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Blocking-push of blob @[" << 
blob_data << 
"], " 
  217                 "size [" << blob.size() << 
"].  Trying nb-push first; if it succeeds -- great.  " 
  218                 "Else will wait/retry/wait/retry/....");
 
  219  if (blob.size() == 0)
 
  226    FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob, 
"  ") << 
"].");
 
  248    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Nb-push of blob @[" << 
blob_data << 
"], " 
  249                   "size [" << blob.size() << 
"]: would-block.  Executing blocking-wait.");
 
  258    FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility.  Retrying.");
 
  265  using flow::util::time_since_posix_epoch;
 
  266  using flow::util::buffers_dump_string;
 
  267  using flow::Fine_clock;
 
  268  using boost::chrono::round;
 
  269  using boost::chrono::microseconds;
 
  271  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, 
timed_send, blob, timeout_from_now, _1);
 
  276  assert(
m_mq && 
"As advertised: timed_send() => undefined behavior if not successfully cted or was moved-from.");
 
  279  FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Blocking-timed-push of blob @[" << 
blob_data << 
"], " 
  280                 "size [" << blob.size() << 
"]; timeout ~[" << round<microseconds>(timeout_from_now) << 
"].  " 
  281                 "Trying nb-push first; if it succeeds -- great.  Else will wait/retry/wait/retry/....");
 
  282  if (blob.size() == 0)
 
  289    FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(blob, 
"  ") << 
"].");
 
  292  auto now = Fine_clock::now();
 
  315    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Nb-push of blob @[" << 
blob_data << 
"], " 
  316                   "size [" << blob.size() << 
"]: would-block.  Executing blocking-wait.");
 
  318    timeout_from_now -= (after - now); 
 
  329      FLOW_LOG_TRACE(
"Did not finish before timeout.");
 
  333    FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility.  Retrying.");
 
  335    after = Fine_clock::now();
 
  336    assert((after >= now) && 
"Fine_clock is supposed to never go backwards.");
 
  344  using flow::util::buffers_dump_string;
 
  347  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, 
try_receive, blob, _1);
 
  350  assert(
m_mq && 
"As advertised: try_receive() => undefined behavior if not successfully cted or was moved-from.");
 
  356    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Nb-pop to blob @[" << blob->data() << 
"], " 
  357                   "max-size [" << blob->size() << 
"].");
 
  362    unsigned int pri_ignored;
 
  364      = 
m_mq->try_receive(blob->data(), blob->size(), n_rcvd, pri_ignored);
 
  368      FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd << 
"].");
 
  369      if (blob->size() != 0)
 
  371        FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob, 
"  ") << 
"].");
 
  376      FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Nb-pop to blob @[" << blob->data() << 
"], " 
  377                    "max-size [" << blob->size() << 
"]: would-block.");
 
  391  using flow::util::buffers_dump_string;
 
  394  if (flow::error::exec_void_and_throw_on_error
 
  396         err_code, 
"Bipc_mq_handle::receive()"))
 
  403  assert(
m_mq && 
"As advertised: receive() => undefined behavior if not successfully cted or was moved-from.");
 
  405  FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Blocking-pop to blob @[" << blob->data() << 
"], " 
  406                 "max-size [" << blob->size() << 
"].  Trying nb-pop first; if it succeeds -- great.  " 
  407                 "Else will wait/retry/wait/retry/....");
 
  414  unsigned int pri_ignored;
 
  422      ok = 
m_mq->try_receive(blob->data(), blob->size(), 
 
  423                             n_rcvd, pri_ignored);
 
  434      FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd << 
"].");
 
  435      if (blob->size() != 0)
 
  437        FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob, 
"  ") << 
"].");
 
  443    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Nb-pop to blob @[" << blob->data() << 
"], " 
  444                  "max-size [" << blob->size() << 
"]: would-block.  Executing blocking-pop.");
 
  453    FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility.  Retrying.");
 
  460  using flow::util::time_since_posix_epoch;
 
  461  using flow::util::buffers_dump_string;
 
  462  using flow::Fine_clock;
 
  463  using boost::chrono::round;
 
  464  using boost::chrono::microseconds;
 
  466  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, 
timed_receive, blob, timeout_from_now, _1);
 
  471  assert(
m_mq && 
"As advertised: timed_receive() => undefined behavior if not successfully cted or was moved-from.");
 
  473  FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Blocking-timed-pop to blob @[" << blob->data() << 
"], " 
  474                 "max-size [" << blob->size() << 
"]; timeout ~[" << round<microseconds>(timeout_from_now) << 
"].  " 
  475                 "Trying nb-pop first; if it succeeds -- great.  Else will wait/retry/wait/retry/....");
 
  478  unsigned int pri_ignored;
 
  480  auto now = Fine_clock::now();
 
  489      ok = 
m_mq->try_receive(blob->data(), blob->size(), 
 
  490                             n_rcvd, pri_ignored);
 
  501      FLOW_LOG_TRACE(
"Received message sized [" << n_rcvd << 
"].");
 
  502      if (blob->size() != 0)
 
  504        FLOW_LOG_DATA(
"Blob contents: [\n" << buffers_dump_string(*blob, 
"  ") << 
"].");
 
  510    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Nb-pop to blob @[" << blob->data() << 
"], " 
  511                  "max-size [" << blob->size() << 
"]: would-block.  Executing blocking-wait.");
 
  513    timeout_from_now -= (after - now); 
 
  524      FLOW_LOG_TRACE(
"Did not finish before timeout.");
 
  528    FLOW_LOG_TRACE(
"Blocking-wait reported transmissibility.  Retrying.");
 
  530    after = Fine_clock::now();
 
  531    assert((after >= now) && 
"Fine_clock is supposed to never go backwards.");
 
  537template<
bool SND_ELSE_RCV, 
bool ON_ELSE_OFF>
 
  540  using Classic_shm_area = bipc::ipcdetail::managed_open_or_create_impl<bipc::shared_memory_object, 0, true, false>;
 
  541  using Bipc_mq = bipc::message_queue;
 
  542  using Bipc_mq_hdr = bipc::ipcdetail::mq_hdr_t<Bipc_mq::void_pointer>;
 
  543  using Bipc_mq_mtx = bipc::interprocess_mutex;
 
  544  using Bipc_mq_lock = bipc::scoped_lock<Bipc_mq_mtx>;
 
  547         && 
"As advertised: interrupt_allow_impl() => undefined behavior if not successfully cted or was moved-from.");
 
  552  bool* interrupting_ptr;
 
  553  auto& shm_area = 
reinterpret_cast<Classic_shm_area&
>(*m_mq);
 
  554  auto* 
const mq_hdr = 
static_cast<Bipc_mq_hdr*
>(shm_area.get_user_address());
 
  555  decltype(mq_hdr->m_cond_recv)* cond_ptr;
 
  556  if constexpr(SND_ELSE_RCV)
 
  559    cond_ptr = &mq_hdr->m_cond_send;
 
  564    cond_ptr = &mq_hdr->m_cond_recv;
 
  566  auto& interrupting = *interrupting_ptr;
 
  567  auto& cond = *cond_ptr;
 
  570    Bipc_mq_lock lock(mq_hdr->m_mutex);
 
  572    if (interrupting == ON_ELSE_OFF)
 
  574      FLOW_LOG_WARNING(
"Bipc_mq_handle [" << *
this << 
"]: Interrupt mode already set for " 
  575                       "snd_else_rcv [" << SND_ELSE_RCV << 
"], on_else_off [" << ON_ELSE_OFF << 
"].  Ignoring.");
 
  580    interrupting = ON_ELSE_OFF;
 
  581    FLOW_LOG_INFO(
"Bipc_mq_handle [" << *
this << 
"]: Interrupt mode set for " 
  582                  "snd_else_rcv [" << SND_ELSE_RCV << 
"], on_else_off [" << ON_ELSE_OFF << 
"].  If on -- we " 
  583                  "shall now ping the associated condition variable to wake up any ongoing waits.");
 
  585    if constexpr(ON_ELSE_OFF)
 
  608  return interrupt_allow_impl<true, true>();
 
  613  return interrupt_allow_impl<true, false>();
 
  618  return interrupt_allow_impl<false, true>();
 
  623  return interrupt_allow_impl<false, false>();
 
  626template<Bipc_mq_handle::Wait_type WAIT_TYPE, 
bool SND_ELSE_RCV>
 
  630  using flow::util::time_since_posix_epoch;
 
  631  using boost::chrono::round;
 
  632  using boost::chrono::microseconds;
 
  633  using Classic_shm_area = bipc::ipcdetail::managed_open_or_create_impl<bipc::shared_memory_object, 0, true, false>;
 
  634  using Bipc_mq = bipc::message_queue;
 
  635  using Bipc_mq_hdr = bipc::ipcdetail::mq_hdr_t<Bipc_mq::void_pointer>;
 
  636  using Bipc_mq_mtx = bipc::interprocess_mutex;
 
  637  using Bipc_mq_lock = bipc::scoped_lock<Bipc_mq_mtx>;
 
  639  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, (wait_impl<WAIT_TYPE, SND_ELSE_RCV>), timeout_from_now, _1);
 
  643         && 
"As advertised: wait_impl() => undefined behavior if not successfully cted or was moved-from.");
 
  648    timeout_since_epoch = 
Fine_time_pt(time_since_posix_epoch() + timeout_from_now);
 
  681    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Poll-unstarved for snd_else_rcv [" << SND_ELSE_RCV << 
"].");
 
  685    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Blocking-timed-await-unstarved for " 
  686                   "snd_else_rcv [" << SND_ELSE_RCV << 
"]; " 
  687                   "timeout ~[" << round<microseconds>(timeout_from_now) << 
"].");
 
  692    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Blocking-await-unstarved for snd_else_rcv " 
  693                   "[" << SND_ELSE_RCV << 
"].");
 
  696  auto& shm_area = 
reinterpret_cast<Classic_shm_area&
>(*m_mq);
 
  697  auto* 
const mq_hdr = 
static_cast<Bipc_mq_hdr*
>(shm_area.get_user_address());
 
  698  size_t* blocked_dudes_ptr;
 
  699  bool* interrupting_ptr;
 
  700  decltype(mq_hdr->m_cond_recv)* cond_ptr;
 
  701  if constexpr(SND_ELSE_RCV)
 
  703    blocked_dudes_ptr = &mq_hdr->m_blocked_senders;
 
  704    cond_ptr = &mq_hdr->m_cond_send;
 
  709    blocked_dudes_ptr = &mq_hdr->m_blocked_receivers;
 
  710    cond_ptr = &mq_hdr->m_cond_recv;
 
  713  auto& blocked_dudes = *blocked_dudes_ptr;
 
  714  auto& cond = *cond_ptr;
 
  715  bool& interrupting = *interrupting_ptr;
 
  717  const auto is_starved_func = [&]() -> 
bool 
  719    if constexpr(SND_ELSE_RCV)
 
  721      return mq_hdr->m_cur_num_msg == mq_hdr->m_max_num_msg;
 
  725      return mq_hdr->m_cur_num_msg == 0;
 
  729  bool interrupted = 
false;
 
  734                                     "Bipc_mq_handle::wait_impl(): " 
  735                                       "bipc::interprocess_condition::[timed_]wait()",
 
  738#ifndef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX 
  740                  "bipc comments show this shall be true as of many Boost versions ago, unless unset which " 
  741                    "would decrease performance; and we do not do that.  Our code for simplicity assumes " 
  742                    "it and does not support the lower-perf bipc MQ algorithm.");
 
  746      Bipc_mq_lock lock(mq_hdr->m_mutex);
 
  751        FLOW_LOG_TRACE(
"Interrupted before wait/poll started (preemptively).");
 
  757      if (is_starved_func())
 
  763          FLOW_LOG_TRACE(
"Not immediatelly unstarved.  Poll = done.");
 
  769          FLOW_LOG_TRACE(
"Not immediatelly unstarved.  Awaiting unstarvedness or timeout.");
 
  783                  FLOW_LOG_TRACE(
"Interruption detected upon waking up from wait (interrupted concurrently).");
 
  794                const bool wait_result = cond.timed_wait(lock, timeout_since_epoch); 
 
  799                  FLOW_LOG_TRACE(
"Interruption detected upon waking up from wait (interrupted concurrently).");
 
  809                  if (is_starved_func())
 
  822            while (is_starved_func());
 
  837        FLOW_LOG_TRACE(
"Immediately unstarved.");
 
  843  if ((!*err_code) && interrupted)
 
  845    FLOW_LOG_INFO(
"Bipc_mq_handle [" << *
this << 
"]: Poll/wait/timed-wait-unstarved for " 
  846                  "snd_else_rcv [" << SND_ELSE_RCV << 
"]: interrupted (TRACE message -- if visible -- " 
  847                  "indicates whether preemptively or concurrently).");
 
  859    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Poll-unstarved for snd_else_rcv [" << SND_ELSE_RCV << 
"]: " 
  860                   "succeeded? = [" << not_starved << 
"].");
 
  864    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Blocking-timed-await-unstarved for " 
  865                   "snd_else_rcv [" << SND_ELSE_RCV << 
"]; timeout ~[" << round<microseconds>(timeout_from_now) << 
"]: " 
  866                   "succeeded? = [" << not_starved << 
"].  " 
  867                   "Either was immediately unstarved, or was not but waited until success or timeout+failure.  " 
  868                   "If success: TRACE message (if visible) above indicates which occurred.");
 
  873    FLOW_LOG_TRACE(
"Bipc_mq_handle [" << *
this << 
"]: Blocking-await-unstarved for " 
  874                   "snd_else_rcv [" << SND_ELSE_RCV << 
"]: succeeded eventually.  " 
  875                   "Either was immediately unstarved, or was not but waited it out.  " 
  876                   "TRACE message (if visible) above indicates which occurred.");
 
  894  return wait_impl<Wait_type::S_TIMED_WAIT, true>(timeout_from_now, err_code);
 
  909  return wait_impl<Wait_type::S_TIMED_WAIT, false>(timeout_from_now, err_code);
 
  915  using bipc::message_queue;
 
  916  using boost::system::system_category;
 
  918  if (flow::error::exec_void_and_throw_on_error
 
  921         err_code, 
"Bipc_mq_handle::remove_persistent()"))
 
  927  FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
 
  929  FLOW_LOG_INFO(
"Bipc_mq @ Shared_name[" << 
absolute_name << 
"]: Removing persistent MQ if possible.");
 
  944                "Code in Bipc_mq_handle::remove_persistent() relies on Boost invoking Linux unlink() with errno.");
 
  946  const auto& sys_err_code = *err_code = 
Error_code(errno, system_category());
 
  947  FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
  950template<
typename Func>
 
  954  using flow::error::Runtime_error;
 
  955  using bipc::interprocess_exception;
 
  956  using boost::system::system_category;
 
  958  if (flow::error::exec_void_and_throw_on_error
 
  970  catch (
const interprocess_exception& exc)
 
  981    const auto native_code_raw = exc.get_native_error();
 
  982    const auto bipc_err_code_enum = exc.get_error_code();
 
  983    const bool is_size_error = bipc_err_code_enum == bipc::size_error;
 
  984    FLOW_LOG_WARNING(
"bipc threw interprocess_exception; will emit some hopefully suitable Flow-IPC Error_code; " 
  985                     "but here are all the details of the original exception: native code int " 
  986                     "[" << native_code_raw << 
"]; bipc error_code_t enum->int " 
  987                     "[" << 
int(bipc_err_code_enum) << 
"]; latter==size_error? = [" << is_size_error << 
"]; " 
  988                     "message = [" << exc.what() << 
"]; context = [" << context << 
"].");
 
  999    if (native_code_raw != 0)
 
 1007      const auto& sys_err_code = *err_code = 
Error_code(native_code_raw, system_category());
 
 1008      FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
 1030  return os << 
'@' << &val << 
": sh_name[" << val.
absolute_name() << 
']';
 
Implements the Persistent_mq_handle concept by thinly wrapping bipc::message_queue,...
Bipc_mq_handle()
Implements Persistent_mq_handle API: Construct null handle.
bool allow_receives()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-receives...
boost::movelib::unique_ptr< bipc::message_queue > m_mq
Underlying MQ handle.
bool timed_wait_sendable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_send() but without the actual pushing of a message.
Bipc_mq_handle & operator=(Bipc_mq_handle &&src)
Implements Persistent_mq_handle API: Replaces handle with the source handle while making the latter i...
bool m_interrupting_rcv
Other-direction counterpart to m_interrupting_snd.
bool interrupt_sends()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-sends and...
void send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking send: pushes copy of message to queue; if queue is full...
bool try_send(const util::Blob_const &blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking send: pushes copy of message to queue and returns t...
bool timed_wait_receivable(util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Like timed_receive() but without the actual popping of a message...
bool timed_receive(util::Blob_mutable *blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed receive: pops copy of message from queue into buf...
const Shared_name & absolute_name() const
Implements Persistent_mq_handle API: Returns name equal to absolute_name passed to ctor.
static const Shared_name S_RESOURCE_TYPE_ID
Implements concept API.
bool allow_sends()
Implements Persistent_mq_handle API: Turn off preemptive/concurrent interruption of blocking-sends an...
bool try_receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Non-blocking receive: pops copy of message from queue into buffe...
@ S_TIMED_WAIT
Timed-wait-type (blocking until timeout).
@ S_WAIT
Wait-type (blocking indefinitely).
@ S_POLL
Poll-type (non-blocking).
void wait_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like receive() but without the actual popping of a message.
bool interrupt_receives()
Implements Persistent_mq_handle API: Turn on preemptive/concurrent interruption of blocking-receives ...
bool m_interrupting_snd
Starting at false, this is made true via interrupt_sends(), and back by allow_sends(); when true wait...
bool timed_send(const util::Blob_const &blob, util::Fine_duration timeout_from_now, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking timed send: pushes copy of message to queue; if queue i...
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code=0)
Implements Persistent_mq_handle API: Removes the named persistent MQ.
void op_with_possible_bipc_mq_exception(Error_code *err_code, util::String_view context, const Func &func)
Error helper: Run func() which will perform a bipc::message_queue API that might throw; if it does em...
void receive(util::Blob_mutable *blob, Error_code *err_code=0)
Implements Persistent_mq_handle API: Blocking receive: pops copy of message from queue into buffer; i...
size_t max_n_msgs() const
Implements Persistent_mq_handle API: Returns the max message count of the underlying queue.
~Bipc_mq_handle()
Implements Persistent_mq_handle API: Destroys this handle (or no-op if no handle was successfully con...
bool interrupt_allow_impl()
Impl body for interrupt_*() and allow_*().
void wait_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like send() but without the actual pushing of a message.
size_t max_msg_size() const
Implements Persistent_mq_handle API: Returns the max message size of the underlying queue.
bool wait_impl(util::Fine_duration timeout_from_now, Error_code *err_code)
Impl body for *_sendable() and *_receivable().
bool is_receivable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_receive() but without the actual popping of a message.
Shared_name m_absolute_name
See absolute_name().
bool is_sendable(Error_code *err_code=0)
Implements Persistent_mq_handle API: Like try_send() but without the actual pushing of a message.
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
const char * native_str() const
Returns (sans copying) pointer to NUL-terminated wrapped name string, suitable to pass into sys calls...
static Shared_name ct(const Source &src)
Copy-constructs from a char-sequence container (including string, util::String_view,...
@ S_INTERRUPTED
A blocking operation was intentionally interrupted or preemptively canceled.
@ S_MQ_BIPC_MISC_LIBRARY_ERROR
Low-level message queue: boost.interprocess emitted miscellaneous library exception sans a system cod...
@ S_MQ_MESSAGE_SIZE_OVER_OR_UNDERFLOW
Low-level message queue send-op buffer overflow (> max size) or receive-op buffer underflow (< max si...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
void swap(Bipc_mq_handle &val1, Bipc_mq_handle &val2)
Implements Persistent_mq_handle related concept: Swaps two objects.
bipc::permissions Permissions
Short-hand for Unix (POSIX) permissions class.
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
bipc::open_only_t Open_only
Tag type indicating an ideally-atomic open-if-exists-else-fail operation.
const Open_or_create OPEN_OR_CREATE
Tag value indicating an open-if-exists-else-create operation.
const Open_only OPEN_ONLY
Tag value indicating an atomic open-if-exists-else-fail operation.
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
bipc::open_or_create_t Open_or_create
Tag type indicating an atomic open-if-exists-else-create operation.
bipc::create_only_t Create_only
Tag type indicating a create-unless-exists-else-fail operation.
flow::Fine_time_pt Fine_time_pt
Short-hand for Flow's Fine_time_pt.
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
const Create_only CREATE_ONLY
Tag value indicating an atomic create-unless-exists-else-fail operation.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.