Flow-IPC 1.0.2
Flow-IPC project: Full implementation reference.
async_adapter_snd.hpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
24#include <flow/log/log.hpp>
25#include <flow/async/single_thread_task_loop.hpp>
26
28{
29
30// Types.
31
32/**
33 * Internal-use type that adapts a given PEER-state sync_io::Native_handle_sender or sync_io::Blob_sender *core* into
34 * the async-I/O-pattern Native_handle_sender or Blob_sender. State-mutating logic of the latter is forwarded
35 * to a `*this`; while trivial `const` (in PEER state) things like `.send_blob_max_size()` are forwarded directly to the
36 * core `sync_io::X`.
37 *
38 * @see transport::Native_socket_stream::Impl uses this for 99% of its outgoing-direction
39 * (PEER-state by definition) logic.
40 * @see transport::Blob_stream_mq_sender_impl uses this for 99% of its logic.
41 *
42 * @see Async_adapter_receiver for the opposite-direction thing. E.g., transport::Native_socket_stream::Impl
43 * uses that for 99% of its incoming-direction logic.
44 *
45 * ### Threads and thread nomenclature; locking ###
46 * Thread U, thread W... locking... just see those sections in transport::Native_socket_stream::Impl class doc header.
47 * We adopt that nomenclature and logic. However, as we are concerned with only one direction (op-type),
48 * we only deal with code in either thread U or W concerned with that. The other-direction code -- if applicable
49 * (e.g., applicable for `Native_socket_stream` which deals with both over 1 socket connection; N/A
50 * for `Blob_stream_mq_*` which uses separate objects entirely) -- simply co-exists in the same thread W and "thread"
51 * U. (If `Native_socket_stream` wanted to, it could even parallelize stuff in thread W by using separate worker
52 * threads Ws and Wr. As of this writing it does not, but it could -- nothing in `*this` would change.)
53 *
54 * Note, again, that we have our own `m_mutex`. If there is an opposing-direction counterpart Async_adapter_receiver,
55 * then it has its own `m_mutex`; hence things can proceed concurrently.
56 *
57 * ### Impl design ###
58 * This is almost entirely subsumed by our `sync_io` core, Async_adapter_sender::Core, an instance of
59 * sync_io::Native_handle_sender or sync_io::Blob_sender. It has a send op-type (possibly among others), so we invoke
60 * its `"sync_io::*_sender:start_send_blob_ops()"` during our initialization. After that:
61 *
62 * The main method, send_native_handle() (and its degenerate version send_blob())
63 * lacks a completion handler and hence can be forwarded to `Core m_sync_io`... that's it. auto_ping() -- same deal.
64 *
65 * async_end_sending() takes a completion handler, however. Simple enough -- we could just capture
66 * this `on_done_func` (from the user arg) in a lambda, then when `m_sync_io` calls our internal handler
67 * (synchronously), we `post(on_done_func)` onto thread W, and that's it. However, that's not sufficient,
68 * as if they call our dtor before that can complete, then we are to invoke `on_done_func(E)` where E =
69 * operation-aborted (by our contract). So we save `on_done_func` into `m_end_sending_on_done_func_or_empty`
70 * instead of capturing it.
71 */
72template<typename Core_t>
74 public flow::log::Log_context,
75 private boost::noncopyable
76{
77public:
78 // Types.
79
80 /// The `sync_io::X` type being adapted into async-I/O-pattern `X`.
81 using Core = Core_t;
82
83 // Constructors/destructor.
84
85 /**
86 * Constructs the adapter around `sync_io::X` object `*sync_io`.
87 *
88 * @param logger_ptr
89 * Logger to use for logging subsequently.
90 * @param log_pfx
91 * String that shall precede ~all logged messages (e.g., `lexical_cast<string>(x)`, where `x` is an `X`.)
92 * @param worker
93 * The worker thread loop of `X`. Background work, as needed, will be posted onto this
94 * "thread W." Note that `X` may (or may not) share this thread with unrelated tasks;
95 * for example `Native_socket_stream` uses it for both a `*this` (outgoing-direction)
96 * and an Async_adapter_receiver (incoming-direction). `*worker* must already be `->start()`ed.
97 * @param sync_io
98 * The core object of `X`. It should have just (irreversibly) entered state PEER.
99 */
100 Async_adapter_sender(flow::log::Logger* logger_ptr, util::String_view log_pfx,
101 flow::async::Single_thread_task_loop* worker, Core* sync_io);
102
103 /**
104 * To be invoked after `->stop()`ping `*worker` (from ctor), as well as flushing any still-queued
105 * tasks in its `Task_engine` (via `.restart()` and `.poll()`), this satisfies the customer adapter
106 * dtor's contract which is to invoke any not-yet-fired completion handlers with special
107 * operation-aborted error code. In our case that is either nothing or 1 `async_end_sending()` completion
108 * handler. If applicable the dtor returns once that handler has completed in an unspecified thread
109 * that is not the calling thread.
110 */
112
113 // Methods.
114
115 /**
116 * See Native_handle_sender counterpart. However, this one is `void`, as there is no way `*this` is not in PEER state
117 * (by definition).
118 *
119 * @param hndl_or_null
120 * See Native_handle_sender counterpart.
121 * @param meta_blob
122 * See Native_handle_sender counterpart.
123 * @param err_code
124 * See Native_handle_sender counterpart.
125 */
126 void send_native_handle(Native_handle hndl_or_null, const util::Blob_const& meta_blob, Error_code* err_code);
127
128 /**
129 * See Blob_sender counterpart. However, this one is `void`, as there is no way `*this` is not in PEER state
130 * (by definition).
131 *
132 * @param blob
133 * See Blob_sender counterpart.
134 * @param err_code
135 * See Blob_sender counterpart.
136 */
137 void send_blob(const util::Blob_const& blob, Error_code* err_code);
138
139 /**
140 * See Native_handle_sender counterpart; or leave `on_done_func_or_empty.empty()` for
141 * Native_handle_sender::end_sending().
142 *
143 * @param on_done_func_or_empty
144 * See Native_handle_sender counterpart. See above.
145 * @return See Native_handle_sender counterpart.
146 */
147 bool async_end_sending(flow::async::Task_asio_err&& on_done_func_or_empty);
148
149 /**
150 * See Native_handle_sender counterpart.
151 *
152 * @param period
153 * See Native_handle_sender counterpart.
154 * @return See Native_handle_sender counterpart.
155 */
156 bool auto_ping(util::Fine_duration period);
157
158private:
159 // Methods.
160
161 /**
162 * Handles the completion of `m_sync_io.async_end_sending()` operation whether synchronously or asynchronously.
163 * Can be invoked from thread U or thread W, and #m_mutex must be locked. #m_end_sending_on_done_func_or_empty
164 * must not be `.empty()`. Post-condition: it has been made `.empty()`, and a `move()`d version of it has
165 * posted onto thread W.
166 *
167 * @param err_code
168 * Result to pass to user.
169 */
170 void on_sync_io_end_sending_done(const Error_code& err_code);
171
172 // Data.
173
174 /// See `log_pfx` arg of ctor.
175 const std::string m_log_pfx;
176
177 /**
178 * The `on_done_func` argument to async_end_sending(), possibly `.empty()` if originally user invoked `end_sending()`,
179 * or if neither was used, or if it has fired. Note that if async_end_sending() does not complete
180 * before dtor executes, dtor will invoke the handler (unless `.empty()`) with operation-aborted code.
181 *
182 * It would not need to be a member -- could just be captured in lambdas while async_end_sending() is outstanding --
183 * except for the need to still invoke it with operation-aborted from dtor in the aforementioned case.
184 *
185 * Protected by #m_mutex.
186 */
187 flow::async::Task_asio_err m_end_sending_on_done_func_or_empty;
188
189 /**
190 * Protects #m_end_sending_on_done_func_or_empty and, more importantly, send-ops data of #m_sync_io.
191 * For example `send_*()` engages `Core::send_*()` in thread U which might
192 * add messages to its internal pending-out-messages-during-would-block queue; while
193 * a util::sync_io::Event_wait_func `on_active_ev_func()` invocation (on writable transport) as requested by
194 * #m_sync_io will invoke logic inside the latter, which might pop items from that queue
195 * and send them off -- all in thread W. (This information is provided for context, as formally it's a black
196 * box inside `m_sync_io`.)
197 */
198 mutable flow::util::Mutex_non_recursive m_mutex;
199
200 /// Single-thread worker pool for all internal async work. Referred to as thread W in comments.
201 flow::async::Single_thread_task_loop& m_worker;
202
203 /**
204 * The core #Core engine, implementing the `sync_io` pattern (see util::sync_io doc header).
205 * See our class doc header for overview of how we use it (the aforementioned `sync_io` doc header talks about
206 * the `sync_io` pattern generally).
207 *
208 * Thus, #m_sync_io is the synchronous engine that we use to perform our work in our asynchronous boost.asio
209 * loop running in thread W (#m_worker) while collaborating with user thread(s) a/k/a thread U.
210 * (Recall that the user may choose to set up their own event loop/thread(s) --
211 * boost.asio-based or otherwise -- and use their own equivalent of an #m_sync_io instead.)
212 */
214}; // class Async_adapter_sender
215
216// Template implementations.
217
218template<typename Core_t>
220 util::String_view log_pfx,
221 flow::async::Single_thread_task_loop* worker,
222 Core* sync_io) :
223 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
224 m_log_pfx(log_pfx),
225 m_worker(*worker),
226 m_sync_io(*sync_io)
227{
230 using flow::util::Lock_guard;
231
232 // We've just entered PEER state, so set up the send-ops.
233
234 /* Hook up the m_sync_io=>*this interaction. m_sync_io will use these callbacks to ask
235 * us to ->async_wait() on `Asio_waitable_native_handle`s for it.
236 *
237 * The *this=>m_sync_io interaction shall be our APIs, like send_blob(),
238 * simply invoking the same API in m_sync_io (m_sync_io.send_blob() for that example). */
239
240 /* (.start_send_native_handler_ops() would do the same thing, if it exists. If it exists, that's because it has
241 * both to satisfy two concepts -- for when the user uses the sync_io::X directly -- but we don't care about that;
242 * we know they are the same in this case; so just use the one we know exists for any X.) */
243#ifndef NDEBUG
244 const bool ok =
245#endif
246 m_sync_io.start_send_blob_ops([this](Asio_waitable_native_handle* hndl_of_interest,
247 bool ev_of_interest_snd_else_rcv,
248 Task_ptr&& on_active_ev_func)
249 {
250 /* We are in thread U or thread W; m_sync_io.<?>() has called its m_..._ev_wait_func();
251 * it has protected access to *hndl_of_interest as needed. */
252
253 FLOW_LOG_TRACE(m_log_pfx << ": Sync-IO send-ops event-wait request: "
254 "descriptor [" << hndl_of_interest->native_handle() << "], "
255 "writable-else-readable [" << ev_of_interest_snd_else_rcv << "].");
256
257 // They want this async_wait(). Oblige.
258 assert(hndl_of_interest);
259 hndl_of_interest->async_wait(ev_of_interest_snd_else_rcv
260 ? Asio_waitable_native_handle::Base::wait_write
261 : Asio_waitable_native_handle::Base::wait_read,
262 [this, on_active_ev_func = std::move(on_active_ev_func)]
263 (const Error_code& err_code)
264 {
265 // We are in thread W. Nothing is locked.
266
267 if (err_code == boost::asio::error::operation_aborted)
268 {
269 return; // Stuff is shutting down. GTFO.
270 }
271 // else
272
273 // They want to know about completed async_wait(). Oblige.
274
275 // Protect m_sync_io and m_* against send-ops (`send_*()`, *end_sending(), ...).
276 Lock_guard<decltype(m_mutex)> lock(m_mutex);
277
278 /* Inform m_sync_io of the event. This can synchronously invoke handler we have registered via m_sync_io
279 * API (e.g., `send_*()`, auto_ping()). In our case -- if indeed it triggers a handler -- it will
280 * have to do with async_end_sending() completion. */
281
282 (*on_active_ev_func)();
283 // (That would have logged sufficiently inside m_sync_io; let's not spam further.)
284
285 // Lock_guard<decltype(m_mutex)> lock(m_mutex): unlocks here.
286 }); // hndl_of_interest->async_wait()
287 }); // m_sync_io.start_send_blob_ops()
288 assert(ok);
289} // Async_adapter_sender::Async_adapter_sender()
290
291template<typename Core_t>
293{
294 using flow::async::Single_thread_task_loop;
295 using flow::util::ostream_op_string;
296
297 /* Pre-condition: m_worker is stop()ed, and any pending tasks on it have been executed.
298 * Our promised job is to invoke any pending handlers with operation-aborted.
299 * The decision to do it from a one-off thread is explained in transport::Native_socket_stream::Impl::~Impl()
300 * and used in a few places; so see that. Let's just do it.
301 * @todo It would be cool, I guess, to do it all in one one-off thread instead of potentially starting, like,
302 * 2 for some of our customers. Well, whatever. At least we can avoid it if we know there are no handlers
303 * to invoke; speaking of which there's just the one: */
304
305 if (!m_end_sending_on_done_func_or_empty.empty())
306 {
307 Single_thread_task_loop one_thread(get_logger(), ostream_op_string(m_log_pfx, "-snd-temp_deinit"));
308 one_thread.start([&]()
309 {
310 FLOW_LOG_INFO(m_log_pfx << ": In transient snd-finisher thread: "
311 "Shall run pending graceful-sends-close completion handler.");
312 m_end_sending_on_done_func_or_empty(error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER);
313 FLOW_LOG_INFO("Transient finisher exiting.");
314 });
315 } // if (!m_end_sending_on_done_func_or_empty.empty())
316} // Async_adapter_sender::~Async_adapter_sender()
317
318template<typename Core_t>
320{
321 using flow::util::Lock_guard;
322
323 // We are in thread U (or thread W in a completion handler, but not concurrently).
324
325 /* See comments in send_native_handle(); we are just a somewhat degraded version of that.
326 * It's just that send_native_handle() might not even compile, if m_sync_io does not have that method.
327 * Logic, as pertains to us, is the same though. */
328 Lock_guard<decltype(m_mutex)> lock(m_mutex);
329#ifndef NDEBUG
330 const bool ok =
331#endif
332 m_sync_io.send_blob(blob, err_code);
333 assert(ok && "We are by definition in PEER state, and ctor starts send-ops; so that should never "
334 "return false. Bug somewhere?");
335}
336
337template<typename Core_t>
339 Error_code* err_code)
340{
341 using flow::util::Lock_guard;
342
343 // We are in thread U (or thread W in a completion handler, but not concurrently).
344
345 /* This one is particularly simple, as there is no async completion handler to invoke. So we essentially just
346 * forward it to m_sync_io.send_native_handler() with identical signature. The only subtlety -- and it is
347 * quite important -- is that we must lock m_mutex. That might seem odd if one does not remember that
348 * m_sync_io send-ops explicit API (.send_native_handler() in this case) must never be invoked concurrently
349 * with the start_send_*_ops()-passed Event_wait_func's `on_active_ev_func()` -- which might call
350 * on_sync_io_end_sending_done(). So we lock it, and elsewhere we lock it when calling on_active_ev_func().
351 *
352 * Specifically (though formally speaking we are just following the sync_io pattern, and technically the following
353 * info is about inside the black box that is m_sync_io), m_sync_io at least stores an out-queue of messages
354 * to send, non-empty in would-block conditions; send_native_handle() pushes items onto that queue, while
355 * on_active_ev_func() (on writable socket) pops items off it upon successfully sending them off over socket. */
356
357 Lock_guard<decltype(m_mutex)> lock(m_mutex);
358#ifndef NDEBUG
359 const bool ok =
360#endif
361 m_sync_io.send_native_handle(hndl, meta_blob, err_code);
362 assert(ok && "We are by definition in PEER state, and ctor starts send-ops; so that should never "
363 "return false. Bug somewhere?");
364} // Async_adapter_sender::send_native_handle()
365
366template<typename Core_t>
367bool Async_adapter_sender<Core_t>::async_end_sending(flow::async::Task_asio_err&& on_done_func_or_empty)
368{
369 using flow::util::Lock_guard;
370
371 // We are in thread U (or thread W in a completion handler, but not concurrently).
372
373 /* While the same comments about locking m_mutex apply, in addition to that:
374 * This is somewhat more complex than the other send-ops, as there is a completion handler. */
375
376 Lock_guard<decltype(m_mutex)> lock(m_mutex);
377
378 if (on_done_func_or_empty.empty())
379 {
380 return m_sync_io.end_sending();
381 }
382 // else
383
384 if (!m_end_sending_on_done_func_or_empty.empty())
385 {
386 FLOW_LOG_WARNING(m_log_pfx << ": async_end_sending(F) invoked; but we have a saved "
387 "completion handler -- which has not yet fired -- for it already, so it must be a dupe-call. "
388 "Ignoring.");
389 return false;
390 }
391 /* else: It might also be .empty() and still be a dupe call (if it has fired already -- then they called us again).
392 * m_sync_io.async_end_sending() will catch that fine. For now though: */
393
394 /* Save it (instead of capturing it in lambda) in case dtor runs before async-end-sending completes;
395 * it would then invoke it with operation-aborted code. */
396 m_end_sending_on_done_func_or_empty = std::move(on_done_func_or_empty);
397
398 Error_code sync_err_code;
399 const bool ok = m_sync_io.async_end_sending(&sync_err_code, [this](const Error_code& err_code)
400 {
401 /* We are in thread W. m_mutex is locked (by the handler inside the function inside ctor).
402 * This'll, inside, post the handler onto thread W. */
403 on_sync_io_end_sending_done(err_code);
404 });
405
406 if (!ok)
407 {
408 // False start (dupe call). It logged enough.
409 m_end_sending_on_done_func_or_empty.clear();
410 return false;
411 }
412 // else: It either finished synchronously; or it will finish asynchronously.
413
414 if (sync_err_code == error::Code::S_SYNC_IO_WOULD_BLOCK)
415 {
416 // Async-wait needed before we can complete. Live to fight another day.
417 return true;
418 }
419 /* else: Completed synchronously. Since there can be no async-end-sending chains, it's no big deal
420 * to just use on_sync_io_end_sending_done() again. */
421
422 FLOW_LOG_INFO(m_log_pfx << ": Sync-IO async-end-sending completed immediately. "
423 "Posting handler onto async-worker thread.");
424
425 on_sync_io_end_sending_done(sync_err_code);
426
427 return true;
428 // Lock_guard<decltype(m_mutex)> lock(m_mutex): unlocks here.
429} // Async_adapter_sender::async_end_sending()
430
431template<typename Core_t>
433{
434 using flow::util::Lock_guard;
435
436 /* We are in thread U or W. m_mutex is locked (as required for m_end_sending_on_done_func_or_empty
437 * and m_sync_io access). */
438
439 assert(err_code != boost::asio::error::operation_aborted);
440
441 FLOW_LOG_TRACE(m_log_pfx << ":: Earlier async-wait => event active => snd-mutex lock => "
442 "on-active-event-func => sync_io module => here (on-end-sending-done handler). Or else "
443 "snd-mutex lock => no async-wait needed => here... (ditto).");
444
445 /* Just invoke user completion handler. post() it onto thread W. Why post() onto W?
446 * - If we are in thread U: It is plainly required, as we promised to invoke completion handlers
447 * from unspecified thread that isn't U.
448 * - If we are in thread W: To avoid recursive mayhem if they choose to call some other API
449 * inside on_done_func(), we queue it onto thread W by itself. */
450 m_worker.post([this, err_code, on_done_func = std::move(m_end_sending_on_done_func_or_empty)]()
451 {
452 // We are in thread W. Nothing is locked.
453 FLOW_LOG_TRACE(m_log_pfx << ": Invoking on-end-sending-done handler.");
454 on_done_func(err_code);
455 FLOW_LOG_TRACE("Handler completed.");
456 });
457
458 m_end_sending_on_done_func_or_empty.clear(); // Just in case move() didn't do it.
459} // Async_adapter_sender::on_sync_io_end_sending_done()
460
461template<typename Core_t>
463{
464 using flow::util::Lock_guard;
465
466 // Like send_native_handle() and others (keeping comments light).
467
468 Lock_guard<decltype(m_mutex)> lock(m_mutex);
469 return m_sync_io.auto_ping(period);
470}
471
472} // namespace ipc::transport::sync_io
Internal-use type that adapts a given PEER-state sync_io::Native_handle_sender or sync_io::Blob_sende...
~Async_adapter_sender()
To be invoked after ->stop()ping *worker (from ctor), as well as flushing any still-queued tasks in i...
void send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code)
See Native_handle_sender counterpart.
flow::async::Task_asio_err m_end_sending_on_done_func_or_empty
The on_done_func argument to async_end_sending(), possibly .empty() if originally user invoked end_se...
Core_t Core
The sync_io::X type being adapted into async-I/O-pattern X.
void on_sync_io_end_sending_done(const Error_code &err_code)
Handles the completion of m_sync_io.async_end_sending() operation whether synchronously or asynchrono...
bool async_end_sending(flow::async::Task_asio_err &&on_done_func_or_empty)
See Native_handle_sender counterpart; or leave on_done_func_or_empty.empty() for Native_handle_sender...
void send_blob(const util::Blob_const &blob, Error_code *err_code)
See Blob_sender counterpart.
flow::util::Mutex_non_recursive m_mutex
Protects m_end_sending_on_done_func_or_empty and, more importantly, send-ops data of m_sync_io.
flow::async::Single_thread_task_loop & m_worker
Single-thread worker pool for all internal async work. Referred to as thread W in comments.
Core & m_sync_io
The core Core engine, implementing the sync_io pattern (see util::sync_io doc header).
const std::string m_log_pfx
See log_pfx arg of ctor.
bool auto_ping(util::Fine_duration period)
See Native_handle_sender counterpart.
Async_adapter_sender(flow::log::Logger *logger_ptr, util::String_view log_pfx, flow::async::Single_thread_task_loop *worker, Core *sync_io)
Constructs the adapter around sync_io::X object *sync_io.
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
boost::shared_ptr< Task > Task_ptr
Short-hand for ref-counted pointer to a Function<> that takes no arguments and returns nothing; in pa...
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:117
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:134
flow::util::String_view String_view
Short-hand for Flow's String_view.
Definition: util_fwd.hpp:115
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
Definition: common.hpp:323
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:298
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.