25#include <flow/error/error.hpp>
26#include <flow/log/config.hpp>
27#include <boost/interprocess/shared_memory_object.hpp>
28#include <boost/move/make_unique.hpp>
41template<
typename Persistent_mq_handle>
78 template<
typename Handle_name_func>
// Fragment of Blob_stream_mq_base_impl::remove_persistent() (name per the context string passed below).
// The signature and several body lines are not present in this listing.
163template<
typename Persistent_mq_handle>
// Hands a re-invocation of remove_persistent() (using actual_err_code) plus the caller's err_code to
// flow::error::exec_void_and_throw_on_error().
// NOTE(review): presumably this is the Flow idiom that handles a null err_code by throwing on error -- confirm.
169 if (flow::error::exec_void_and_throw_on_error
171 { remove_persistent(logger_ptr, name, actual_err_code); },
172 err_code,
"Blob_stream_mq_base_impl::remove_persistent()"))
// Remove the underlying MQ by name; any failure is recorded in the local mq_err_code.
181 Mq::remove_persistent(logger_ptr, name, &mq_err_code);
// Report the first truthy error in priority order: MQ removal, then sentinel_err_code1, then sentinel_err_code2.
// If none is set, the (falsy) sentinel_err_code2 is what gets assigned.
// NOTE(review): the statements that populate the two sentinel codes are not visible here.
191 *err_code = mq_err_code ? mq_err_code
192 : (sentinel_err_code1 ? sentinel_err_code1 : sentinel_err_code2);
// Fragment of a member template parameterized on Handle_name_func (signature not present in this listing).
195template<
typename Persistent_mq_handle>
196template<
typename Handle_name_func>
// Forwards the caller's handle_name_func unchanged to the underlying MQ type's for_each_persistent().
201 Mq::for_each_persistent(handle_name_func);
// Fragment of a function taking (among others) an rvalue MQ handle `mq` and a direction flag `snd_else_rcv`.
// NOTE(review): parameters and log text match ensure_unique_peer(); the signature line itself is not visible -- confirm.
// Visible steps: compute a per-direction sentinel name, atomically create a SHM object under that name,
// translate a bipc exception into *err_code, and set up a lambda that removes the persistent MQ name.
217template<
typename Persistent_mq_handle>
220 Mq&& mq,
bool snd_else_rcv,
// Local short-hands for names used below.
224 using flow::error::Runtime_error;
225 using flow::log::Sev;
226 using boost::system::system_category;
227 using boost::movelib::make_unique;
228 using bipc::shared_memory_object;
// Establish the logger and log component used by the FLOW_LOG_*() calls in this function.
230 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
// The sentinel name is derived from the MQ's absolute name and the direction flag.
251 const auto sentinel_name = mq_sentinel_name(mq.absolute_name(), snd_else_rcv);
253 FLOW_LOG_TRACE(
"MQ handle [" << mq <<
"]: Atomic-creating sentinel SHM-pool [" << sentinel_name <<
"] "
254 "to ensure no more than 1 [" << (snd_else_rcv ?
"snd" :
"rcv") <<
"] side of pipe exists.");
// Create-only open of the sentinel SHM object, requested read-only. The handle is marked [[maybe_unused]]:
// per the log message above, the point is the atomic creation itself.
// NOTE(review): per the catch below, an already-existing name is expected to throw interprocess_exception.
261 [[maybe_unused]]
auto shm_pool_sentinel
262 = make_unique<shared_memory_object>(
util::CREATE_ONLY, sentinel_name.native_str(), bipc::read_only);
// Creation failed: collect the native code and the bipc error enum, and classify whether it is already-exists.
265 catch (
const bipc::interprocess_exception& exc)
267 const auto native_code_raw = exc.get_native_error();
268 const auto bipc_err_code_enum = exc.get_error_code();
269 const bool is_dupe_error = bipc_err_code_enum == bipc::already_exists_error;
// Log every detail of the original exception, since the emitted Error_code may not carry all of it.
270 FLOW_LOG_WARNING(
"MQ handle [" << mq <<
"]: While creating [" << (snd_else_rcv ?
"send" :
"receiv") <<
"er] "
271 "was checking none already exists in any process, as this is a one-directional pipe; "
272 "tried to create SHM sentinel pool [" << sentinel_name <<
"]; "
273 "bipc threw interprocess_exception; will emit some hopefully suitable Flow-IPC Error_code; "
274 "probably it's already-exists error meaning dupe-sender; "
275 "but here are all the details of the original exception: native code int "
276 "[" << native_code_raw <<
"]; bipc error_code_t enum->int "
277 "[" <<
int(bipc_err_code_enum) <<
"]; latter==already-exists = [" << is_dupe_error <<
"]; "
278 "message = [" << exc.what() <<
"].");
// Choose the emitted code based on is_dupe_error and the direction flag (rest of expression not visible here).
279 *err_code = is_dupe_error ?
Error_code(snd_else_rcv
// Lambda invoked with an Mq*; it captures the direction flag and this side's sentinel name by value.
// NOTE(review): presumably this is the deleter of the returned MQ holder -- confirm.
292 [get_logger, get_log_component, snd_else_rcv, sentinel_name](
Mq* mq_ptr)
// Name of the sentinel for the opposite direction (note the negated flag).
294 const auto other_sentinel_name = mq_sentinel_name(mq_ptr->
absolute_name(), !snd_else_rcv);
296 FLOW_LOG_INFO(
"MQ handle [" << *mq_ptr <<
"]: MQ handle ([" << (snd_else_rcv ?
"send" :
"receiv") <<
"er] "
297 "side of pipe) about to close. Hence by contract closing pipe itself too. "
298 "To prevent any attempt to reattach to underlying MQ "
299 "we make MQ anonymous by removing persistent-MQ-name [" << mq_ptr->
absolute_name() <<
"]. "
300 "Then removing sentinel SHM-pool [" << sentinel_name <<
"] to enable reuse of name for new MQ; "
301 "as well as the other-direction sentinel SHM-pool [" << other_sentinel_name <<
"].");
// Remove the persistent MQ name, passing a null logger and collecting any error into `sink`.
// NOTE(review): presumably `sink` is deliberately ignored (best-effort cleanup) -- confirm.
313 Mq::remove_persistent(
nullptr, mq_ptr->
absolute_name(), &sink);
// Fragment that builds and returns `sentinel_name` from `mq_name` and the direction flag `snd_else_rcv`.
// NOTE(review): the signature and the initial value of sentinel_name are not visible in this listing.
343template<
typename Persistent_mq_handle>
// Append the MQ type's resource-type ID, then the literal "sentinel", each via operator/=.
352 sentinel_name /= Mq::S_RESOURCE_TYPE_ID;
353 sentinel_name /=
"sentinel";
// Append the direction tag via operator+=, so the send and receive sides get distinct names.
354 sentinel_name +=
String_view(snd_else_rcv ?
"Snd" :
"Rcv");
// Finally append the MQ's own name, making the sentinel name specific to that MQ.
356 sentinel_name += mq_name;
358 return sentinel_name;
Internal implementation of Blob_stream_mq_base class template; and common utilities used by Blob_stre...
Control_cmd
If Blob_stream_mq_sender_impl sends an empty message, in NORMAL state Blob_stream_mq_receiver enters ...
static Auto_closing_mq ensure_unique_peer(flow::log::Logger *logger_ptr, Mq &&mq, bool snd_else_rcv, Error_code *err_code)
Internal helper for Blob_stream_mq_sender and Blob_stream_mq_receiver that operates both the start an...
static void for_each_persistent(const Handle_name_func &handle_name_func)
See Blob_stream_mq_base counterpart.
static void remove_persistent(flow::log::Logger *logger_ptr, const Shared_name &name, Error_code *err_code)
See Blob_stream_mq_base counterpart.
Persistent_mq_handle Mq
Short-hand for template arg for underlying MQ handle type.
boost::movelib::unique_ptr< Mq, Function< void(Mq *)> > Auto_closing_mq
Persistent_mq_handle holder that takes a deleter lambda on construction, intended here to perform add...
static Shared_name mq_sentinel_name(const Shared_name &mq_name, bool snd_else_rcv)
Name of sentinel SHM pool created by ensure_unique_peer() and potentially cleaned up by remove_persis...
A documentation-only concept defining the behavior of an object representing a light-weight handle to...
const Shared_name & absolute_name() const
Returns name equal to absolute_name passed to ctor.
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
static const Shared_name S_RESOURCE_TYPE_ID_SHM
Relative-folder fragment (no separators) identifying the resource type for: SHM pools.
bool absolute() const
Returns true if and only if the first character is S_SEPARATOR.
@ S_BLOB_STREAM_MQ_SENDER_EXISTS
Message-queue blob stream outgoing-direction peer could not be created, because one already exists at...
@ S_BLOB_STREAM_MQ_RECEIVER_EXISTS
Message-queue blob stream incoming-direction peer could not be created, because one already exists at...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
void remove_persistent_shm_pool(flow::log::Logger *logger_ptr, const Shared_name &pool_name, Error_code *err_code)
Equivalent to shm::classic::Pool_arena::remove_persistent().
Shared_name build_conventional_non_session_based_shared_name(const Shared_name &resource_type)
Builds an absolute name according to the path convention explained in Shared_name class doc header; t...
const Create_only CREATE_ONLY
Tag value indicating an atomic create-unless-exists-else-fail operation.
flow::util::String_view String_view
Short-hand for Flow's String_view.
@ S_END_SENTINEL
CAUTION – see ipc::Log_component doc header for directions to find actual members of this enum class.
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.