Flow-IPC 1.0.0
Flow-IPC project: Full implementation reference.
blob_stream_mq_rcv.hpp
/* Flow-IPC: Core
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include <boost/move/make_unique.hpp>
#include <experimental/propagate_const>

namespace ipc::transport
{

// Types.

/**
 * Implements Blob_receiver concept by using an adopted Persistent_mq_handle MQ handle to an MQ (message queue)
 * of that type, such as a POSIX or bipc MQ. This allows for high-performance, potentially zero-copy (except
 * for copying out of the transport MQ) transmission of discrete messages, each containing a binary blob.
 * This is a low-level (core) transport mechanism; higher-level (structured)
 * transport mechanisms may use Blob_stream_mq_receiver (and Blob_stream_mq_sender) to enable their work.
 *
 * Beyond that: see Blob_stream_mq_sender doc header. We are just the other side of this.
 *
 * ### Cleanup; reusing same absolute_name() ###
 * See Blob_stream_mq_sender doc header. The same applies symmetrically.
 *
 * ### Performance, responsiveness ###
 * See Blob_stream_mq_sender doc header. The same applies symmetrically. Be aware specifically that the same
 * responsiveness-of-destructor caveat applies to Blob_stream_mq_receiver.
 *
 * ### Thread safety ###
 * We add no more thread safety guarantees than those mandated by the main concepts. To wit:
 *   - Concurrent access to 2 separate Blob_stream_mq_receiver objects is safe.
 *   - After construction, concurrent access to the main transmission API methods and the dtor is not safe for a given
 *     `*this`.
 *
 * Methods and situations not covered by that text should have their thread safety explained in their individual doc
 * headers.
 *
 * @internal
 * ### Implementation design/rationale ###
 * See pImpl-lite notes in Blob_stream_mq_sender doc header. The exact same applies here.
 *
 * @see Blob_stream_mq_receiver_impl doc header.
 *
 * @endinternal
 *
 * @tparam Persistent_mq_handle
 *         See Persistent_mq_handle concept doc header.
 *
 * @see Blob_receiver: implemented concept.
 */
template<typename Persistent_mq_handle>
class Blob_stream_mq_receiver : public Blob_stream_mq_base<Persistent_mq_handle>
{
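public:
  // Types.

  /// Short-hand for template arg for underlying MQ handle type.
  using Mq = typename Blob_stream_mq_receiver_impl<Persistent_mq_handle>::Mq;

  /// Useful for generic programming, the `sync_io`-pattern counterpart to `*this` type.
  using Sync_io_obj = sync_io::Blob_stream_mq_receiver<Mq>;
  /// You may disregard.
  using Async_io_obj = Null_peer;
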
  // Constants.

  /// Implements concept API. Equals `Mq::S_RESOURCE_TYPE_ID`.
  static const Shared_name S_RESOURCE_TYPE_ID;

  /**
   * Implements concept API; namely it is `false`: async_receive_blob() rejects a `target_blob` smaller than
   * the MQ's max message size with error::Code::S_INVALID_ARGUMENT. Notes for transport::Native_socket_stream apply.
   *
   * @see Native_handle_receiver::S_BLOB_UNDERFLOW_ALLOWED: implemented concept. Accordingly also see
   *      "Blob underflow semantics" in transport::Native_handle_receiver doc header.
   */
  static constexpr bool S_BLOB_UNDERFLOW_ALLOWED = false;

  // Constructors/destructor.

  /**
   * Constructs the receiver by taking over an already-opened MQ handle.
   * Note that this op does not implement any concept; Blob_receiver concept does not define how a Blob_receiver
   * is created in this explicit fashion.
   *
   * No traffic must have occurred on `mq_moved` up to this call. Otherwise behavior is undefined.
   *
   * If this fails (sets `*err_code` to truthy if not null; throws if null), all transmission calls on `*this`
   * will fail with the post-value in `*err_code` emitted. In particular, error::Code::S_BLOB_STREAM_MQ_RECEIVER_EXISTS
   * is that code, if the reason for failure was that another Blob_stream_mq_receiver to
   * `mq_moved.absolute_name()` has already been created in this or other process. See Blob_stream_mq_sender class
   * doc header for discussion of the relationship between Blob_stream_mq_sender, Blob_stream_mq_receiver, the
   * underlying MQ at a given Shared_name, and that Shared_name as registered in the OS.
   * In short: there is to be up to 1 Blob_stream_mq_sender and up to 1 Blob_stream_mq_receiver for a given
   * named persistent MQ. In this way, it is one single-direction pipe with 2 peers, like half of
   * a Native_socket_stream pipe: it is not an MQ with back-and-forth traffic, nor one with multiple senders
   * or multiple receivers. The underlying MQ supports such things;
   * but that is not what the Blob_sender/Blob_receiver concepts model.
   *
   * Along those same lines note that the dtor (at the latest -- which happens if no fatal error occurs throughout)
   * will not only close the MQ handle acquired from `mq_moved` but will also execute `Mq::remove_persistent(name)`,
   * where `name == mq_moved.absolute_name()` pre-this-ctor.
   *
   * ### Leaks of persistent resources ###
   * Notes from Blob_stream_mq_sender ctor doc header apply here.
   *
   * ### Performance ###
   * The taking over of `mq_moved` should be thought of as light-weight.
   *
   * @param logger_ptr
   *        Logger to use for subsequently logging.
   * @param nickname_str
   *        Human-readable nickname of the new object, as of this writing for use in `operator<<(ostream)` and
   *        logging only.
   * @param mq_moved
   *        An MQ handle to an MQ with no traffic on it so far. Unless an error is emitted, `mq_moved` becomes
   *        nullified upon return from this ctor. `*this` owns the MQ handle from this point on and is responsible
   *        for closing it.
   * @param err_code
   *        See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
   *        error::Code::S_BLOB_STREAM_MQ_RECEIVER_EXISTS (another Blob_stream_mq_receiver exists),
   *        system codes (other errors, all to do with the creation of a separate internally used tiny SHM pool
   *        used to prevent duplicate Blob_stream_mq_receiver in the system).
   */
  explicit Blob_stream_mq_receiver(flow::log::Logger* logger_ptr, util::String_view nickname_str,
                                   Mq&& mq_moved, Error_code* err_code = 0);

  /**
   * Implements Blob_receiver API, per its concept contract.
   * All the notes for that concept's core-adopting ctor apply.
   *
   * @param sync_io_core_in_peer_state_moved
   *        See above.
   *
   * @see Blob_receiver::Blob_receiver(): implemented concept.
   */
  explicit Blob_stream_mq_receiver(Sync_io_obj&& sync_io_core_in_peer_state_moved);

  /**
   * Implements Blob_receiver API, per its concept contract.
   * All the notes for that concept's default ctor apply.
   *
   * @see Blob_receiver::Blob_receiver(): implemented concept.
   */
  Blob_stream_mq_receiver();

  /**
   * Move-constructs from `src`; `src` becomes as-if default-cted (therefore in NULL state).
   * Implements Blob_receiver API, per its concept contract.
   *
   * @param src
   *        See above.
   *
   * @see Blob_receiver::Blob_receiver(): implemented concept.
   */
  Blob_stream_mq_receiver(Blob_stream_mq_receiver&& src);

  /**
   * Implements Blob_receiver API. All the notes for the concept's destructor apply but as a reminder:
   *
   * Destroys this peer endpoint, which will end the one-direction pipe and cancel any pending
   * completion handlers by invoking them ASAP with error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER.
   * As of this writing these are the completion handlers that would therefore be called:
   *   - Any handler passed to async_receive_blob() that has not yet been invoked.
   *     There can be 0 or more of these.
   *
   * ### Fate of underlying MQ and its `.absolute_name()` ###
   * Notes from Blob_stream_mq_sender dtor doc header apply here symmetrically.
   *
   * @see Blob_receiver::~Blob_receiver(): implemented concept.
   */
  ~Blob_stream_mq_receiver();

  // Methods.

  /**
   * Move-assigns from `src`; `*this` acts as if destructed; `src` becomes as-if default-cted (therefore in NULL state).
   * No-op if `&src == this`.
   * Implements Blob_receiver API, per its concept contract.
   *
   * @see ~Blob_stream_mq_receiver().
   *
   * @param src
   *        See above.
   * @return `*this`.
   *
   * @see Blob_receiver move assignment: implemented concept.
   */
  Blob_stream_mq_receiver& operator=(Blob_stream_mq_receiver&& src);

  /// Copy assignment is disallowed.
  Blob_stream_mq_receiver& operator=(const Blob_stream_mq_receiver&) = delete;

  /**
   * Implements Blob_receiver API per contract.
   *
   * @return See above.
   *
   * @see Blob_receiver::receive_blob_max_size(): implemented concept.
   */
  size_t receive_blob_max_size() const;

  /**
   * Implements Blob_receiver API per contract. Reminder: You may call this directly from within a
   * completion handler you supplied to an earlier async_receive_blob(). Reminder: It's not thread-safe
   * to call this concurrently with other transmission methods or destructor on the same `*this`.
   *
   * #Error_code generated and passed to `on_done_func()`:
   * error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER (destructor called, canceling all pending ops;
   * spiritually identical to `boost::asio::error::operation_aborted`),
   * error::Code::S_INVALID_ARGUMENT (non-pipe-hosing error: `target_blob.size()` is smaller than
   * Persistent_mq_handle::max_msg_size()),
   * error::Code::S_RECEIVES_FINISHED_CANNOT_RECEIVE (peer gracefully closed pipe via `*end_sending()`),
   * other system codes, indicating the underlying transport is hosed for that specific reason, as detected during
   * incoming-direction processing.
   *
   * error::Code::S_MESSAGE_SIZE_EXCEEDS_USER_STORAGE shall never be emitted. See
   * Blob_stream_mq_receiver::S_BLOB_UNDERFLOW_ALLOWED.
   *
   * @tparam Task_err_sz
   *         See above.
   * @param target_blob
   *        See above. Reminder: The memory area described by this arg must be valid up to
   *        completion handler entry.
   * @param on_done_func
   *        See above. Reminder: any moved/copied version of this callback's associated captured state will
   *        be freed soon after it returns.
   * @return See above.
   *
   * @see Blob_receiver::async_receive_blob(): implemented concept.
   */
  template<typename Task_err_sz>
  bool async_receive_blob(const util::Blob_mutable& target_blob,
                          Task_err_sz&& on_done_func);

  /**
   * Implements Blob_receiver API per contract. Reminder: You may call this directly from within a
   * completion handler you supplied to an earlier async_receive_blob().
   *
   * @param timeout
   *        See above.
   * @return See above.
   *
   * @see Blob_receiver::idle_timer_run(): implemented concept.
   */
  bool idle_timer_run(util::Fine_duration timeout = boost::chrono::seconds(5));

  /**
   * Returns nickname, a brief string suitable for logging. This is included in the output by the `ostream<<`
   * operator as well. This method is thread-safe in that it always returns the same value.
   *
   * If this object is default-cted (or moved-from), this will return a value equal to "".
   *
   * @return See above.
   */
  const std::string& nickname() const;

  /**
   * Returns name equal to `mq.absolute_name()`, where `mq` was passed to ctor, at the time it was passed to ctor.
   *
   * If this object is default-cted (or moved-from), this will return Shared_name::S_EMPTY.
   *
   * @return See above. Always the same value.
   */
  const Shared_name& absolute_name() const;

private:
  // Types.

  /// Short-hand for `const`-respecting wrapper around Blob_stream_mq_receiver_impl for the pImpl idiom.
  using Impl_ptr = std::experimental::propagate_const<boost::movelib::unique_ptr<Blob_stream_mq_receiver_impl<Mq>>>;

  // Friends.

  /// Friend of Blob_stream_mq_receiver.
  template<typename Persistent_mq_handle2>
  friend std::ostream& operator<<(std::ostream& os, const Blob_stream_mq_receiver<Persistent_mq_handle2>& val);

  // Data.

  /// The true implementation of this class. See also our class doc header.
  Impl_ptr m_impl;
}; // class Blob_stream_mq_receiver

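// Free functions: in *_fwd.hpp.

// Template initializers.

template<typename Persistent_mq_handle>
const Shared_name Blob_stream_mq_receiver<Persistent_mq_handle>::S_RESOURCE_TYPE_ID = Mq::S_RESOURCE_TYPE_ID;

// Template implementations (strict pImpl-idiom style (albeit pImpl-lite due to template-ness)).

// The performant move semantics we get delightfully free with pImpl; they'll just move-to/from the unique_ptr m_impl.

template<typename Persistent_mq_handle>
Blob_stream_mq_receiver<Persistent_mq_handle>::Blob_stream_mq_receiver(Blob_stream_mq_receiver&&) = default;
template<typename Persistent_mq_handle>
Blob_stream_mq_receiver<Persistent_mq_handle>& Blob_stream_mq_receiver<Persistent_mq_handle>::operator=
                                                 (Blob_stream_mq_receiver&&) = default;

// The NULL state ctor comports with how null m_impl is treated all over below.
template<typename Persistent_mq_handle>
Blob_stream_mq_receiver<Persistent_mq_handle>::Blob_stream_mq_receiver() = default;
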
// The rest is strict forwarding to m_impl, once PEER state is established (non-null m_impl).

template<typename Persistent_mq_handle>
Blob_stream_mq_receiver<Persistent_mq_handle>::Blob_stream_mq_receiver
  (flow::log::Logger* logger_ptr, util::String_view nickname_str, Mq&& mq, Error_code* err_code) :
  m_impl(boost::movelib::make_unique<Blob_stream_mq_receiver_impl<Mq>>
           (logger_ptr, nickname_str, std::move(mq), err_code))
{
  // Yay.
}

template<typename Persistent_mq_handle>
Blob_stream_mq_receiver<Persistent_mq_handle>::Blob_stream_mq_receiver
  (Sync_io_obj&& sync_io_core_in_peer_state_moved) :

  m_impl(boost::movelib::make_unique<Blob_stream_mq_receiver_impl<Mq>>
           (std::move(sync_io_core_in_peer_state_moved)))
{
  // Yay.
}

// It's only explicitly defined to formally document it.
template<typename Persistent_mq_handle>
Blob_stream_mq_receiver<Persistent_mq_handle>::~Blob_stream_mq_receiver() = default;

template<typename Persistent_mq_handle>
size_t Blob_stream_mq_receiver<Persistent_mq_handle>::receive_blob_max_size() const
{
  return m_impl ? m_impl->receive_blob_max_size() : 0;
}

template<typename Persistent_mq_handle>
template<typename Task_err_sz>
bool Blob_stream_mq_receiver<Persistent_mq_handle>::async_receive_blob
       (const util::Blob_mutable& target_blob, Task_err_sz&& on_done_func)
{
  return m_impl ? (m_impl->async_receive_blob(target_blob, std::move(on_done_func)), true)
                : false;
}

template<typename Persistent_mq_handle>
bool Blob_stream_mq_receiver<Persistent_mq_handle>::idle_timer_run(util::Fine_duration timeout)
{
  return m_impl ? m_impl->idle_timer_run(timeout)
                : false;
}

template<typename Persistent_mq_handle>
const Shared_name& Blob_stream_mq_receiver<Persistent_mq_handle>::absolute_name() const
{
  return m_impl ? m_impl->absolute_name() : Shared_name::S_EMPTY;
}

template<typename Persistent_mq_handle>
const std::string& Blob_stream_mq_receiver<Persistent_mq_handle>::nickname() const
{
  return m_impl ? m_impl->nickname() : util::EMPTY_STRING;
}

// `friend`ship needed for this "non-method method":

template<typename Persistent_mq_handle>
std::ostream& operator<<(std::ostream& os, const Blob_stream_mq_receiver<Persistent_mq_handle>& val)
{
  if (val.m_impl)
  {
    return os << *val.m_impl;
  }
  // else
  return os << "null";
}

} // namespace ipc::transport
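
The forwarding methods above all follow one pattern: a non-null `m_impl` means PEER state and the call is forwarded, while a null `m_impl` (default-constructed or moved-from) means NULL state and a neutral value is returned. The following is a minimal, self-contained sketch of that pattern, not the library's code. `Receiver`, `Receiver_impl`, `empty_string()` and `null_state_demo()` are hypothetical names invented for illustration, and plain `std::unique_ptr` is swapped in for `std::experimental::propagate_const<boost::movelib::unique_ptr<...>>` so that the sketch builds with only the standard library.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for Blob_stream_mq_receiver_impl: non-movable, owns all real state.
class Receiver_impl
{
public:
  Receiver_impl(std::string nickname, std::size_t max_size) :
    m_nickname(std::move(nickname)), m_max_size(max_size)
  {
  }
  Receiver_impl(const Receiver_impl&) = delete;
  Receiver_impl& operator=(const Receiver_impl&) = delete;

  std::size_t receive_blob_max_size() const { return m_max_size; }
  const std::string& nickname() const { return m_nickname; }

  // Returns true the first time; false if the (pretend) timer is already running.
  bool idle_timer_run()
  {
    if (m_timer_running) { return false; }
    m_timer_running = true;
    return true;
  }

private:
  std::string m_nickname;
  std::size_t m_max_size;
  bool m_timer_running = false;
};

// Hypothetical stand-in for the outer class: cheap to move; a null m_impl means NULL state.
class Receiver
{
public:
  Receiver() = default; // NULL state: m_impl stays null.
  Receiver(std::string nickname, std::size_t max_size) :
    m_impl(std::make_unique<Receiver_impl>(std::move(nickname), max_size))
  {
  }
  // Moves just move the unique_ptr; the moved-from object is left with a null m_impl.
  Receiver(Receiver&&) = default;
  Receiver& operator=(Receiver&&) = default;

  // Each method forwards to m_impl in PEER state and returns a neutral value in NULL state.
  std::size_t receive_blob_max_size() const { return m_impl ? m_impl->receive_blob_max_size() : 0; }
  bool idle_timer_run() { return m_impl ? m_impl->idle_timer_run() : false; }
  const std::string& nickname() const { return m_impl ? m_impl->nickname() : empty_string(); }

private:
  // Shared empty string so nickname() can return a reference even in NULL state.
  static const std::string& empty_string()
  {
    static const std::string s_empty;
    return s_empty;
  }

  std::unique_ptr<Receiver_impl> m_impl;
};

// Exercises PEER state, moved-from state, and default-constructed NULL state.
bool null_state_demo()
{
  Receiver a("rcv", 8192);
  Receiver b(std::move(a)); // a is now in NULL state.
  Receiver c;               // NULL state from the start.
  return (b.receive_blob_max_size() == 8192) && (b.nickname() == "rcv") && b.idle_timer_run()
         && (a.receive_blob_max_size() == 0) && a.nickname().empty() && !a.idle_timer_run()
         && (c.receive_blob_max_size() == 0) && !c.idle_timer_run();
}
```

Because the outer class holds nothing but the pointer, the defaulted move operations are enough to give the "moved-from equals default-constructed" behavior that the doc headers above describe.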