Flow-IPC 1.0.1
Flow-IPC project: Full implementation reference.
blob_stream_mq_impl.hpp
/* Flow-IPC: Core
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include <flow/error/error.hpp>
#include <flow/log/config.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/move/make_unique.hpp>

namespace ipc::transport
{
// Types.

/**
 * Internal implementation of Blob_stream_mq_base class template; and common utilities used by
 * Blob_stream_mq_sender_impl and Blob_stream_mq_receiver_impl (`static` items only as of this writing).
 *
 * @tparam Persistent_mq_handle
 *         See Persistent_mq_handle concept doc header.
 */
template<typename Persistent_mq_handle>
class Blob_stream_mq_base_impl
{
public:
  // Types.

  /// Short-hand for template arg for underlying MQ handle type.
  using Mq = Persistent_mq_handle;

  /**
   * Persistent_mq_handle holder that takes a deleter lambda on construction, intended here to perform extra deinit
   * steps on top of closing the #Mq by deleting it. Used by ensure_unique_peer() machinery.
   */
  using Auto_closing_mq = boost::movelib::unique_ptr<Mq, Function<void (Mq*)>>;

  // Methods.

  /**
   * See Blob_stream_mq_base counterpart.
   *
   * @param logger_ptr
   *        See Blob_stream_mq_base counterpart.
   * @param name
   *        See Blob_stream_mq_base counterpart.
   * @param err_code
   *        See Blob_stream_mq_base counterpart.
   */
  static void remove_persistent(flow::log::Logger* logger_ptr, const Shared_name& name, Error_code* err_code);

  /**
   * See Blob_stream_mq_base counterpart.
   *
   * @tparam Handle_name_func
   *         See Blob_stream_mq_base counterpart.
   * @param handle_name_func
   *        See Blob_stream_mq_base counterpart.
   */
  template<typename Handle_name_func>
  static void for_each_persistent(const Handle_name_func& handle_name_func);

  /**
   * Internal helper for Blob_stream_mq_sender and Blob_stream_mq_receiver that operates both the start and end
   * of the anti-dupe-endpoint machinery used by those 2 classes to prevent more than 1 `_sender` and more than 1
   * `_receiver` for a given underlying MQ.
   *
   * The input is a handle to the MQ, as created by user as of this writing, either on the sender or receiver
   * end (`snd_else_rcv`). The output is one of:
   *   - a null return, indicating that either a sender (or receiver) is already registered (typically),
   *     or some other (unlikely) system error occurred. `*err_code` will indicate what happened; or
   *   - a non-null returned `unique_ptr` that has moved `mq` into itself/taken over ownership of it, with certain
   *     machinery such that when that `unique_ptr` (or moved version thereof) is `.reset()` or destroyed,
   *     the proper deinit will run, making it possible to create another MQ at the same Shared_name.
   *
   * `mq` is untouched if null is returned.
   *
   * @param logger_ptr
   *        Logger to use subsequently. Errors are logged as WARNING; otherwise nothing of INFO or higher verbosity
   *        is logged.
   * @param mq
   *        See above.
   * @param snd_else_rcv
   *        `true` if this is the sender side; else the receiver side.
   * @param err_code
   *        Must not be null, or behavior undefined; will be set to success if non-null returned or reason for failure
   *        otherwise.
   * @return null or pointer to new Persistent_mq_handle moved-from `mq`, and with deinit deleter.
   */
  static Auto_closing_mq ensure_unique_peer(flow::log::Logger* logger_ptr, Mq&& mq, bool snd_else_rcv,
                                            Error_code* err_code);

protected:
  // Types.

  /**
   * If Blob_stream_mq_sender_impl sends an empty message, in NORMAL state Blob_stream_mq_receiver enters CONTROL
   * state and expects one of these values in the next message, to react as documented per `enum` value
   * (upon re-entering NORMAL state).
   *
   * ### Rationale: Why is the underlying type signed? ###
   * There is a special type of CONTROL command used only at the beginning of the conversation: protocol negotiation;
   * each side sends to the other the highest protocol version it can speak. In that case, instead of using one of
   * the `enum` encodings, it sends the negation of the version number, which itself is always positive
   * (so -1 means version 1, -2 means 2, etc.). Using a signed type makes working with that eventuality a little
   * easier and more expressive. (No, we don't really expect the `enum` variants to use anywhere close to all these
   * bits anyway.)
   *
   * ### Rationale: Why 64 bits? ###
   * No huge reason. It felt prudent to leave a bit of reserved space, maybe for forward compatibility.
   * CONTROL messages are rare, so it should not affect perf.
   */
  enum class Control_cmd : int64_t
  {
    /// Indicates sender user invoked Blob_sender::end_sending() (graceful close). Emit/queue graceful-close to user.
    S_END_SENDING,

    /**
     * Indicates sender user invoked auto_ping() earlier, and this is an incoming periodic ping. If
     * receiver user invoked idle_timer_run() earlier, reset countdown to death due to lacking in-pings.
     */
    S_PING,

    /// Sentinel: not a valid value. May be used to, e.g., ensure validity of incoming value of allegedly this type.
    S_END_SENTINEL
  }; // enum class Control_cmd

private:
  // Methods.

  /**
   * Name of sentinel SHM pool created by ensure_unique_peer() and potentially cleaned up by remove_persistent().
   *
   * @param mq_name
   *        `mq.absolute_name()`. See ensure_unique_peer().
   * @param snd_else_rcv
   *        See ensure_unique_peer().
   * @return Absolute name.
   */
  static Shared_name mq_sentinel_name(const Shared_name& mq_name, bool snd_else_rcv);
}; // class Blob_stream_mq_base_impl

// Template implementations.

template<typename Persistent_mq_handle>
void Blob_stream_mq_base_impl<Persistent_mq_handle>::remove_persistent(flow::log::Logger* logger_ptr, // Static.
                                                                       const Shared_name& name, Error_code* err_code)
{
  using util::remove_persistent_shm_pool;

  if (flow::error::exec_void_and_throw_on_error
        ([&](Error_code* actual_err_code)
           { remove_persistent(logger_ptr, name, actual_err_code); },
         err_code, "Blob_stream_mq_base_impl::remove_persistent()"))
  {
    return;
  }
  // else
  assert(err_code);

  // This part is the no-brainer:
  Error_code mq_err_code;
  Mq::remove_persistent(logger_ptr, name, &mq_err_code);

  /* This part is what is needed on top of the above, because Blob_stream_mq_*er also creates these little pools
   * to keep track of uniqueness (that there's only one sender and receiver per MQ). */
  Error_code sentinel_err_code1;
  remove_persistent_shm_pool(logger_ptr, mq_sentinel_name(name, true), &sentinel_err_code1);
  Error_code sentinel_err_code2;
  remove_persistent_shm_pool(logger_ptr, mq_sentinel_name(name, false), &sentinel_err_code2);

  // Report an error <=> any one of three failed; but the key MQ's error (if it occurred) "wins."
  *err_code = mq_err_code ? mq_err_code
                          : (sentinel_err_code1 ? sentinel_err_code1 : sentinel_err_code2);
} // Blob_stream_mq_base_impl::remove_persistent()

template<typename Persistent_mq_handle>
template<typename Handle_name_func>
void // Static.
  Blob_stream_mq_base_impl<Persistent_mq_handle>::for_each_persistent(const Handle_name_func& handle_name_func)
{
  // Do exactly what we promised in contract.
  Mq::for_each_persistent(handle_name_func);

  /* Discussion: Is what we promised actually the right thing to promise though? Answer: Yes and no. Or:
   * Yes, within reason. ensure_unique_peer(), in the deleter it generated/returns in the Auto_closing_mq,
   * deletes the MQ and both sentinels. (The other-direction ensure_unique_peer() then loses the race and has
   * nothing to delete in its Auto_closing_mq; and that's fine.) The deleter first removes the sentinels, then
   * the MQ; so even if an abort occurred *during* the deleter, listing the still-existing MQs now should
   * yield all MQs for which there are still resources to delete (at least the MQ entry; and possibly 0, 1, or 2
   * sentinels, depending on when the abort occurred (probably 2, in most cases, unless unlucky)).
   *
   * We could also list all SHM pools matching the naming convention we use in mq_sentinel_name(), deducing
   * the MQ name from any sentinel name found and adding that to the list we emit. Frankly I (ygoldfel) simply
   * don't want to. This is a best-effort cleanup for abort situation only; and we do remove the MQ last
   * in the deleter, so everything should work out fine this way, and the code is simple. */
} // Blob_stream_mq_base_impl::for_each_persistent()

template<typename Persistent_mq_handle>
typename Blob_stream_mq_base_impl<Persistent_mq_handle>::Auto_closing_mq
  Blob_stream_mq_base_impl<Persistent_mq_handle>::ensure_unique_peer(flow::log::Logger* logger_ptr,
                                                                     Mq&& mq, bool snd_else_rcv,
                                                                     Error_code* err_code)
{
  using util::remove_persistent_shm_pool;
  using flow::error::Runtime_error;
  using flow::log::Sev;
  using boost::system::system_category;
  using boost::movelib::make_unique;
  using bipc::shared_memory_object;

  FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT); // @todo Maybe just make this method non-static.
  assert(err_code);

  /* Read Blob_stream_mq_sender class doc header (particularly regarding lifetimes of things); then return here.
   * Summarizing:
   *
   * We've promised to enforce certain semantics, namely that a given MQ can be accessed, ever, by
   * at most 1 Blob_stream_mq_sender (and 1 _receiver) -- across all processes, not just this one.
   * So we do that here by maintaining a separate SHM-stored record indicating a _sender (or _receiver) exists already.
   *
   * Also we have to make it so that once the _sender dtor (or _receiver dtor, whichever happens first) runs,
   * then within a short amount of time after that the user can again open a *new*
   * channel at the same Shared_name. (In a sense we're trying to make our _sender/_receiver pair similar to
   * a peer-pair Unix domain socket connection: there are only two sides to it (though ours is 1-directional);
   * and destroying either side hoses the connection.) To do that, we put a custom deleter on the MQ unique_ptr:
   * once it goes away (whether in dtor, or earlier due to fatal error when sending/whatever), we have to "undo"
   * whatever mechanism is guarding against multiple `_sender`s (or `_receiver`s) at that Shared_name. */

  /* First atomically attempt to create a tiny dummy SHM pool whose existence indicates a Blob_stream_mq_....er
   * to that absolute_name() exists. Its name: */

  const auto sentinel_name = mq_sentinel_name(mq.absolute_name(), snd_else_rcv);

  FLOW_LOG_TRACE("MQ handle [" << mq << "]: Atomic-creating sentinel SHM-pool [" << sentinel_name << "] "
                 "to ensure no more than 1 [" << (snd_else_rcv ? "snd" : "rcv") << "] side of pipe exists.");
  try
  {
    /* Permissions note: leave at default (*nix: 644, meaning read/write by this user, read only for everyone else).
     * However this controls writing to the pool itself -- but no one will be writing to the pool; it is only a
     * sentinel by its existence. Hence maybe we could restrict it even more, but probably it is not important.
     * @todo For good measure look into restricting it more; maybe even 000. */
    [[maybe_unused]] auto shm_pool_sentinel
      = make_unique<shared_memory_object>(util::CREATE_ONLY, sentinel_name.native_str(), bipc::read_only);
    // On success it is immediately destroyed... but the name, and tiny SHM pool, lives on until deleter below execs.
  }
  catch (const bipc::interprocess_exception& exc)
  {
    const auto native_code_raw = exc.get_native_error();
    const auto bipc_err_code_enum = exc.get_error_code();
    const bool is_dupe_error = bipc_err_code_enum == bipc::already_exists_error;
    FLOW_LOG_WARNING("MQ handle [" << mq << "]: While creating [" << (snd_else_rcv ? "send" : "receiv") << "er] "
                     "was checking none already exists in any process, as this is a one-directional pipe; "
                     "tried to create SHM sentinel pool [" << sentinel_name << "]; "
                     "bipc threw interprocess_exception; will emit some hopefully suitable Flow-IPC Error_code; "
                     "probably it's already-exists error meaning dupe-sender; "
                     "but here are all the details of the original exception: native code int "
                     "[" << native_code_raw << "]; bipc error_code_t enum->int "
                     "[" << int(bipc_err_code_enum) << "]; latter==already-exists = [" << is_dupe_error << "]; "
                     "message = [" << exc.what() << "].");
    *err_code = is_dupe_error ? Error_code(snd_else_rcv
                                             ? error::Code::S_BLOB_STREAM_MQ_SENDER_EXISTS
                                             : error::Code::S_BLOB_STREAM_MQ_RECEIVER_EXISTS)
                              : Error_code(errno, system_category());
    return Auto_closing_mq();
  } // catch (bipc::interprocess_exception)
  // Got here: OK, no dupe, no other problem.
  err_code->clear();

  /* Now take over their MQ handle. As noted above set up the auto-closing extra behavior to keep above scheme working,
   * if they want to reuse the name to make another MQ later (although informally we don't recommend it -- much
   * like, conceptually, binding a TCP socket to the same port soon after closing another can bring problems). */
  return Auto_closing_mq(new Mq(std::move(mq)), // Now we own the MQ handle.
                         [get_logger, get_log_component, snd_else_rcv, sentinel_name](Mq* mq_ptr)
  {
    const auto other_sentinel_name = mq_sentinel_name(mq_ptr->absolute_name(), !snd_else_rcv);

    FLOW_LOG_INFO("MQ handle [" << *mq_ptr << "]: MQ handle ([" << (snd_else_rcv ? "send" : "receiv") << "er] "
                  "side of pipe) about to close. Hence by contract closing pipe itself too. "
                  "To prevent any attempt to reattach to underlying MQ "
                  "we make MQ anonymous by removing persistent-MQ-name [" << mq_ptr->absolute_name() << "]. "
                  "Then removing sentinel SHM-pool [" << sentinel_name << "] to enable reuse of name for new MQ; "
                  "as well as the other-direction sentinel SHM-pool [" << other_sentinel_name << "].");

    /* (If sender does it first, receiver will fail here; and vice versa. Just eat all logging (null logger);
     * and ignore any errors below. Nothing we can do anyway.) */
    Error_code sink;

    // Blob_stream_mq_*er<Mq>(Mq(mq.absolute_name())) would fail until we do:

    remove_persistent_shm_pool(nullptr, sentinel_name, &sink);
    remove_persistent_shm_pool(nullptr, other_sentinel_name, &sink);

    // Mq(Open_only, mq.absolute_name()) would now succeed. Mq(Create_only, mq.absolute_name()) would now fail.
    Mq::remove_persistent(nullptr, mq_ptr->absolute_name(), &sink);
    // Mq(Open_only, mq.absolute_name()) would now fail. Mq(Create_only, mq.absolute_name()) would now succeed.

    /* Discussion:
     *   - The order (sentinels versus MQ itself) is ~reversed compared to the creation, where MQ is made first.
     *     In general that's a good idea (all else being equal); but specifically it could help avoid a corner
     *     case involving post-abort cleanup using for_each_persistent() and remove_persistent(): If an abort
     *     occurs during the code you're reading now, the thing on which for_each_persistent() is keyed
     *     (listing of the MQs) is the last thing removed. Otherwise the sentinel(s) could get leaked, if abort
     *     occurs after MQ removed but before the sentinel(s) are; for_each_persistent() won't detect it/them.
     *
     *   - Is it weird we *created* sentinel_name pool but are deleting both it and the one that would've been
     *     created by the other-direction peer's invocation of the present function? It does seem weird, but it's
     *     actually good and proactive; and safe.
     *     - Is it safe? Answer: If the other sentinel exists still, then the other-direction peer dtor has not yet run.
     *       The sentinel is only useful if they try to create (open-only) another other-direction peer for the same name.
     *       That will fail in any case though: we just removed the MQ! If they try to create such a peer in
     *       create mode, then they are breaking the contract wherein they must not reuse a name until both dtors
     *       have executed (and even then, it is *recommended* a name is never reused, or not anytime soon at least).
     *       So yes, it is safe, in that the only reason the other-direction sentinel would even be checked does not
     *       validly exist at this stage.
     *     - Is it beneficial? Answer: Yes, because it means in practice all 3 persistent items will be deleted at
     *       essentially the same time. This helps avoid an on-abort leak mentioned in for_each_persistent(). */

    // Lastly... don't forget:

    delete mq_ptr;
  }); // return Auto_closing_mq()
} // Blob_stream_mq_base_impl::ensure_unique_peer()

template<typename Persistent_mq_handle>
Shared_name Blob_stream_mq_base_impl<Persistent_mq_handle>::mq_sentinel_name(const Shared_name& mq_name,
                                                                             bool snd_else_rcv)
{
  using util::String_view;

  auto sentinel_name
    = build_conventional_non_session_based_shared_name(Shared_name::S_RESOURCE_TYPE_ID_SHM);
  // That was the standard stuff. Now the stuff specific to us:
  sentinel_name /= Mq::S_RESOURCE_TYPE_ID; // Differentiate between different MQ types.
  sentinel_name /= "sentinel";
  sentinel_name += String_view(snd_else_rcv ? "Snd" : "Rcv");
  assert(mq_name.absolute());
  sentinel_name += mq_name; // The MQ name -- ALL of it -- is completely in here (starting with separator).

  return sentinel_name;
}

} // namespace ipc::transport
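
The uniqueness scheme in ensure_unique_peer() can be illustrated outside Flow-IPC. The following is only a minimal sketch, not the library's implementation: it swaps the Boost.Interprocess SHM pool for a plain file created with POSIX `open()` and `O_CREAT | O_EXCL`, and it assumes a writable `/tmp`. The names `Peer_guard`, `sentinel_path()`, and `claim_peer()` are hypothetical and exist only for this illustration. As in the listing, the marker's existence is the record, the handle to it is closed at once, and the custom deleter removes both directions' markers.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <fcntl.h>
#include <unistd.h>

// Hypothetical stand-in for Auto_closing_mq: owns the channel name; its deleter undoes the uniqueness markers.
using Peer_guard = std::unique_ptr<std::string, std::function<void (std::string*)>>;

// Hypothetical naming scheme: one marker file per channel and per direction (assumes /tmp is writable).
inline std::string sentinel_path(const std::string& channel, bool snd_else_rcv)
{
  return "/tmp/demo_sentinel_" + std::string(snd_else_rcv ? "Snd" : "Rcv") + '_' + channel;
}

// Returns a non-null guard if we became the only peer for this direction; null if one exists already (or on error).
inline Peer_guard claim_peer(const std::string& channel, bool snd_else_rcv)
{
  assert(!channel.empty());

  // O_CREAT | O_EXCL makes creation atomic: exactly one caller, across all processes, can succeed.
  const int fd = ::open(sentinel_path(channel, snd_else_rcv).c_str(), O_CREAT | O_EXCL | O_RDONLY, 0644);
  if (fd == -1)
  {
    return Peer_guard(); // errno == EEXIST means a peer for this direction is already registered.
  }
  ::close(fd); // The handle is not needed; the name lives on and acts as the marker.

  return Peer_guard(new std::string(channel), [](std::string* name)
  {
    // Remove both directions' markers; errors are ignored, since the other side may have done this first.
    ::unlink(sentinel_path(*name, true).c_str());
    ::unlink(sentinel_path(*name, false).c_str());
    delete name;
  });
}
```

A second `claim_peer(name, true)` returns null while the first guard is alive; once either direction's guard is destroyed, both markers are gone and the name can be claimed again.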
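
The Control_cmd rationale (a signed underlying type so that a negative value can carry a protocol version) may be easier to follow with a small sketch. This is an illustration under assumptions, not the library's decoding logic: the enum is restated here, and `Payload_kind`, `encode_protocol_version()`, `decode_protocol_version()`, and `classify()` are hypothetical helpers.

```cpp
#include <cassert>
#include <cstdint>

// Restated from the listing for self-containment; enumerator values start at 0 and are never negative.
enum class Control_cmd : std::int64_t { S_END_SENDING, S_PING, S_END_SENTINEL };

// Hypothetical classification of the raw 64-bit payload of a CONTROL message.
enum class Payload_kind { S_PROTOCOL_VERSION, S_COMMAND, S_INVALID };

// Protocol version v (always positive) travels as -v, so it cannot collide with any enumerator.
constexpr std::int64_t encode_protocol_version(std::int64_t version)
{
  assert(version > 0);
  return -version;
}

// Inverse of encode_protocol_version(); only meaningful for negative raw values.
constexpr std::int64_t decode_protocol_version(std::int64_t raw)
{
  return -raw;
}

// Negative => protocol version; in [0, S_END_SENTINEL) => a command; anything else is rejected.
constexpr Payload_kind classify(std::int64_t raw)
{
  if (raw < 0)
  {
    return Payload_kind::S_PROTOCOL_VERSION;
  }
  if (raw < static_cast<std::int64_t>(Control_cmd::S_END_SENTINEL))
  {
    return Payload_kind::S_COMMAND;
  }
  return Payload_kind::S_INVALID;
}
```

With a signed type, a single sign check separates the negotiation payload from ordinary commands, and the sentinel enumerator bounds the valid command range.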