25#include <flow/error/error.hpp> 
   26#include <flow/log/config.hpp> 
   27#include <boost/interprocess/shared_memory_object.hpp> 
   28#include <boost/move/make_unique.hpp> 
   41template<
typename Persistent_mq_handle>
 
   78  template<
typename Handle_name_func>
 
  163template<
typename Persistent_mq_handle>
 
  169  if (flow::error::exec_void_and_throw_on_error
 
  171           { remove_persistent(logger_ptr, name, actual_err_code); },
 
  172         err_code, 
"Blob_stream_mq_base_impl::remove_persistent()"))
 
  181  Mq::remove_persistent(logger_ptr, name, &mq_err_code);
 
  191  *err_code = mq_err_code ? mq_err_code
 
  192                          : (sentinel_err_code1 ? sentinel_err_code1 : sentinel_err_code2);
 
  195template<
typename Persistent_mq_handle>
 
  196template<
typename Handle_name_func>
 
  201  Mq::for_each_persistent(handle_name_func);
 
  217template<
typename Persistent_mq_handle>
 
  220                                                                     Mq&& mq, 
bool snd_else_rcv,
 
  224  using flow::error::Runtime_error;
 
  225  using flow::log::Sev;
 
  226  using boost::system::system_category;
 
  227  using boost::movelib::make_unique;
 
  228  using bipc::shared_memory_object;
 
  230  FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT); 
 
  251  const auto sentinel_name = mq_sentinel_name(mq.absolute_name(), snd_else_rcv);
 
  253  FLOW_LOG_TRACE(
"MQ handle [" << mq << 
"]: Atomic-creating sentinel SHM-pool [" << sentinel_name << 
"] " 
  254                 "to ensure no more than 1 [" << (snd_else_rcv ? 
"snd" : 
"rcv") << 
"] side of pipe exists.");
 
  261    [[maybe_unused]] 
auto shm_pool_sentinel
 
  262      = make_unique<shared_memory_object>(
util::CREATE_ONLY, sentinel_name.native_str(), bipc::read_only);
 
  265  catch (
const bipc::interprocess_exception& exc)
 
  267    const auto native_code_raw = exc.get_native_error();
 
  268    const auto bipc_err_code_enum = exc.get_error_code();
 
  269    const bool is_dupe_error = bipc_err_code_enum == bipc::already_exists_error;
 
  270    FLOW_LOG_WARNING(
"MQ handle [" << mq << 
"]: While creating [" << (snd_else_rcv ? 
"send" : 
"receiv") << 
"er] " 
  271                     "was checking none already exists in any process, as this is a one-directional pipe; " 
  272                     "tried to create SHM sentinel pool [" << sentinel_name << 
"]; " 
  273                     "bipc threw interprocess_exception; will emit some hopefully suitable Flow-IPC Error_code; " 
  274                     "probably it's already-exists error meaning dupe-sender; " 
  275                     "but here are all the details of the original exception: native code int " 
  276                     "[" << native_code_raw << 
"]; bipc error_code_t enum->int " 
  277                     "[" << 
int(bipc_err_code_enum) << 
"]; latter==already-exists = [" << is_dupe_error << 
"]; " 
  278                     "message = [" << exc.what() << 
"].");
 
  279    *err_code = is_dupe_error ? 
Error_code(snd_else_rcv
 
  292                         [get_logger, get_log_component, snd_else_rcv, sentinel_name](
Mq* mq_ptr)
 
  294    const auto other_sentinel_name = mq_sentinel_name(mq_ptr->
absolute_name(), !snd_else_rcv);
 
  296    FLOW_LOG_INFO(
"MQ handle [" << *mq_ptr << 
"]: MQ handle ([" << (snd_else_rcv ? 
"send" : 
"receiv") << 
"er] " 
  297                  "side of pipe) about to close.  Hence by contract closing pipe itself too.  " 
  298                  "To prevent any attempt to reattach to underlying MQ " 
  299                  "we make MQ anonymous by removing persistent-MQ-name [" << mq_ptr->
absolute_name() << 
"].  " 
  300                  "Then removing sentinel SHM-pool [" << sentinel_name << 
"] to enable reuse of name for new MQ; " 
  301                  "as well as the other-direction sentinel SHM-pool [" << other_sentinel_name << 
"].");
 
  313    Mq::remove_persistent(
nullptr, mq_ptr->
absolute_name(), &sink);
 
  343template<
typename Persistent_mq_handle>
 
  352  sentinel_name /= Mq::S_RESOURCE_TYPE_ID; 
 
  353  sentinel_name /= 
"sentinel";
 
  354  sentinel_name += 
String_view(snd_else_rcv ? 
"Snd" : 
"Rcv");
 
  356  sentinel_name += mq_name; 
 
  358  return sentinel_name;
 
Internal implementation of Blob_stream_mq_base class template; and common utilities used by Blob_stre...
Control_cmd
If Blob_stream_mq_sender_impl sends an empty message, in NORMAL state Blob_stream_mq_receiver enters ...
static Auto_closing_mq ensure_unique_peer(flow::log::Logger *logger_ptr, Mq &&mq, bool snd_else_rcv, Error_code *err_code)
Internal helper for Blob_stream_mq_sender and Blob_stream_mq_receiver that operates both the start an...
static void for_each_persistent(const Handle_name_func &handle_name_func)
See Blob_stream_mq_base counterpart.
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &name, Error_code *err_code)
See Blob_stream_mq_base counterpart.
Persistent_mq_handle Mq
Short-hand for template arg for underlying MQ handle type.
boost::movelib::unique_ptr< Mq, Function< void(Mq *)> > Auto_closing_mq
Persistent_mq_handle holder that takes a deleter lambda on construction, intended here to perform add...
static Shared_name mq_sentinel_name(const Shared_name &mq_name, bool snd_else_rcv)
Name of sentinel SHM pool created by ensure_unique_peer() and potentially cleaned up by remove_persis...
A documentation-only concept defining the behavior of an object representing a light-weight handle to...
const Shared_name & absolute_name() const
Returns name equal to absolute_name passed to ctor.
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static const Shared_name S_RESOURCE_TYPE_ID_SHM
Relative-folder fragment (no separators) identifying the resource type for: SHM pools.
bool absolute() const
Returns true if and only if the first character is S_SEPARATOR.
@ S_BLOB_STREAM_MQ_SENDER_EXISTS
Message-queue blob stream outgoing-direction peer could not be created, because one already exists at...
@ S_BLOB_STREAM_MQ_RECEIVER_EXISTS
Message-queue blob stream incoming-direction peer could not be created, because one already exists at...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
void remove_persistent_shm_pool(flow::log::Logger *logger_ptr, const Shared_name &pool_name, Error_code *err_code)
Equivalent to shm::classic::Pool_arena::remove_persistent().
Shared_name build_conventional_non_session_based_shared_name(const Shared_name &resource_type)
Builds an absolute name according to the path convention explained in Shared_name class doc header; t...
const Create_only CREATE_ONLY
Tag value indicating an atomic create-unless-exists-else-fail operation.
flow::util::String_view String_view
Short-hand for Flow's String_view.
@ S_END_SENTINEL
CAUTION – see ipc::Log_component doc header for directions to find actual members of this enum class.
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.