Flow-IPC 1.0.0
Flow-IPC project: Full implementation reference.
async_adapter_rcv.hpp
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
23#include <flow/log/log.hpp>
24#include <flow/async/single_thread_task_loop.hpp>
25#include <boost/move/make_unique.hpp>
26#include <queue>
27
29{
30
31// Types.
32
33/**
34 * Internal-use type that adapts a given PEER-state sync_io::Native_handle_receiver or sync_io::Blob_receiver *core*
 35 * into the async-I/O-pattern Native_handle_receiver or Blob_receiver. State-mutating logic of the latter is handled
 36 * by `*this`; while trivial `const` (in PEER state) things like `.receive_blob_max_size()` are forwarded directly to
37 * the core `sync_io::X`.
38 *
39 * @see transport::Native_socket_stream::Impl uses this for 99% of its incoming-direction
40 * (PEER-state by definition) logic.
41 * @see transport::Blob_stream_mq_receiver_impl uses this for 99% of its logic.
42 *
43 * @see Async_adapter_sender for the opposite-direction thing. E.g., transport::Native_socket_stream::Impl
44 * uses that for 99% of its outgoing-direction logic.
45 *
46 * ### Threads and thread nomenclature; locking ###
47 * Thread U, thread W... locking... just see those sections in transport::Native_socket_stream::Impl class doc header.
48 * We adopt that nomenclature and logic. However, as we are concerned with only one direction (op-type),
49 * we only deal with code in either thread U or W concerned with that. The other-direction code -- if applicable
50 * (e.g., applicable for `Native_socket_stream` which deals with both over 1 socket connection; N/A
51 * for `Blob_stream_mq_*` which uses separate objects entirely) -- simply co-exists in the same thread W and "thread"
52 * U. (If `Native_socket_stream` wanted to, it could even parallelize stuff in thread W by using separate worker
53 * threads Ws and Wr. As of this writing it does not, but it could -- nothing in `*this` would change.)
54 *
55 * Note, again, that we have our own `m_mutex`. If there is an opposing-direction counterpart Async_adapter_sender,
56 * then it has its own `m_mutex`; hence things can proceed concurrently.
57 *
58 * ### Impl design ###
59 * This is almost entirely subsumed by our `sync_io` core, Async_adapter_receiver::Core, an instance of
60 * sync_io::Native_handle_receiver or sync_io::Blob_receiver. It has a receive op-type (possibly among others), so we
 61 * invoke its `"sync_io::*_receiver::start_receive_blob_ops()"` during our initialization. After that:
62 *
63 * For idle_timer_run(), we can again just forward it to `m_sync_io`. There's no completion
64 * handler either, unlike with `.async_end_sending()`, so it's even simpler -- just straight forwarding to
65 * the `sync_io` core.
66 *
67 * For async_receive_native_handle() (and its degenerate version async_receive_blob()) the `*_receiver` concept
68 * we implement has an extra feature on top of sync_io::Native_handle_receiver (and degenerate version,
 69 * sync_io::Blob_receiver): when 1 async_receive_native_handle() is in progress asynchronously, the user is allowed
70 * to invoke it again an arbitrary number of times, and they are to be served in the order they were called.
71 * Therefore:
72 * - We maintain a "deficit" queue of these requests. The currently-served request is stored in
73 * `User_request m_user_request`. The queued-up subsequent ones are stored in queue with that
74 * same element type, `m_pending_user_requests_q`.
75 * - When the ongoing (single) `m_sync_io.async_receive_*()` does complete -- which occurs in thread W --
76 * we emit the result (`Error_code`, `sz`) to the `"User_request::m_on_done_func"` (the completion
77 * handler from the user). Then we pop `m_pending_user_requests_q` (unless empty -- no further "deficit")
78 * into `m_user_request` and service *that* one via `m_sync_io.async_receive_*()`. Rinse/repeat.
79 *
80 * If the destructor is invoked before `m_user_request` can get serviced, then in the dtor
81 * we execute `on_done_func(E)`, where E is operation-aborted. Once that's done the dtor can finish.
82 */
 83template&lt;typename Core_t&gt;
 84class Async_adapter_receiver :
85 public flow::log::Log_context,
86 private boost::noncopyable
87{
88public:
89 // Types.
90
91 /// The `sync_io::X` type being adapted into async-I/O-pattern `X`.
92 using Core = Core_t;
93
94 // Constructors/destructor.
95
96 /**
97 * Constructs the adapter around `sync_io::X` object `*sync_io`.
98 *
99 * @param logger_ptr
100 * Logger to use for logging subsequently.
101 * @param log_pfx
102 * String that shall precede ~all logged messages (e.g., `lexical_cast<string>(x)`, where `x` is an `X`.)
103 * @param worker
104 * The worker thread loop of `X`. Background work, as needed, will be posted onto this
105 * "thread W." Note that `X` may (or may not) share this thread with unrelated tasks;
 106 * for example `Native_socket_stream` uses it for both a `*this` (incoming-direction)
 107 * and an Async_adapter_sender (outgoing-direction). `*worker` must already be `->start()`ed.
108 * @param sync_io
109 * The core object of `X`. It should have just (irreversibly) entered state PEER.
110 */
111 Async_adapter_receiver(flow::log::Logger* logger_ptr, util::String_view log_pfx,
112 flow::async::Single_thread_task_loop* worker, Core* sync_io);
113
114 /**
115 * To be invoked after `->stop()`ping `*worker` (from ctor), as well as flushing any still-queued
116 * tasks in its `Task_engine` (via `.restart()` and `.poll()`), this satisfies the customer adapter
117 * dtor's contract which is to invoke any not-yet-fired completion handlers with special
118 * operation-aborted error code. In our case that is either nothing or 1 `async_end_sending()` completion
119 * handler. If applicable the dtor returns once that handler has completed in an unspecified thread
120 * that is not the calling thread.
121 */
123
124 // Methods.
125
126 /**
 127 * See Native_handle_receiver counterpart. However, this one is `void`, as there is no way `*this` is not in PEER
 128 * state (by definition).
 129 *
 130 * @param target_hndl
 131 * See Native_handle_receiver counterpart.
 132 * @param target_meta_blob
 133 * See Native_handle_receiver counterpart.
 134 * @param on_done_func
 135 * See Native_handle_receiver counterpart.
 136 */
 137 void async_receive_native_handle(Native_handle* target_hndl,
 138 const util::Blob_mutable& target_meta_blob,
139 flow::async::Task_asio_err_sz&& on_done_func);
140
141 /**
142 * See Blob_receiver counterpart. However, this one is `void`, as there is no way `*this` is not in PEER state
143 * (by definition).
144 *
145 * @param target_blob
146 * See Blob_receiver counterpart.
147 * @param on_done_func
148 * See Blob_receiver counterpart.
149 */
150 void async_receive_blob(const util::Blob_mutable& target_blob,
151 flow::async::Task_asio_err_sz&& on_done_func);
152
153 /**
154 * See Native_handle_receiver counterpart.
155 *
156 * @param timeout
157 * See Native_handle_receiver counterpart.
158 * @return See Native_handle_receiver counterpart.
159 */
 160 bool idle_timer_run(util::Fine_duration timeout);
 161
162private:
163 // Types.
164
165 /**
166 * Data store representing a deficit user async-receive request: either one being currently handled
167 * by `m_sync_io` -- which can handle one `m_sync_io.async_receive_*()` at a time, no more -- or
168 * one queued up behind it, if `async_receive_*()` was called before the current one could complete.
169 *
170 * Essentially this stores args to async_receive_native_handle() (or degenerate version, async_receive_blob())
171 * which is an async-operating method. We store them in queue via #Ptr.
172 */
 173 struct User_request
 174 {
175 // Types.
176
177 /// Short-hand for `unique_ptr` to this.
178 using Ptr = boost::movelib::unique_ptr<User_request>;
179
180 // Data.
181
182 /// See async_receive_native_handle() `target_hndl`. Null for `async_receive_blob()`.
 183 Native_handle* m_target_hndl_ptr;
 184
185 /// See async_receive_native_handle() `target_meta_blob`. Or see async_receive_blob() `target_blob`.
 186 util::Blob_mutable m_target_meta_blob;
 187
188 /// See async_receive_native_handle() or async_receive_blob() `on_done_func`.
189 flow::async::Task_asio_err_sz m_on_done_func;
190 }; // struct User_request
191
192 // Methods.
193
194 /**
195 * Body of async_receive_native_handle() and async_receive_blob(); with `target_hndl` null if and only if
196 * it's the latter as opposed to the former.
197 *
198 * @param target_hndl_or_null
199 * See async_receive_native_handle(); or null if it's the other API.
200 * If `!(Core::S_TRANSMIT_NATIVE_HANDLES)` this must be null.
201 * @param target_meta_blob
202 * See async_receive_native_handle().
203 * @param on_done_func
204 * See async_receive_native_handle().
205 */
206 void async_receive_native_handle_impl(Native_handle* target_hndl_or_null,
207 const util::Blob_mutable& target_meta_blob,
208 flow::async::Task_asio_err_sz&& on_done_func);
209
210 /**
211 * Invoked via active-event API, handles the async completion
212 * of `m_sync_io.async_receive_*()` operation. Can be invoked from thread W only, and #m_mutex must be
213 * locked. #m_user_request must not be null.
214 *
215 * This method iteratively, synchronously leverages #m_sync_io to read as many in-messages available
216 * in the transport as possible, until: the request deficit is met (either by reading enough messages
217 * to satisfy #m_user_request and #m_pending_user_requests_q; or by encountering pipe-hosing error)
218 * or would-block. In the latter case another async-wait is initiated by this method synchronously.
219 *
220 * The first action, before those potential further reads, is to process_msg_or_error() the just-received
221 * (or pipe-hosing would-be) message. Then for each further in-message process_msg_or_error() is again
222 * invoked.
223 *
224 * For each request satisfied, a separate user handler is posted onto thread W to execute in order.
225 *
226 * @param err_code
227 * Result to pass to user (if truthy, all pending requests; else to #m_user_request only).
228 * @param sz
229 * Result to pass to user (ditto).
230 */
231 void on_sync_io_rcv_done(const Error_code& err_code, size_t sz);
232
233 /**
234 * Invoked from thread U/W (async_receive_native_handle_impl()) or thread W (active-event API), handles
235 * a completed `m_sync_io.async_receive_*()` -- whose results are to be given as args -- by (1) updating
236 * #m_user_request and #m_pending_user_requests_q and (2) posting any appropriate handlers onto thread W.
237 *
238 * See notes for on_sync_io_rcv_done().
239 *
240 * @param err_code
241 * See on_sync_io_rcv_done().
242 * @param sz
243 * See on_sync_io_rcv_done().
244 */
245 void process_msg_or_error(const Error_code& err_code, size_t sz);
246
247 // Data.
248
249 /// See `log_pfx` arg of ctor.
250 const std::string m_log_pfx;
251
252 /**
253 * The *head slot* containing the currently-being-serviced "deficit" async-receive request, with a meta-blob
254 * *potentially* being async-written to; null if there is no pending async_receive_native_handle().
255 * It is the "fulcrum" of the consumer-producer state machine described in doc header impl section's design
256 * discussion: If null there is no deficit; if not null there is an *overall deficit*. In the former case,
257 * at steady state, `m_pending_user_requests_q.empty() == true`.
258 *
259 * Protected by #m_mutex.
260 *
261 * @see #m_pending_user_requests_q
262 */
 263 typename User_request::Ptr m_user_request;
 264
265 /**
266 * Queue storing deficit async-receive requests queued up due to #m_user_request being not null while
267 * more `async_receive_*()` invocations being made by user. One can think of the "overall" queue as being
268 * #m_user_request followed by the elements in this #m_pending_user_requests_q.
269 * See class doc header for design discussion.
270 *
271 * Protected by #m_mutex.
272 *
273 * ### Rationale for not subsuming #m_user_request directly into this queue ###
274 * It's the same amount of stuff; so the reason is stylistic in a subjective way. Basically a low-level
275 * async read-op will target the meta-blob *directly* inside the head User_request in the "overall" queue but
276 * never any of the subsequently queued requests; in my (ygoldfel) view it is clearer to express it as always
277 * targeting #m_user_request rather than `*(m_pending_user_requests_q.front())`.
278 */
279 std::queue<typename User_request::Ptr> m_pending_user_requests_q;
280
281 /// Protects #m_user_request, #m_pending_user_requests_q, and receive-ops data of #m_sync_io.
282 mutable flow::util::Mutex_non_recursive m_mutex;
283
284 /// Single-thread worker pool for all internal async work. Referred to as thread W in comments.
285 flow::async::Single_thread_task_loop& m_worker;
286
287 /**
288 * The core #Core engine, implementing the `sync_io` pattern (see util::sync_io doc header).
289 * See our class doc header for overview of how we use it (the aforementioned `sync_io` doc header talks about
290 * the `sync_io` pattern generally).
291 *
292 * Thus, #m_sync_io is the synchronous engine that we use to perform our work in our asynchronous boost.asio
293 * loop running in thread W (#m_worker) while collaborating with user thread(s) a/k/a thread U.
294 * (Recall that the user may choose to set up their own event loop/thread(s) --
295 * boost.asio-based or otherwise -- and use their own equivalent of an #m_sync_io instead.)
296 */
 297 Core&amp; m_sync_io;
 298}; // class Async_adapter_receiver
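The request-deficit design described in the class doc header (head slot `m_user_request` plus overflow queue `m_pending_user_requests_q`, served strictly FIFO) can be illustrated with a stand-alone sketch. All names here are hypothetical stand-ins, not Flow-IPC code; threading, locking, and the sync_io core are deliberately omitted.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <vector>

// Hypothetical model of the deficit queue: m_head plays the role of
// m_user_request (the request currently being serviced); m_pending_q plays
// the role of m_pending_user_requests_q.
class Deficit_queue_demo
{
public:
  using On_done = std::function<void(const std::string& msg)>;

  // Analogue of async_receive_blob(): any number may be outstanding at once.
  void async_receive(On_done&& on_done)
  {
    if (m_head) // A request is already being serviced: deficit grows.
    {
      m_pending_q.emplace(std::move(on_done));
      return;
    }
    m_head = std::move(on_done); // Head slot was empty: service this one now.
  }

  // Analogue of a sync_io receive completing with a message.
  void on_wire_message(const std::string& msg)
  {
    if (!m_head)
    {
      return; // No deficit: real code would not even have a read outstanding.
    }
    auto on_done = std::move(m_head); // Detach head handler first...
    if (m_pending_q.empty())
    {
      m_head = nullptr;
    }
    else
    {
      m_head = std::move(m_pending_q.front()); // ...promote next request...
      m_pending_q.pop();
    }
    on_done(msg); // ...then emit result: strictly FIFO across all requests.
  }

private:
  On_done m_head; // Null <=> no deficit.
  std::queue<On_done> m_pending_q;
};
```

Three overlapping `async_receive()` calls followed by three arriving messages fire the three handlers in submission order, mirroring the contract the adapter adds on top of the one-at-a-time `sync_io` core.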
299
300// Template implementations.
301
302template<typename Core_t>
 303Async_adapter_receiver&lt;Core_t&gt;::Async_adapter_receiver(flow::log::Logger* logger_ptr,
 304 util::String_view log_pfx,
305 flow::async::Single_thread_task_loop* worker,
306 Core* sync_io) :
307 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
308 m_log_pfx(log_pfx),
309 m_worker(*worker),
310 m_sync_io(*sync_io)
311{
 312 using util::sync_io::Asio_waitable_native_handle;
 313 using util::sync_io::Task_ptr;
 314 using flow::util::Lock_guard;
315
316 // We've just entered PEER state, so set up the receive-ops.
317
318 /* Hook up the m_sync_io=>*this interaction. m_sync_io will use these callbacks to ask
319 * us to ->async_wait() on `Asio_waitable_native_handle`s for it.
320 *
321 * The *this=>m_sync_io interaction shall be our APIs, like async_receive_native_handle(),
322 * simply invoking the same API in m_sync_io (m_sync_io.async_receive_native_handle() for that example). */
323
 324 /* (.start_receive_native_handle_ops() would do the same thing, if it exists. If it exists, that's because X has
 325 * both methods, so as to satisfy two concepts -- for when the user uses the sync_io::X directly -- but we don't
 326 * care about that; we know the two are equivalent in this case; so just use the one we know exists for any X.) */
327#ifndef NDEBUG
328 const bool ok =
329#endif
330 m_sync_io.start_receive_blob_ops([this](Asio_waitable_native_handle* hndl_of_interest,
331 bool ev_of_interest_snd_else_rcv,
332 Task_ptr&& on_active_ev_func)
333 {
334 /* We are in thread U or thread W; m_sync_io.<?>() has called its m_..._ev_wait_func();
335 * it has protected access to *hndl_of_interest as needed. */
336
337 FLOW_LOG_TRACE(m_log_pfx << ": Sync-IO receive-ops event-wait request: "
338 "descriptor [" << hndl_of_interest->native_handle() << "], "
339 "writable-else-readable [" << ev_of_interest_snd_else_rcv << "].");
340
341 // They want this async_wait(). Oblige.
342 assert(hndl_of_interest);
343 hndl_of_interest->async_wait(ev_of_interest_snd_else_rcv
344 ? Asio_waitable_native_handle::Base::wait_write
345 : Asio_waitable_native_handle::Base::wait_read,
346 [this, on_active_ev_func = std::move(on_active_ev_func)]
347 (const Error_code& err_code)
348 {
349 // We are in thread W. Nothing is locked.
350
351 if (err_code == boost::asio::error::operation_aborted)
352 {
353 return; // Stuff is shutting down. GTFO.
354 }
355 // else
356
357 // They want to know about completed async_wait(). Oblige.
358
359 // Protect m_sync_io and non-const non-ref m_* against receive-ops (async_receive_*(), ...).
360 Lock_guard<decltype(m_mutex)> lock(m_mutex);
361
 362 /* Inform m_sync_io of the event. This can synchronously invoke a handler we have registered via an m_sync_io
 363 * API (e.g., `async_receive_*()`). In our case -- if indeed it triggers a handler -- it will
 364 * have to do with async_receive_*() completion. */
365
366 (*on_active_ev_func)();
367 // (That would have logged sufficiently inside m_sync_io; let's not spam further.)
368
369 // Lock_guard<decltype(m_mutex)> lock(m_mutex): unlocks here.
370 }); // hndl_of_interest->async_wait()
371 }); // m_sync_io.start_receive_blob_ops()
372 assert(ok);
373} // Async_adapter_receiver::Async_adapter_receiver()
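The ev-wait contract wired up in the constructor above can be reduced to a toy model. The types below are hypothetical stand-ins (not the real Flow-IPC APIs): the core never blocks; it hands its owner a "call me when the descriptor is ready" closure, and the owner fires it once its event loop reports the descriptor active.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

using Task = std::function<void()>;
using Task_ptr = std::shared_ptr<Task>; // Stand-in for util::sync_io::Task_ptr.
using Ev_wait_func = std::function<void(int fd, bool snd_else_rcv, Task_ptr&& on_active)>;

struct Toy_core // Plays the role of the sync_io::*_receiver core.
{
  Ev_wait_func m_ev_wait;
  std::vector<std::string> m_log;

  void start_ops(Ev_wait_func&& ev_wait) { m_ev_wait = std::move(ev_wait); } // start_receive_blob_ops() analogue.

  void async_receive() // Would-block: ask owner to async-wait for readability.
  {
    m_log.push_back("would-block: requesting async-wait");
    m_ev_wait(7 /* fake FD */, false /* readable */,
              std::make_shared<Task>([this]() { m_log.push_back("event active: reading now"); }));
  }
};

struct Toy_adapter // Plays the role of the adapter ctor's ev-wait lambda.
{
  Toy_core& m_core;
  Task_ptr m_armed; // The async-wait we "started" on the core's behalf.

  explicit Toy_adapter(Toy_core& core) : m_core(core)
  {
    m_core.start_ops([this](int, bool, Task_ptr&& on_active) { m_armed = std::move(on_active); });
  }

  void simulate_fd_readable() // Event loop says: descriptor is readable.
  {
    if (m_armed)
    {
      auto task = std::move(m_armed); // One-shot, as in the real contract.
      (*task)();
    }
  }
};
```

The design point this isolates: the core owns all protocol logic, while the adapter owns only the waiting mechanism, so the same core can be driven by any event loop.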
374
375template<typename Core_t>
 376Async_adapter_receiver&lt;Core_t&gt;::~Async_adapter_receiver()
 377{
378 using flow::async::Single_thread_task_loop;
379 using flow::util::ostream_op_string;
380
381 /* Pre-condition: m_worker is stop()ed, and any pending tasks on it have been executed.
382 * Our promised job is to invoke any pending handlers with operation-aborted.
383 * The decision to do it from a one-off thread is explained in transport::Native_socket_stream::Impl::~Impl()
384 * and used in a few places; so see that. Let's just do it.
385 * @todo It would be cool, I guess, to do it all in one one-off thread instead of potentially starting, like,
386 * 3 for some of our customers. Well, whatever. */
387
388 if (m_user_request)
389 {
390 Single_thread_task_loop one_thread(get_logger(), ostream_op_string(m_log_pfx, "-rcv-temp_deinit"));
391 one_thread.start([&]()
392 {
393 FLOW_LOG_TRACE("Running head slot async-receive completion handler.");
394 m_user_request->m_on_done_func(error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER, 0);
395 FLOW_LOG_TRACE("User receive handler finished.");
396
397 while (!m_pending_user_requests_q.empty())
398 {
399 FLOW_LOG_TRACE("Running a queued async-receive completion handler.");
 400 m_pending_user_requests_q.front()
 401 ->m_on_done_func(error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER, 0);
402 m_pending_user_requests_q.pop();
403 FLOW_LOG_TRACE("User receive handler finished. Popped from user request deficit queue.");
404 } // while (!m_pending_user_requests_q.empty())
405 });
406 } // if (m_user_request)
407 else
408 {
409 assert(m_pending_user_requests_q.empty()); // Sanity check.
410 }
411} // Async_adapter_receiver::~Async_adapter_receiver()
412
413template<typename Core_t>
 414void Async_adapter_receiver&lt;Core_t&gt;::async_receive_native_handle(Native_handle* target_hndl,
 415 const util::Blob_mutable&amp; target_meta_blob,
416 flow::async::Task_asio_err_sz&& on_done_func)
417{
418 assert(target_hndl && "Native_socket_stream::async_receive_native_handle() must take non-null Native_handle ptr.");
419 async_receive_native_handle_impl(target_hndl, target_meta_blob, std::move(on_done_func));
420
 421 /* Note, if our customer lacks async_receive_native_handle(), then they'll never forward to (call) this method;
 422 * therefore it (*this being a template instance) will never be compiled; therefore target_hndl_or_null will never
 423 * be non-null; which is why that assert(false) inside `if constexpr(!S_TRANSMIT_NATIVE_HANDLES)` compile-time clause
 424 * shall never be reached. */
425}
426
427template<typename Core_t>
 428void Async_adapter_receiver&lt;Core_t&gt;::async_receive_blob(const util::Blob_mutable&amp; target_blob,
 429 flow::async::Task_asio_err_sz&amp;&amp; on_done_func)
430{
431 async_receive_native_handle_impl(nullptr, target_blob, std::move(on_done_func));
432}
433
434template<typename Core_t>
 435void Async_adapter_receiver&lt;Core_t&gt;::async_receive_native_handle_impl(Native_handle* target_hndl_or_null,
 436 const util::Blob_mutable&amp; target_meta_blob,
437 flow::async::Task_asio_err_sz&& on_done_func)
438{
439 using flow::util::Lock_guard;
440 using boost::movelib::make_unique;
441
442 // We are in thread U (or thread W in a completion handler, but not concurrently).
443
444 /* We will be accessing m_user_request, m_pending_user_requests_q, and possibly m_sync_io receive-ops
445 * sub-API -- while ctor's ev-wait function's async_wait() handler will be accessing them too from thread W -- so: */
446
447 Lock_guard<decltype(m_mutex)> lock(m_mutex);
448
449 /* This is essentially the only place in *this where we're more than a mere forwarder of the core m_sync_io
450 * sync_io::*_receiver (which is synchronous) into an async *_receiver. I.e., in this
451 * case we add a feature on top; namely more than 1 async_receive_*() can be pending for us, whereas only
 452 * 1 async_receive_*() is allowed in the sync_io one. We call this a *deficit*, meaning there are more
 453 * user requests than available messages. (There's only ever 1 message "available" -- once the
 454 * m_sync_io.async_receive_*() succeeds -- and only momentarily, as it's immediately fed to the
 455 * completion handler of our async_receive_*() that precipitated it.)
456 *
457 * Thus, if there is no deficit, and async_receive_*() comes in, we trigger m_sync_io.async_receive_*()
458 * and save the associated request info (basically our 3 args above) into m_user_request.
459 * Once it succeeds we feed the result to the completion handler saved among those 3 args and nullify
460 * m_user_request. Meanwhile, if another one comes in while there's already a deficit (m_user_request
461 * is not null), we queue it in m_pending_user_requests_q. Lastly, having nullified
462 * m_user_request as noted a couple sentences ago, we pop the _q (if not empty) into m_user_request
463 * and trigger another m_sync_io.async_receive_*(), continuing the chain this way until _q is empty
 464 * and m_user_request is null (at which point there's no deficit and no reason to
465 * m_sync_io.async_receive_*()).
466 *
467 * Oh, also, if an m_sync_io.async_receive_*() yields a socket-hosing error, then it is fed to the entire
468 * deficit queue's handlers; as in any case any subsequent attempted m_sync_io.async_receive_*() would fail.
469 *
470 * Note that there can be a deficit (pending user requests) but no surplus (pending messages): we don't
471 * make unnecessary m_sync_io.async_receive_*() calls and thus don't need to save surplus messages.
472 * If we did, we'd need to copy any such surplus message (into user buffer) once this->async_receive_*() does
473 * come in. This is discussed and rationalized elsewhere, but for convenience, recap: The reason is, basically,
474 * the send-side concept is obligated to internally copy-and-queue messages on encountering would-block.
475 * Since the system will thus not lose messages (if the receiver side is being slow in popping them from the pipe),
476 * the complexity of queuing stuff -- and the perf loss due to copying -- is kept to one side. */
477
478 FLOW_LOG_TRACE(m_log_pfx << ": Incoming user async-receive request for "
479 "possible native handle and meta-blob (located @ [" << target_meta_blob.data() << "] of "
480 "max size [" << target_meta_blob.size() << "]). In worker now? = [" << m_worker.in_thread() << "].");
481
482 auto new_user_request = make_unique<User_request>();
483 new_user_request->m_target_hndl_ptr = target_hndl_or_null;
484 new_user_request->m_target_meta_blob = target_meta_blob;
485 new_user_request->m_on_done_func = std::move(on_done_func);
486
487 if (m_user_request)
488 {
489 m_pending_user_requests_q.emplace(std::move(new_user_request));
490 FLOW_LOG_TRACE("At least 1 async-receive request is already in progress. "
491 "After registering the new async-receive request: Head slot is non-empty; and "
492 "subsequently-pending deficit queue has size [" << m_pending_user_requests_q.size() << "]. "
493 "Will sync-IO-receive to handle this request once it reaches the front of that queue.");
494 return;
495 }
496 // else if (!m_user_request):
497
498 FLOW_LOG_TRACE("No async-receive request is currently in progress. Starting sync-IO-receive chain to service the "
499 "new request and any further-queued requests that might appear in the meantime.");
500 m_user_request = std::move(new_user_request);
501 // new_user_request is now hosed.
502
503 /* If receive completes synchronously (there are data pending on the "wire"), these will reflect that.
504 * If not then sync_err_code will indicate would-block. */
505 Error_code sync_err_code;
506 size_t sync_sz;
507
508#ifndef NDEBUG
509 bool ok;
510#endif
511 if (m_user_request->m_target_hndl_ptr)
512 {
513 if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES) // Prevent compile error from code that could never be reached.
514 {
515#ifndef NDEBUG
516 ok =
517#endif
518 m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
519 &sync_err_code, &sync_sz,
520 [this](const Error_code& err_code, size_t sz)
521 {
522 // We are in thread W. m_mutex is locked.
523 on_sync_io_rcv_done(err_code, sz); // Emit to handler; possibly pop queue and continue chain.
524 });
525 }
526 else // if constexpr(!S_TRANSMIT_NATIVE_HANDLES)
527 {
528 assert(false && "This code should never be reached.");
529 }
530 } // if (m_user_request->m_target_hndl_ptr)
531 else // Same deal/keeping comments light.
532 {
533#ifndef NDEBUG
534 ok =
535#endif
536 m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
537 [this](const Error_code& err_code, size_t sz)
538 { on_sync_io_rcv_done(err_code, sz); });
539 } // else if (!m_user_request->m_target_hndl_ptr)
540
541 assert(ok && "We are by definition in PEER state, and ctor starts receive-ops, and we never start a "
542 "sync_io async-receive before ensuring previous one has executed; so that should never "
543 "return false. Bug somewhere?");
544
545 if (sync_err_code == error::Code::S_SYNC_IO_WOULD_BLOCK)
546 {
547 // Async-wait started by m_sync_io. It logged plenty. We live to fight another day.
548 return;
549 }
550 // else:
551
552 /* Process the message and nullify m_user_request. (It would also promote any m_pending_user_requests_q head
553 * to m_user_request; but in our case that is not possible. We are still in the user async-receive API!) */
554 process_msg_or_error(sync_err_code, sync_sz);
555
556 FLOW_LOG_TRACE("Message was immediately available; synchronously returned to user; handler posted onto "
557 "async worker thread. Done until next request.");
558} // Async_adapter_receiver::async_receive_native_handle_impl()
559
560template<typename Core_t>
 561void Async_adapter_receiver&lt;Core_t&gt;::process_msg_or_error(const Error_code&amp; err_code, size_t sz)
 562{
563 using flow::util::Lock_guard;
564 using std::queue;
565
566 // We are in thread U or W. m_mutex is locked.
567
568 assert(err_code != boost::asio::error::operation_aborted);
569 assert((err_code != error::Code::S_SYNC_IO_WOULD_BLOCK) && "By our contract.");
570
571 FLOW_LOG_TRACE(m_log_pfx << ": Earlier async-wait => event active => rcv-mutex lock => "
572 "on-active-event-func => sync_io module => on-rcv-done handler => here. "
573 "Or else: rcv-mutex lock => sync_io module => no async-wait needed => here.");
574
575 /* As noted in our doc header, we have roughly two items on the agenda.
576 *
577 * 1, we need to invoke m_user_request->m_on_done_func, passing it the results (which are our args).
578 *
579 * 2, we need to update our m_* structures, such as popping stuff off m_pending_user_requests_q
580 * and starting the next m_sync_io.async_receive_*() if any -- and so on.
581 *
582 * Oh and 2b: if err_code is truthy, we need to invoke all the pending handlers (if any) and nullify/empty
583 * all of that.
584 *
585 * The question is what thread to do that stuff in. There are a couple of approaches, at least, but the following
586 * seems the least latency-ridden:
587 * - For 1 and 2b, we must post() the handler(s) onto thread W.
588 * - If we are in thread U: It is plainly required, as we promised to invoke completion handlers
589 * from unspecified thread that isn't U.
590 * - If we are in thread W: To avoid recursive mayhem if they choose to call some other API
591 * inside an on_done_func(), we queue it onto thread W by itself. (But if it's 2b in addition to 1,
592 * then we can bundle them all together into one thread-W task; no need for the churn of post()ing
593 * each one individually.)
594 * - However for 2, there's no need to post() anything. The mutex is already locked, so even if we're in thread
595 * U (m_sync_io.async_receive_*() succeeded synchronously, and this->async_receive_native_handle_impl()
596 * therefore invoked us synchronously, from thread U) we are within
597 * our rights to just finish the job here synchronously as well. In fact, that's great! It means
598 * data were waiting to be read right away, and we did everything we could synchronously in the
599 * original async_receive_*() call, post()ing onto thread W only 1 above -- the completion handler invoking (which
600 * we must do by contract). Yay! By the way, if that's the case (we are in thread U), then 2b doesn't apply;
601 * only 1 does. (The situation where there are items in m_pending_user_requests_q means we wouldn't have
602 * started an actual m_sync_io.async_receive_*() in the first place, as one was already in progress. So
603 * it couldn't have magically completed synchronously -- not having begun and all.) */
604
605 // Okay, so first deal with m_*, here and now as discussed. In so doing prepare the stuff to call in thread W.
606
607 assert(m_user_request);
608 typename User_request::Ptr ex_user_request(std::move(m_user_request));
609 assert(!m_user_request);
610
611 queue<typename User_request::Ptr> ex_pending_user_requests_q_or_none;
612 if (err_code)
613 {
614 FLOW_LOG_TRACE("Error emitted by sync-IO => all [" << m_pending_user_requests_q.size() << "] pending "
615 "handlers (may well be none) will be invoked with error in addition to the head handler.");
616
617 ex_pending_user_requests_q_or_none = std::move(m_pending_user_requests_q);
618 assert(m_pending_user_requests_q.empty());
619 }
620 else if (!m_pending_user_requests_q.empty()) // && (!err_code)
621 {
622 FLOW_LOG_TRACE("Success emitted by sync-IO => lead handler will be invoked; next pending request will "
623 "be serviced by sync-IO; pending request count has become (after popping from queue into lead "
 624 "slot): [" &lt;&lt; (m_pending_user_requests_q.size() - 1) &lt;&lt; "].");
625
626 m_user_request = std::move(m_pending_user_requests_q.front());
627 // m_pending_user_requests_q.front() is now null; we're gonna pop that null ptr presently.
628 m_pending_user_requests_q.pop();
629 }
630 else // if (_q.empty() && (!err_code))
631 {
 632 FLOW_LOG_TRACE("Success emitted by sync-IO => lead handler will be invoked; no pending requests queued up.");
633 }
634
635 // Second: Post the completion handlers as discussed.
636 m_worker.post([this, err_code, sz,
637 /* Have to upgrade to shared_ptr<>s due to capturing requiring copyability (even though copying is not
638 * actually invoked by us). unique_ptr and queue<unique_ptr> = not copyable. */
639 ex_user_request = boost::shared_ptr<User_request>(std::move(ex_user_request)),
640 ex_pending_user_requests_q_or_none
641 = boost::make_shared<decltype(ex_pending_user_requests_q_or_none)>
642 (std::move(ex_pending_user_requests_q_or_none))]()
643 {
644 // We are in thread W. Nothing is locked.
645
646 assert(ex_user_request);
647 FLOW_LOG_TRACE(m_log_pfx << ": Invoking head handler.");
648 (ex_user_request->m_on_done_func)(err_code, sz);
649 FLOW_LOG_TRACE("Handler completed.");
650 if (!ex_pending_user_requests_q_or_none->empty())
651 {
652 assert(err_code);
653 assert(sz == 0);
654
655 FLOW_LOG_TRACE(m_log_pfx << ": Invoking [" << ex_pending_user_requests_q_or_none->size() << "] "
656 "pending handlers in one shot (due to error).");
657 while (!ex_pending_user_requests_q_or_none->empty())
658 {
659 ex_pending_user_requests_q_or_none->front()->m_on_done_func(err_code, 0);
660 ex_pending_user_requests_q_or_none->pop();
661 FLOW_LOG_TRACE("In-queue handler finished.");
662 }
663 // @todo For modest perf, iterate through it; then .clear(). Maybe use an underlying deque<> or list<> directly.
664 } // if (!ex_pending_user_requests_q_or_none->empty())
665 }); // m_worker.post()
666
667 assert(!ex_user_request); // Hosed by move().
668 assert(ex_pending_user_requests_q_or_none.empty());
669} // Async_adapter_receiver::process_msg_or_error()
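The result-emission rule implemented above (success feeds the head handler only; a truthy `Error_code` fans out to the head handler plus every queued handler) can be sketched in isolation. Names below are hypothetical reductions, not Flow-IPC types.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

using Err = int; // Stand-in for Error_code; non-zero <=> truthy (error).
using Handler = std::function<void(Err, std::size_t)>;

// On success: (no-error, sz) goes to the head handler only; queued requests
// stay pending for further reads. On error: every handler gets (err, 0),
// since any further receive attempt on the hosed pipe would fail identically.
inline void emit_results(Handler& head, std::queue<Handler>& pending_q, Err err, std::size_t sz)
{
  head(err, err ? 0 : sz); // Head handler always fires.
  if (!err)
  {
    return;
  }
  while (!pending_q.empty()) // Error: drain the whole deficit with the same code.
  {
    pending_q.front()(err, 0);
    pending_q.pop();
  }
}
```

This mirrors why the real method moves the entire `m_pending_user_requests_q` out for handler invocation only on the error path.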
670
671template<typename Core_t>
672void Async_adapter_receiver<Core_t>::on_sync_io_rcv_done(const Error_code& err_code, size_t sz)
673{
674 using flow::util::Lock_guard;
675 using std::queue;
676
677 // We are in thread W. m_mutex is locked.
678
679 assert(err_code != boost::asio::error::operation_aborted);
680 assert(err_code != error::Code::S_SYNC_IO_WOULD_BLOCK);
681
682 FLOW_LOG_TRACE(m_log_pfx << ": Earlier async-wait => event active => rcv-mutex lock => "
683 "on-active-event-func => sync_io module => here (on-rcv-done handler).");
684
685 /* This is not *too* different from thread-U (or thread-W if invoked from our own handler)
686 * async_receive_native_handle_impl()... except that in our case more requests may have been queued (as summarized
687 * in top comment in that method) during our async-wait that just finished. And, of course, we need
688 * to process the ready message first-thing. But let's say that's taken care of. After that: we can't just
689 * stop; there may be queued requests. We shall process them as synchronously as possible in a while()
690 * loop. That's the executive summary. */
691
692 /* So handle the message or error -- W-post any relevant handlers; update m_user_request and
693 * m_pending_user_requests_q. */
694 process_msg_or_error(err_code, sz);
695
696 Error_code sync_err_code;
697 auto& sync_sz = sz; // (Might as well reuse the arg.)
698
699 /* First iteration: sync_err_code is definitely not would-block; m_user_request may be null.
700 * Subsequent iterations: sync_err_code may be would-block. */
701 while (m_user_request && (sync_err_code != error::Code::S_SYNC_IO_WOULD_BLOCK))
702 {
703 // @todo Code reuse with async_receive_native_handle_impl()? For now keeping comments light.
704#ifndef NDEBUG
705 bool ok;
706#endif
707 if (m_user_request->m_target_hndl_ptr)
708 {
709 if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES)
710 {
711#ifndef NDEBUG
712 ok =
713#endif
714 m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
715 &sync_err_code, &sync_sz,
716 [this](const Error_code& err_code, size_t sz)
717 {
718 on_sync_io_rcv_done(err_code, sz); // Back to us (async path).
719 });
720 }
721 else // if constexpr(!S_TRANSMIT_NATIVE_HANDLES)
722 {
723 assert(false && "This code should never be reached.");
724 }
725 } // if (m_user_request->m_target_hndl_ptr)
726 else
727 {
728#ifndef NDEBUG
729 ok =
730#endif
731 m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
732 [this](const Error_code& err_code, size_t sz)
733 { on_sync_io_rcv_done(err_code, sz); });
734 } // else if (!m_user_request->m_target_hndl_ptr)
735 assert(ok && "We are by definition in PEER state, and ctor starts receive-ops, and we never start a "
736 "sync_io async-receive before ensuring previous one has executed; so that should never "
737 "return false. Bug somewhere?");
738
739 if (sync_err_code == error::Code::S_SYNC_IO_WOULD_BLOCK)
740 {
741 continue; // Loop will end.
742 }
743 // else
744
745 /* Remaining outcomes: sync_err_code truthy => process_msg_or_error() will do the right thing.
746 * sync_err_code is falsy => ditto. */
747
748 process_msg_or_error(sync_err_code, sync_sz);
749
750 /* Outcomes: Error => m_user_request is null, m_pending_user_requests_q is empty. No req to service. Loop end.
751 * Success => m_user_request is null, m_pending_user_requests_q is empty. No req to service. Loop end.
752 * Success => m_user_request is NOT null, m_pending_user_requests_q is ???. Req needs service.
753 * In no case is sync_err_code (at this point) would-block. Hence: time to check loop condition. */
754 } // while (m_user_request && (sync_err_code != error::Code::S_SYNC_IO_WOULD_BLOCK))
755} // Async_adapter_receiver::on_sync_io_rcv_done()
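The drain-until-would-block loop above reduces to a small toy model. `Core`, `Status`, and `service_requests()` below are hypothetical stand-ins for the sync_io core and the pending-request queue; the async-wait re-entry path (which would call back into the same loop later) is deliberately not modeled:

```cpp
#include <cassert>
#include <queue>

enum class Status { OK, WOULD_BLOCK };

// Toy "sync_io core": yields ready values until drained, then would-blocks.
struct Core
{
  std::queue<int> ready;
  Status try_receive(int* target)
  {
    if (ready.empty()) { return Status::WOULD_BLOCK; }
    *target = ready.front();
    ready.pop();
    return Status::OK;
  }
};

/* Service as many queued requests as complete synchronously; on would-block
 * stop immediately -- leftover requests stay queued for the (unmodeled)
 * async-wait completion handler to service.  Returns the number completed. */
int service_requests(Core& core, std::queue<int*>& requests)
{
  int n_done = 0;
  while (!requests.empty())
  {
    if (core.try_receive(requests.front()) == Status::WOULD_BLOCK) { break; }
    requests.pop();
    ++n_done;
  }
  return n_done;
}
```

As in `on_sync_io_rcv_done()`, the invariant is that at most one receive-op is outstanding at a time: either a request completes synchronously and the next is attempted immediately, or the loop exits on would-block and resumes only when the core signals readiness.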
756
757template<typename Core_t>
758bool Async_adapter_receiver<Core_t>::idle_timer_run(util::Fine_duration timeout)
759{
760 using flow::util::Lock_guard;
761
762 // Like Async_adapter_sender::send_native_handle() and others (keeping comments light).
763
764 Lock_guard<decltype(m_mutex)> lock(m_mutex);
765 return m_sync_io.idle_timer_run(timeout);
766}
767
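`idle_timer_run()` above illustrates the adapter's general forwarding rule: any state-mutating call into the sync_io core first acquires the same mutex that guards thread-W receive logic. A toy sketch of that pattern, with hypothetical `SyncCore`/`Adapter` types and `std::mutex` standing in for `flow::util::Mutex_non_recursive`:

```cpp
#include <cassert>
#include <mutex>

// Toy "sync_io core" with one state-mutating operation.
struct SyncCore
{
  int timer_runs = 0;
  bool idle_timer_run() { ++timer_runs; return true; }
};

struct Adapter
{
  std::mutex m_mutex;  // Protects m_sync_io state shared with worker-thread logic.
  SyncCore m_sync_io;

  bool idle_timer_run()
  {
    // Serialize against concurrent receive-op work before touching the core.
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_sync_io.idle_timer_run();
  }
};
```

Trivial `const` queries, by contrast, are forwarded to the core without the adapter's involvement (see the class doc header), so only mutating paths pay for the lock.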
768} // namespace ipc::transport::sync_io