async_adapter_rcv.hpp
/* Flow-IPC: Core
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include <flow/log/log.hpp>
#include <flow/async/single_thread_task_loop.hpp>
#include <boost/move/make_unique.hpp>
#include <queue>

namespace ipc::transport::sync_io
{

// Types.

/**
 * Internal-use type that adapts a given PEER-state sync_io::Native_handle_receiver or sync_io::Blob_receiver *core*
 * into the async-I/O-pattern Native_handle_receiver or Blob_receiver. State-mutating logic of the latter is forwarded
 * to a `*this`; while trivial `const` (in PEER state) things like `.receive_blob_max_size()` are forwarded directly to
 * the core `sync_io::X`.
 *
 * @see transport::Native_socket_stream::Impl uses this for 99% of its incoming-direction
 *      (PEER-state by definition) logic.
 * @see transport::Blob_stream_mq_receiver_impl uses this for 99% of its logic.
 *
 * @see Async_adapter_sender for the opposite-direction thing. E.g., transport::Native_socket_stream::Impl
 *      uses that for 99% of its outgoing-direction logic.
 *
 * ### Threads and thread nomenclature; locking ###
 * Thread U, thread W... locking... just see those sections in transport::Native_socket_stream::Impl class doc header.
 * We adopt that nomenclature and logic. However, as we are concerned with only one direction (op-type),
 * we only deal with code in either thread U or W concerned with that. The other-direction code -- if applicable
 * (e.g., applicable for `Native_socket_stream` which deals with both over 1 socket connection; N/A
 * for `Blob_stream_mq_*` which uses separate objects entirely) -- simply co-exists in the same thread W and "thread"
 * U. (If `Native_socket_stream` wanted to, it could even parallelize stuff in thread W by using separate worker
 * threads Ws and Wr. As of this writing it does not, but it could -- nothing in `*this` would change.)
 *
 * Note, again, that we have our own `m_mutex`. If there is an opposing-direction counterpart Async_adapter_sender,
 * then it has its own `m_mutex`; hence things can proceed concurrently.
 *
 * ### Impl design ###
 * This is almost entirely subsumed by our `sync_io` core, Async_adapter_receiver::Core, an instance of
 * sync_io::Native_handle_receiver or sync_io::Blob_receiver. It has a receive op-type (possibly among others), so we
 * invoke its `"sync_io::*_receiver::start_receive_blob_ops()"` during our initialization. After that:
 *
 * For idle_timer_run(), we can again just forward it to `m_sync_io`. There's no completion
 * handler either, unlike with `.async_end_sending()`, so it's even simpler -- just straight forwarding to
 * the `sync_io` core.
 *
 * For async_receive_native_handle() (and its degenerate version async_receive_blob()) the `*_receiver` concept
 * we implement has an extra feature on top of sync_io::Native_handle_receiver (and degenerate version,
 * sync_io::Blob_receiver): when 1 async_receive_native_handle() is in progress asynchronously, it is allowed
 * to invoke it again an arbitrary number of times, and they are to be served in the order they were called.
 * Therefore:
 *   - We maintain a "deficit" queue of these requests. The currently-served request is stored in
 *     `User_request m_user_request`. The queued-up subsequent ones are stored in a queue with that
 *     same element type, `m_pending_user_requests_q`.
 *   - When the ongoing (single) `m_sync_io.async_receive_*()` does complete -- which occurs in thread W --
 *     we emit the result (`Error_code`, `sz`) to the `"User_request::m_on_done_func"` (the completion
 *     handler from the user). Then we pop `m_pending_user_requests_q` (unless empty -- no further "deficit")
 *     into `m_user_request` and service *that* one via `m_sync_io.async_receive_*()`. Rinse/repeat.
 *
 * If the destructor is invoked before `m_user_request` can get serviced, then in the dtor
 * we execute `on_done_func(E)`, where E is operation-aborted. Once that's done the dtor can finish.
 */
template<typename Core_t>
class Async_adapter_receiver :
  public flow::log::Log_context,
  private boost::noncopyable
{
public:
  // Types.

  /// The `sync_io::X` type being adapted into async-I/O-pattern `X`.
  using Core = Core_t;

  // Constructors/destructor.

  /**
   * Constructs the adapter around `sync_io::X` object `*sync_io`.
   *
   * @param logger_ptr
   *        Logger to use for logging subsequently.
   * @param log_pfx
   *        String that shall precede ~all logged messages (e.g., `lexical_cast<string>(x)`, where `x` is an `X`).
   * @param worker
   *        The worker thread loop of `X`. Background work, as needed, will be posted onto this
   *        "thread W." Note that `X` may (or may not) share this thread with unrelated tasks;
   *        for example `Native_socket_stream` uses it for both a `*this` (incoming-direction)
   *        and an Async_adapter_sender (outgoing-direction). `*worker` must already be `->start()`ed.
   * @param sync_io
   *        The core object of `X`. It should have just (irreversibly) entered state PEER.
   */
  Async_adapter_receiver(flow::log::Logger* logger_ptr, util::String_view log_pfx,
                         flow::async::Single_thread_task_loop* worker, Core* sync_io);

  /**
   * To be invoked after `->stop()`ping `*worker` (from ctor), as well as flushing any still-queued
   * tasks in its `Task_engine` (via `.restart()` and `.poll()`), this satisfies the customer adapter
   * dtor's contract which is to invoke any not-yet-fired completion handlers with the special
   * operation-aborted error code. In our case that is zero or more `async_receive_*()` completion
   * handlers. If applicable the dtor returns once each such handler has completed in an unspecified
   * thread that is not the calling thread.
   */
  ~Async_adapter_receiver();

  // Methods.

  /**
   * See Native_handle_receiver counterpart. However, this one is `void`, as there is no way `*this` is not in PEER
   * state (by definition).
   *
   * @param target_hndl
   *        See Native_handle_receiver counterpart.
   * @param target_meta_blob
   *        See Native_handle_receiver counterpart.
   * @param on_done_func
   *        See Native_handle_receiver counterpart.
   */
  void async_receive_native_handle(Native_handle* target_hndl,
                                   const util::Blob_mutable& target_meta_blob,
                                   flow::async::Task_asio_err_sz&& on_done_func);

  /**
   * See Blob_receiver counterpart. However, this one is `void`, as there is no way `*this` is not in PEER state
   * (by definition).
   *
   * @param target_blob
   *        See Blob_receiver counterpart.
   * @param on_done_func
   *        See Blob_receiver counterpart.
   */
  void async_receive_blob(const util::Blob_mutable& target_blob,
                          flow::async::Task_asio_err_sz&& on_done_func);

  /**
   * See Native_handle_receiver counterpart.
   *
   * @param timeout
   *        See Native_handle_receiver counterpart.
   * @return See Native_handle_receiver counterpart.
   */
  bool idle_timer_run(util::Fine_duration timeout);
private:
  // Types.

  /**
   * Data store representing a deficit user async-receive request: either one being currently handled
   * by `m_sync_io` -- which can handle one `m_sync_io.async_receive_*()` at a time, no more -- or
   * one queued up behind it, if `async_receive_*()` was called before the current one could complete.
   *
   * Essentially this stores args to async_receive_native_handle() (or degenerate version, async_receive_blob())
   * which is an async-operating method. We store them in a queue via #Ptr.
   */
  struct User_request
  {
    // Types.

    /// Short-hand for `unique_ptr` to this.
    using Ptr = boost::movelib::unique_ptr<User_request>;

    // Data.

    /// See async_receive_native_handle() `target_hndl`. Null for `async_receive_blob()`.
    Native_handle* m_target_hndl_ptr;

    /// See async_receive_native_handle() `target_meta_blob`. Or see async_receive_blob() `target_blob`.
    util::Blob_mutable m_target_meta_blob;

    /// See async_receive_native_handle() or async_receive_blob() `on_done_func`.
    flow::async::Task_asio_err_sz m_on_done_func;
  }; // struct User_request

  // Methods.

  /**
   * Body of async_receive_native_handle() and async_receive_blob(); with `target_hndl` null if and only if
   * it's the latter as opposed to the former.
   *
   * @param target_hndl_or_null
   *        See async_receive_native_handle(); or null if it's the other API.
   *        If `!(Core::S_TRANSMIT_NATIVE_HANDLES)` this must be null.
   * @param target_meta_blob
   *        See async_receive_native_handle().
   * @param on_done_func
   *        See async_receive_native_handle().
   */
  void async_receive_native_handle_impl(Native_handle* target_hndl_or_null,
                                        const util::Blob_mutable& target_meta_blob,
                                        flow::async::Task_asio_err_sz&& on_done_func);

  /**
   * Invoked via active-event API, handles the async completion
   * of an `m_sync_io.async_receive_*()` operation. Can be invoked from thread W only, and #m_mutex must be
   * locked. #m_user_request must not be null.
   *
   * This method iteratively, synchronously leverages #m_sync_io to read as many in-messages available
   * in the transport as possible, until: the request deficit is met (either by reading enough messages
   * to satisfy #m_user_request and #m_pending_user_requests_q, or by encountering a pipe-hosing error)
   * or would-block. In the latter case another async-wait is initiated by this method synchronously.
   *
   * The first action, before those potential further reads, is to process_msg_or_error() the just-received
   * (or pipe-hosing would-be) message. Then for each further in-message process_msg_or_error() is again
   * invoked.
   *
   * For each request satisfied, a separate user handler is posted onto thread W to execute in order.
   *
   * @param err_code
   *        Result to pass to user (if truthy, to all pending requests; else to #m_user_request only).
   * @param sz
   *        Result to pass to user (ditto).
   */
  void on_sync_io_rcv_done(const Error_code& err_code, size_t sz);

  /**
   * Invoked from thread U/W (async_receive_native_handle_impl()) or thread W (active-event API), handles
   * a completed `m_sync_io.async_receive_*()` -- whose results are to be given as args -- by (1) updating
   * #m_user_request and #m_pending_user_requests_q and (2) posting any appropriate handlers onto thread W.
   *
   * See notes for on_sync_io_rcv_done().
   *
   * @param err_code
   *        See on_sync_io_rcv_done().
   * @param sz
   *        See on_sync_io_rcv_done().
   */
  void process_msg_or_error(const Error_code& err_code, size_t sz);

  // Data.

  /// See `log_pfx` arg of ctor.
  const std::string m_log_pfx;

  /**
   * The *head slot* containing the currently-being-serviced "deficit" async-receive request, with a meta-blob
   * *potentially* being async-written to; null if there is no pending `async_receive_*()`.
   * It is the "fulcrum" of the consumer-producer state machine described in doc header impl section's design
   * discussion: If null there is no deficit; if not null there is an *overall deficit*. In the former case,
   * at steady state, `m_pending_user_requests_q.empty() == true`.
   *
   * Protected by #m_mutex.
   *
   * @see #m_pending_user_requests_q
   */
  User_request::Ptr m_user_request;

  /**
   * Queue storing deficit async-receive requests queued up due to #m_user_request being not null while
   * more `async_receive_*()` invocations are being made by the user. One can think of the "overall" queue as
   * being #m_user_request followed by the elements in this #m_pending_user_requests_q.
   * See class doc header for design discussion.
   *
   * Protected by #m_mutex.
   *
   * ### Rationale for not subsuming #m_user_request directly into this queue ###
   * It's the same amount of stuff; so the reason is stylistic in a subjective way. Basically a low-level
   * async read-op will target the meta-blob *directly* inside the head User_request in the "overall" queue but
   * never any of the subsequently queued requests; in my (ygoldfel) view it is clearer to express it as always
   * targeting #m_user_request rather than `*(m_pending_user_requests_q.front())`.
   */
  std::queue<typename User_request::Ptr> m_pending_user_requests_q;

  /// Protects #m_user_request, #m_pending_user_requests_q, and receive-ops data of #m_sync_io.
  mutable flow::util::Mutex_non_recursive m_mutex;

  /// Single-thread worker pool for all internal async work. Referred to as thread W in comments.
  flow::async::Single_thread_task_loop& m_worker;

  /**
   * The core #Core engine, implementing the `sync_io` pattern (see util::sync_io doc header).
   * See our class doc header for overview of how we use it (the aforementioned `sync_io` doc header talks about
   * the `sync_io` pattern generally).
   *
   * Thus, #m_sync_io is the synchronous engine that we use to perform our work in our asynchronous boost.asio
   * loop running in thread W (#m_worker) while collaborating with user thread(s) a/k/a thread U.
   * (Recall that the user may choose to set up their own event loop/thread(s) --
   * boost.asio-based or otherwise -- and use their own equivalent of an #m_sync_io instead.)
   */
  Core& m_sync_io;
}; // class Async_adapter_receiver
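
/* Usage sketch (added illustration, not part of the original file; `core` is a hypothetical
 * already-connected PEER-state sync_io::Blob_receiver, `logger` a flow::log::Logger*): the deficit
 * queue in action -- a second receive may be issued while the first is still in progress, and the
 * handlers fire in issue order, on thread W:
 *
 *   flow::async::Single_thread_task_loop worker(logger, "w"); // Thread W.
 *   worker.start();
 *   Async_adapter_receiver<decltype(core)> rcv(logger, "rcv", &worker, &core);
 *
 *   std::array<char, 8192> buf1, buf2;
 *   rcv.async_receive_blob(util::Blob_mutable(buf1.data(), buf1.size()),
 *                          [](const Error_code&, size_t) {}); // Becomes m_user_request.
 *   rcv.async_receive_blob(util::Blob_mutable(buf2.data(), buf2.size()),
 *                          [](const Error_code&, size_t) {}); // Queued in m_pending_user_requests_q.
 */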

// Template implementations.

template<typename Core_t>
Async_adapter_receiver<Core_t>::Async_adapter_receiver(flow::log::Logger* logger_ptr,
                                                       util::String_view log_pfx,
                                                       flow::async::Single_thread_task_loop* worker,
                                                       Core* sync_io) :
  flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
  m_log_pfx(log_pfx),
  m_worker(*worker),
  m_sync_io(*sync_io)
{
  using util::sync_io::Asio_waitable_native_handle;
  using util::sync_io::Task_ptr;
  using flow::util::Lock_guard;

  // We've just entered PEER state, so set up the receive-ops.

  /* Hook up the m_sync_io=>*this interaction. m_sync_io will use these callbacks to ask
   * us to ->async_wait() on `Asio_waitable_native_handle`s for it.
   *
   * The *this=>m_sync_io interaction shall be our APIs, like async_receive_native_handle(),
   * simply invoking the same API in m_sync_io (m_sync_io.async_receive_native_handle() for that example). */

  /* (.start_receive_native_handle_ops() would do the same thing, if it exists. If it exists, that's because `X`
   * has to satisfy two concepts -- for when the user uses the sync_io::X directly -- but we don't care about that;
   * we know they are the same in this case; so just use the one we know exists for any X.) */
#ifndef NDEBUG
  const bool ok =
#endif
  m_sync_io.start_receive_blob_ops([this](Asio_waitable_native_handle* hndl_of_interest,
                                          bool ev_of_interest_snd_else_rcv,
                                          Task_ptr&& on_active_ev_func)
  {
    /* We are in thread U or thread W; m_sync_io.<?>() has called its m_..._ev_wait_func();
     * it has protected access to *hndl_of_interest as needed. */

    FLOW_LOG_TRACE(m_log_pfx << ": Sync-IO receive-ops event-wait request: "
                   "descriptor [" << hndl_of_interest->native_handle() << "], "
                   "writable-else-readable [" << ev_of_interest_snd_else_rcv << "].");

    // They want this async_wait(). Oblige.
    assert(hndl_of_interest);
    hndl_of_interest->async_wait(ev_of_interest_snd_else_rcv
                                   ? Asio_waitable_native_handle::Base::wait_write
                                   : Asio_waitable_native_handle::Base::wait_read,
                                 [this, on_active_ev_func = std::move(on_active_ev_func)]
                                   (const Error_code& err_code)
    {
      // We are in thread W. Nothing is locked.

      if (err_code == boost::asio::error::operation_aborted)
      {
        return; // Stuff is shutting down. GTFO.
      }
      // else

      // They want to know about completed async_wait(). Oblige.

      // Protect m_sync_io and non-const non-ref m_* against receive-ops (async_receive_*(), ...).
      Lock_guard<decltype(m_mutex)> lock(m_mutex);

      /* Inform m_sync_io of the event. This can synchronously invoke a handler we have registered via an
       * m_sync_io API (e.g., async_receive_*()). In our case -- if indeed it triggers a handler -- it will
       * have to do with async_receive_*() completion. */

      (*on_active_ev_func)();
      // (That would have logged sufficiently inside m_sync_io; let's not spam further.)

      // Lock_guard<decltype(m_mutex)> lock(m_mutex): unlocks here.
    }); // hndl_of_interest->async_wait()
  }); // m_sync_io.start_receive_blob_ops()
  assert(ok);
} // Async_adapter_receiver::Async_adapter_receiver()
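
/* Aside (added illustration, not part of the original file): the ctor above is the whole of the
 * `sync_io`-pattern hookup. A user who forgoes this adapter and runs their own boost.asio loop would
 * wire up a PEER-state core similarly (hedged sketch; `core` is a hypothetical sync_io::Blob_receiver):
 *
 *   core.start_receive_blob_ops([](util::sync_io::Asio_waitable_native_handle* fd,
 *                                  bool snd_else_rcv,
 *                                  util::sync_io::Task_ptr&& on_ev)
 *   {
 *     fd->async_wait(snd_else_rcv ? util::sync_io::Asio_waitable_native_handle::Base::wait_write
 *                                 : util::sync_io::Asio_waitable_native_handle::Base::wait_read,
 *                    [on_ev = std::move(on_ev)](const Error_code& err)
 *                      { if (err != boost::asio::error::operation_aborted) { (*on_ev)(); } });
 *   });
 *   // Thereafter each core.async_receive_blob(...) either completes synchronously or finishes
 *   // later, from inside one of the (*on_ev)() calls above.
 */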

template<typename Core_t>
Async_adapter_receiver<Core_t>::~Async_adapter_receiver()
{
  using flow::async::Single_thread_task_loop;
  using flow::async::reset_thread_pinning;
  using flow::util::ostream_op_string;

  /* Pre-condition: m_worker is stop()ed, and any pending tasks on it have been executed.
   * Our promised job is to invoke any pending handlers with operation-aborted.
   * The decision to do it from a one-off thread is explained in transport::Native_socket_stream::Impl::~Impl()
   * and used in a few places; so see that. Let's just do it.
   * @todo It would be cool, I guess, to do it all in one one-off thread instead of potentially starting, like,
   * 3 for some of our customers. Well, whatever. */

  if (m_user_request)
  {
    Single_thread_task_loop one_thread(get_logger(),
                                       ostream_op_string("ARcDeinit-", m_log_pfx));
    one_thread.start([&]()
    {
      reset_thread_pinning(get_logger()); // Don't inherit any strange core-affinity. Float free.

      FLOW_LOG_TRACE("Running head slot async-receive completion handler.");
      m_user_request->m_on_done_func(error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER, 0);
      FLOW_LOG_TRACE("User receive handler finished.");

      while (!m_pending_user_requests_q.empty())
      {
        FLOW_LOG_TRACE("Running a queued async-receive completion handler.");
        m_pending_user_requests_q.front()
          ->m_on_done_func(error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER, 0);
        m_pending_user_requests_q.pop();
        FLOW_LOG_TRACE("User receive handler finished. Popped from user request deficit queue.");
      } // while (!m_pending_user_requests_q.empty())
    });
  } // if (m_user_request)
  else
  {
    assert(m_pending_user_requests_q.empty()); // Sanity check.
  }
} // Async_adapter_receiver::~Async_adapter_receiver()
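
/* Aside (hedged sketch, not part of the original file): the deinit sequence the dtor contract
 * assumes, roughly as an outer impl class might perform it before destroying `*this`
 * (`m_worker` names that class's loop; exact flow::async accessor names may differ):
 *
 *   m_worker.stop();                          // Thread W: no more tasks start after this returns.
 *   auto& engine = *(m_worker.task_engine()); // Underlying boost.asio engine.
 *   engine.restart();
 *   engine.poll();                            // Flush still-queued handlers.
 *   // Now ~Async_adapter_receiver() may run; leftover user handlers fire with operation-aborted.
 */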

template<typename Core_t>
void Async_adapter_receiver<Core_t>::async_receive_native_handle(Native_handle* target_hndl,
                                                                 const util::Blob_mutable& target_meta_blob,
                                                                 flow::async::Task_asio_err_sz&& on_done_func)
{
  assert(target_hndl && "Native_socket_stream::async_receive_native_handle() must take non-null Native_handle ptr.");
  async_receive_native_handle_impl(target_hndl, target_meta_blob, std::move(on_done_func));

  /* Note, if our customer lacks async_receive_native_handle(), then they'll never forward to (call) this method;
   * therefore it (*this being a template instance) will never be compiled; therefore target_hndl_or_null will never
   * be non-null; which is why that assert(false) inside `if constexpr(!S_TRANSMIT_NATIVE_HANDLES)` compile-time
   * clause shall never be reached. */
}

template<typename Core_t>
void Async_adapter_receiver<Core_t>::async_receive_blob(const util::Blob_mutable& target_blob,
                                                        flow::async::Task_asio_err_sz&& on_done_func)
{
  async_receive_native_handle_impl(nullptr, target_blob, std::move(on_done_func));
}

template<typename Core_t>
void Async_adapter_receiver<Core_t>::async_receive_native_handle_impl(Native_handle* target_hndl_or_null,
                                                                      const util::Blob_mutable& target_meta_blob,
                                                                      flow::async::Task_asio_err_sz&& on_done_func)
{
  using flow::util::Lock_guard;
  using boost::movelib::make_unique;

  // We are in thread U (or thread W in a completion handler, but not concurrently).

  /* We will be accessing m_user_request, m_pending_user_requests_q, and possibly m_sync_io receive-ops
   * sub-API -- while ctor's ev-wait function's async_wait() handler will be accessing them too from thread W -- so: */

  Lock_guard<decltype(m_mutex)> lock(m_mutex);

  /* This is essentially the only place in *this where we're more than a mere forwarder of the core m_sync_io
   * sync_io::*_receiver (which is synchronous) into an async *_receiver. I.e., in this
   * case we add a feature on top; namely more than 1 async_receive_*() can be pending for us, whereas only
   * 1 async_receive_*() is allowed in the sync_io one. We call this a *deficit*, meaning there are more
   * user requests than available messages. (There's only ever 1 message "available" -- once the
   * m_sync_io.async_receive_*() succeeds -- and only momentarily, as it's immediately fed to the
   * completion handler of our async_receive_*() that precipitated it.)
   *
   * Thus, if there is no deficit, and async_receive_*() comes in, we trigger m_sync_io.async_receive_*()
   * and save the associated request info (basically our 3 args above) into m_user_request.
   * Once it succeeds we feed the result to the completion handler saved among those 3 args and nullify
   * m_user_request. Meanwhile, if another one comes in while there's already a deficit (m_user_request
   * is not null), we queue it in m_pending_user_requests_q. Lastly, having nullified
   * m_user_request as noted a couple sentences ago, we pop the _q (if not empty) into m_user_request
   * and trigger another m_sync_io.async_receive_*(), continuing the chain this way until _q is empty
   * and m_user_request is null (at which point there's no deficit and no reason to
   * m_sync_io.async_receive_*()).
   *
   * Oh, also, if an m_sync_io.async_receive_*() yields a socket-hosing error, then it is fed to the entire
   * deficit queue's handlers; as in any case any subsequent attempted m_sync_io.async_receive_*() would fail.
   *
   * Note that there can be a deficit (pending user requests) but no surplus (pending messages): we don't
   * make unnecessary m_sync_io.async_receive_*() calls and thus don't need to save surplus messages.
   * If we did, we'd need to copy any such surplus message (into user buffer) once this->async_receive_*() does
   * come in. This is discussed and rationalized elsewhere, but for convenience, recap: The reason is, basically,
   * the send-side concept is obligated to internally copy-and-queue messages on encountering would-block.
   * Since the system will thus not lose messages (if the receiver side is being slow in popping them from the pipe),
   * the complexity of queuing stuff -- and the perf loss due to copying -- is kept to one side. */

  FLOW_LOG_TRACE(m_log_pfx << ": Incoming user async-receive request for "
                 "possible native handle and meta-blob (located @ [" << target_meta_blob.data() << "] of "
                 "max size [" << target_meta_blob.size() << "]). In worker now? = [" << m_worker.in_thread() << "].");

  auto new_user_request = make_unique<User_request>();
  new_user_request->m_target_hndl_ptr = target_hndl_or_null;
  new_user_request->m_target_meta_blob = target_meta_blob;
  new_user_request->m_on_done_func = std::move(on_done_func);

  if (m_user_request)
  {
    m_pending_user_requests_q.emplace(std::move(new_user_request));
    FLOW_LOG_TRACE("At least 1 async-receive request is already in progress. "
                   "After registering the new async-receive request: Head slot is non-empty; and "
                   "subsequently-pending deficit queue has size [" << m_pending_user_requests_q.size() << "]. "
                   "Will sync-IO-receive to handle this request once it reaches the front of that queue.");
    return;
  }
  // else if (!m_user_request):

  FLOW_LOG_TRACE("No async-receive request is currently in progress. Starting sync-IO-receive chain to service the "
                 "new request and any further-queued requests that might appear in the meantime.");
  m_user_request = std::move(new_user_request);
  // new_user_request is now hosed.

  /* If the receive completes synchronously (there are data pending on the "wire"), these will reflect that.
   * If not then sync_err_code will indicate would-block. */
  Error_code sync_err_code;
  size_t sync_sz;

#ifndef NDEBUG
  bool ok;
#endif
  if (m_user_request->m_target_hndl_ptr)
  {
    if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES) // Prevent compile error from code that could never be reached.
    {
#ifndef NDEBUG
      ok =
#endif
      m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
                                            &sync_err_code, &sync_sz,
                                            [this](const Error_code& err_code, size_t sz)
      {
        // We are in thread W. m_mutex is locked.
        on_sync_io_rcv_done(err_code, sz); // Emit to handler; possibly pop queue and continue chain.
      });
    }
    else // if constexpr(!S_TRANSMIT_NATIVE_HANDLES)
    {
      assert(false && "This code should never be reached.");
    }
  } // if (m_user_request->m_target_hndl_ptr)
  else // Same deal/keeping comments light.
  {
#ifndef NDEBUG
    ok =
#endif
    m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
                                 [this](const Error_code& err_code, size_t sz)
                                   { on_sync_io_rcv_done(err_code, sz); });
  } // else if (!m_user_request->m_target_hndl_ptr)

  assert(ok && "We are by definition in PEER state, and ctor starts receive-ops, and we never start a "
               "sync_io async-receive before ensuring the previous one has executed; so that should never "
               "return false. Bug somewhere?");

  if (sync_err_code == error::Code::S_SYNC_IO_WOULD_BLOCK)
  {
    // Async-wait started by m_sync_io. It logged plenty. We live to fight another day.
    return;
  }
  // else:

  /* Process the message and nullify m_user_request. (It would also promote any m_pending_user_requests_q head
   * to m_user_request; but in our case that is not possible. We are still in the user async-receive API!) */
  process_msg_or_error(sync_err_code, sync_sz);

  FLOW_LOG_TRACE("Message was immediately available; synchronously returned to user; handler posted onto "
                 "async worker thread. Done until next request.");
} // Async_adapter_receiver::async_receive_native_handle_impl()
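
/* Recap of the deficit-queue state machine implemented above and in on_sync_io_rcv_done()
 * (added summary, informal; R1/R2 denote successive user requests):
 *
 *   event                      m_user_request   m_pending_user_requests_q   action
 *   -------------------------  ---------------  --------------------------  ----------------------------------
 *   async_receive_*(), idle    null -> R1       (empty)                     start m_sync_io.async_receive_*()
 *   async_receive_*(), busy    R1               push R2                     nothing further; R2 waits its turn
 *   receive done, success      R1 -> R2         pop front                   post R1's handler; next receive
 *   receive done, error        R1 -> null       drained                     post all handlers with the error
 */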

template<typename Core_t>
void Async_adapter_receiver<Core_t>::process_msg_or_error(const Error_code& err_code, size_t sz)
{
  using flow::util::Lock_guard;
  using std::queue;

  // We are in thread U or W. m_mutex is locked.

  assert(err_code != boost::asio::error::operation_aborted);
  assert((err_code != error::Code::S_SYNC_IO_WOULD_BLOCK) && "By our contract.");

  FLOW_LOG_TRACE(m_log_pfx << ": Earlier async-wait => event active => rcv-mutex lock => "
                 "on-active-event-func => sync_io module => on-rcv-done handler => here. "
                 "Or else: rcv-mutex lock => sync_io module => no async-wait needed => here.");

  /* As noted in our doc header, we have roughly two items on the agenda.
   *
   * 1, we need to invoke m_user_request->m_on_done_func, passing it the results (which are our args).
   *
   * 2, we need to update our m_* structures, such as popping stuff off m_pending_user_requests_q
   * and starting the next m_sync_io.async_receive_*() if any -- and so on.
   *
   * Oh and 2b: if err_code is truthy, we need to invoke all the pending handlers (if any) and nullify/empty
   * all of that.
   *
   * The question is what thread to do that stuff in. There are a couple of approaches, at least, but the following
   * seems the least latency-ridden:
   *   - For 1 and 2b, we must post() the handler(s) onto thread W.
   *     - If we are in thread U: It is plainly required, as we promised to invoke completion handlers
   *       from an unspecified thread that isn't U.
   *     - If we are in thread W: To avoid recursive mayhem if they choose to call some other API
   *       inside an on_done_func(), we queue it onto thread W by itself. (But if it's 2b in addition to 1,
   *       then we can bundle them all together into one thread-W task; no need for the churn of post()ing
   *       each one individually.)
   *   - However for 2, there's no need to post() anything. The mutex is already locked, so even if we're in thread
   *     U (m_sync_io.async_receive_*() succeeded synchronously, and this->async_receive_native_handle_impl()
   *     therefore invoked us synchronously, from thread U) we are within
   *     our rights to just finish the job here synchronously as well. In fact, that's great! It means
   *     data were waiting to be read right away, and we did everything we could synchronously in the
   *     original async_receive_*() call, post()ing onto thread W only 1 above -- the completion-handler invoking
   *     (which we must do by contract). Yay! By the way, if that's the case (we are in thread U), then 2b doesn't
   *     apply; only 1 does. (The situation where there are items in m_pending_user_requests_q means we wouldn't
   *     have started an actual m_sync_io.async_receive_*() in the first place, as one was already in progress. So
   *     it couldn't have magically completed synchronously -- not having begun and all.) */

  // Okay, so first deal with m_*, here and now as discussed. In so doing prepare the stuff to call in thread W.

  assert(m_user_request);
  typename User_request::Ptr ex_user_request(std::move(m_user_request));
  assert(!m_user_request);

  queue<typename User_request::Ptr> ex_pending_user_requests_q_or_none;
  if (err_code)
  {
    FLOW_LOG_TRACE("Error emitted by sync-IO => all [" << m_pending_user_requests_q.size() << "] pending "
                   "handlers (may well be none) will be invoked with error in addition to the head handler.");

    ex_pending_user_requests_q_or_none = std::move(m_pending_user_requests_q);
    assert(m_pending_user_requests_q.empty());
  }
  else if (!m_pending_user_requests_q.empty()) // && (!err_code)
  {
    FLOW_LOG_TRACE("Success emitted by sync-IO => lead handler will be invoked; next pending request will "
                   "be serviced by sync-IO; pending request count has become (after popping from queue into lead "
                   "slot): [" << (m_pending_user_requests_q.size() - 1) << "].");

    m_user_request = std::move(m_pending_user_requests_q.front());
    // m_pending_user_requests_q.front() is now null; we're gonna pop that null ptr presently.
    m_pending_user_requests_q.pop();
  }
  else // if (_q.empty() && (!err_code))
  {
    FLOW_LOG_TRACE("Success emitted by sync-IO => lead handler will be invoked; no pending requests queued up.");
  }

  // Second: Post the completion handlers as discussed.
  m_worker.post([this, err_code, sz,
                 /* Have to upgrade to shared_ptr<>s due to capturing requiring copyability (even though copying is
                  * not actually invoked by us). unique_ptr and queue<unique_ptr> = not copyable. */
                 ex_user_request = boost::shared_ptr<User_request>(std::move(ex_user_request)),
                 ex_pending_user_requests_q_or_none
                   = boost::make_shared<decltype(ex_pending_user_requests_q_or_none)>
                       (std::move(ex_pending_user_requests_q_or_none))]()
  {
    // We are in thread W. Nothing is locked.

    assert(ex_user_request);
    FLOW_LOG_TRACE(m_log_pfx << ": Invoking head handler.");
    (ex_user_request->m_on_done_func)(err_code, sz);
    FLOW_LOG_TRACE("Handler completed.");
    if (!ex_pending_user_requests_q_or_none->empty())
    {
      assert(err_code);
      assert(sz == 0);

      FLOW_LOG_TRACE(m_log_pfx << ": Invoking [" << ex_pending_user_requests_q_or_none->size() << "] "
                     "pending handlers in one shot (due to error).");
      while (!ex_pending_user_requests_q_or_none->empty())
      {
        ex_pending_user_requests_q_or_none->front()->m_on_done_func(err_code, 0);
        ex_pending_user_requests_q_or_none->pop();
        FLOW_LOG_TRACE("In-queue handler finished.");
      }
      // @todo For modest perf, iterate through it; then .clear(). Maybe use an underlying deque<> or list<> directly.
    } // if (!ex_pending_user_requests_q_or_none->empty())
  }); // m_worker.post()

  assert(!ex_user_request); // Hosed by move().
  assert(ex_pending_user_requests_q_or_none.empty());
} // Async_adapter_receiver::process_msg_or_error()
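
/* Aside (added illustration, not part of the original file): why the shared_ptr upgrade in the
 * capture list above is needed. A lambda with a move-only capture is itself move-only, and
 * post()-style APIs generally store tasks in a copyable function-wrapper; e.g.:
 *
 *   auto p = boost::movelib::make_unique<int>(42);
 *   std::function<void ()> f = [p = std::move(p)]() {};  // Does not compile: capture not copyable.
 *
 *   auto sp = boost::shared_ptr<int>(boost::movelib::make_unique<int>(42));
 *   std::function<void ()> g = [sp]() {};                // OK: shared_ptr capture is copyable.
 */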

template<typename Core_t>
void Async_adapter_receiver<Core_t>::on_sync_io_rcv_done(const Error_code& err_code, size_t sz)
{
  using flow::util::Lock_guard;
  using std::queue;

  // We are in thread W. m_mutex is locked.

  assert(err_code != boost::asio::error::operation_aborted);
  assert(err_code != error::Code::S_SYNC_IO_WOULD_BLOCK);

  FLOW_LOG_TRACE(m_log_pfx << ": Earlier async-wait => event active => rcv-mutex lock => "
                 "on-active-event-func => sync_io module => here (on-rcv-done handler).");

  /* This is not *too* different from thread-U (or thread-W if invoked from our own handler)
   * async_receive_native_handle_impl()... except that in our case more requests may have been queued (as summarized
   * in top comment in that method) during our async-wait that just finished. And, of course, we need
   * to process the ready message first-thing. But let's say that's taken care of. After that: we can't just
   * stop; there may be queued requests. We shall process them as synchronously as possible in a loop.
   * That's the executive summary. */

  /* So handle the message or error -- W-post any relevant handlers; update m_user_request and
   * m_pending_user_requests_q. */
  process_msg_or_error(err_code, sz);

  Error_code sync_err_code;
  auto& sync_sz = sz; // (Might as well reuse the arg.)

  /* First iteration: sync_err_code is definitely not would-block; m_user_request may be null.
   * Subsequent iterations: sync_err_code may be would-block. */
  while (m_user_request && (sync_err_code != error::Code::S_SYNC_IO_WOULD_BLOCK))
  {
    // @todo Code reuse with async_receive_native_handle_impl()? For now keeping comments light.
#ifndef NDEBUG
    bool ok;
#endif
    if (m_user_request->m_target_hndl_ptr)
    {
      if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES)
      {
#ifndef NDEBUG
        ok =
#endif
        m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
                                              &sync_err_code, &sync_sz,
                                              [this](const Error_code& err_code, size_t sz)
        {
          on_sync_io_rcv_done(err_code, sz); // Back to us (async path).
        });
      }
      else // if constexpr(!S_TRANSMIT_NATIVE_HANDLES)
      {
        assert(false && "This code should never be reached.");
      }
    } // if (m_user_request->m_target_hndl_ptr)
    else
    {
#ifndef NDEBUG
      ok =
#endif
      m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
                                   [this](const Error_code& err_code, size_t sz)
                                     { on_sync_io_rcv_done(err_code, sz); });
    } // else if (!m_user_request->m_target_hndl_ptr)
    assert(ok && "We are by definition in PEER state, and ctor starts receive-ops, and we never start a "
                 "sync_io async-receive before ensuring the previous one has executed; so that should never "
                 "return false. Bug somewhere?");

    if (sync_err_code == error::Code::S_SYNC_IO_WOULD_BLOCK)
    {
      continue; // Loop will end.
    }
    // else

    /* Remaining outcomes: sync_err_code truthy => process_msg_or_error() will do the right thing.
     * sync_err_code is falsy => ditto. */

    process_msg_or_error(sync_err_code, sync_sz);

    /* Outcomes: Error => m_user_request is null, m_pending_user_requests_q is empty. No req to service. Loop end.
     *           Success => m_user_request is null, m_pending_user_requests_q is empty. No req to service. Loop end.
     *           Success => m_user_request is NOT null, m_pending_user_requests_q is ???. Req needs service.
     * In no case is sync_err_code (at this point) would-block. Hence: time to check loop condition. */
  } // while (m_user_request && (sync_err_code != error::Code::S_SYNC_IO_WOULD_BLOCK));
} // Async_adapter_receiver::on_sync_io_rcv_done()
template<typename Core_t>
bool Async_adapter_receiver<Core_t>::idle_timer_run(util::Fine_duration timeout)
{
  using flow::util::Lock_guard;

  // Like Async_adapter_sender::send_native_handle() and others (keeping comments light).

  Lock_guard<decltype(m_mutex)> lock(m_mutex);
  return m_sync_io.idle_timer_run(timeout);
}

} // namespace ipc::transport::sync_io