sync_io_fwd.hpp
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
21#include "ipc/util/util_fwd.hpp"
22#include <boost/shared_ptr.hpp>
23
24/**
25 * Contains common code, as well as important explanatory documentation in the following text, for the `sync_io`
26 * pattern used in ipc::transport and ipc::session to provide fine-tuned control over integrating
27 * asynchronous Flow-IPC activities into the user's event loop.
28 *
29 * ### What's all this, then? ###
30 * You may notice that for ~every class/class template `X` in this library that provides 1+ `X::async_*(..., F)`
31 * method(s), where `F()` is a completion handler for an async operation, there exists also -- in a sub-namespace
32 * named `sync_io` -- a similar-looking type `sync_io::X`.
33 *
34 * @note At times, generally in ipc::session and class template ipc::transport::struc::Channel specifically, async
35 * ops don't necessarily use `async_*` naming, but the point remains that: an operation occurs in some
36 * sense in the background, and then a *handler* function, given as an arg, is called when the operation is
37 * completed. Nevertheless for this discussion we will often take the case of, indeed, an operation
38 * named `X::async_*(..., F)`, with `F()` a one-off completion handler. It's the basic, most mainstream pattern.
39 *
40 * Long story short, all else being equal, we would recommend the use of `X`
41 * over `sync_io::X`: it is easier (in most cases), less error-prone, and delivers
42 * similar performance -- quite possibly even better (due to automatic parallelization albeit at the cost
43 * of mandatory context-switching). However `sync_io::X` does exist for a good reason, at least for some important
44 * `X`es, and in an advanced, highly performance-sensitive application it may be worth considering switching
45 * to the *direct* use of `sync_io::X`. For what it is worth, internally `X` is usually written
46 * in terms of `sync_io::X`; the latter is essentially the core logic, while the former provides
47 * auto-parallelization and a simpler interface.
48 *
49 * @note For this reason comments sometimes refer to a `sync_io::X` *core*: where the basic `X`-ish capabilities and
50 * data live. The *async-I/O* `X` is then often built around a `sync_io::X` core. Because of this it is
51 * usually easy, and fast, to convert a `sync_io::X` into an `X` -- via a move-like ctor called
52 * `sync_io`-core *adopting ctor*. Additionally, ipc::transport::Channel template -- which bundles local peer
53 * objects of 1-2 IPC pipes -- can bundle *either* async-I/O peer objects *or* `sync_io` peer objects, and
54 * one can always convert the latter to the former by calling `x.async_io_obj()`.
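 *
 * @note For illustration only (the exact constructor signatures vary by class -- treat the specifics below as
 *       assumptions, and see each class's own docs), such a conversion might look like:
 *
 * ~~~
 * // sio_x is a hypothetical, already-set-up ipc::transport::sync_io::Native_socket_stream.
 * // The sync_io-core adopting ctor subsumes it into its async-I/O counterpart:
 * ipc::transport::Native_socket_stream x(std::move(sio_x));
 *
 * // Similarly, sio_channel -- a hypothetical Channel bundling sync_io peer objects -- can yield
 * // the equivalent bundle of async-I/O peer objects:
 * auto x_channel = sio_channel.async_io_obj();
 * ~~~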
55 *
56 * @note In cases where performance is not a real concern, such as for the assumed-rare
57 * ipc::session::Server_session::async_accept() operations, internally `sync_io::X` may actually be written
58 * in terms of `X` instead... but we digress. Either way it is a black box.
59 *
60 * Some examples of `X`es that have `sync_io::X` counterparts:
61 * - core-layer transport::Native_socket_stream
62 * (including its `async_receive_*()`, and the handler-less -- but nevertheless potentially
63 * asynchronous -- `send_*()`) and all other `Blob_sender`, `Blob_receiver`, `Native_handle_sender`,
64 * `Native_handle_receiver` concept impls including the bundling transport::Channel;
65 * - structured-layer ipc::transport::struc::Channel;
66 * - session::Client_session (with its channel-accept and error handlers) and session::Session_server (ditto, plus
67 * with its `async_accept()`);
68 * - all their session::shm counterparts.
69 *
70 * ### The async-I/O (default) pattern ###
71 * Consider `X` -- take for example transport::Native_socket_stream -- and a particular async operation -- take,
72 * e.g., transport::Native_socket_stream::async_receive_blob().
73 *
74 * When `x.async_receive_blob(..., F)` is invoked, `F()` is the user-specified completion handler, while ...
75 * specifies details about the operation, in this case the target buffer where to write data. It works as follows:
76 * `x` attempts to perform the operation (in this case receive a single in-message as soon as it becomes available
77 * which may or may not be instant); and once it has succeeded, it invokes `F(...)` (where ... indicates
78 * results, usually at least an `Error_code`) from *an unspecified thread* that is not the user's calling thread
79 * (call it thread U, where `x.async_*()` was called). Even if the op completes immediately, `x.async_*()` will
80 * never invoke `F()` synchronously; always from the *unspecified thread*.
81 *
82 * That's great, but what actually happens? Answer: `x`, usually at construction, invisibly, starts a separate
83 * thread (technically it could be co-using a thread with other objects; but in reality as of this writing each
84 * object really starts a thread). An async operation might complete synchronously (perhaps a message is available
85 * in a kernel receive buffer and is therefore immediately, internally, received inside `x.async_receive_blob()`
86 * body); or it might occur in the background and involve (internally) async waiting of native-handle readability --
87 * possibly even more threads might start (internally) to get things to work. *Either* way, there is that thread --
88 * call it thread W -- where *at least* the completion handler `F()` will be called.
89 *
90 * (If `x` is destroyed before this has a chance to happen, the `x`
91 * destructor shall -- last-thing -- invoke `F()`, passing it the special
92 * operation-aborted `Error_code`. That is the case for one-off async-ops like that one. There are also variations
93 * such as the completion handlers of transport::struc::Channel, but the key point -- that work happens in
94 * the background, in the object-created own thread W, and user-supplied handlers are run from thread W -- remains
95 * the same. Another variation is async-ops that don't require a completion handler; for example
96 * transport::Native_socket_stream::send_blob() may perform work in the background upon encountering would-block
97 * conditions internally -- and this again occurs in thread W -- but there is no completion handler to invoke.)
98 *
99 * What is the user supposed to do with an async-op like this? In practice we tend to think of this in terms
100 * of 2 basic possibilities for how the user's own event loop might be organized.
101 * - Proactor pattern: This is what boost.asio uses, what flow.async facilitates further, and what we generally
102 * recommend all else being equal. We won't describe it here in detail, but integrating such a loop with
103 * this async-I/O pattern in Flow-IPC is quite simple:
104 * -# Call `x.async_*(..., F)` as explained above.
105 * -# As `F()` supply a short wrapper that will place the true handling of the event onto the same event loop --
106 * thread U (though multiple such threads might be in use alternatively) -- where you invoked `x.async_*(F)`.
107 * For example:
108 *
109 * ~~~
110 * ...
111 * m_my_asio_loop.post([this]() { start_async_op(); });
112 * ...
113 * void start_async_op()
114 * {
115 * // We are in thread U.
116 * x.async_receive_blob(..., [this](const Error_code& err_code, size_t sz)
117 * {
118 * // We are in "unspecified thread" W.
119 * m_my_asio_loop.post([this, err_code, sz]() { on_async_op_done(err_code, sz); });
120 * });
121 * }
122 * void on_async_op_done(const Error_code& err_code, size_t sz)
123 * {
124 * // We are in thread U.
125 * // ...handle it....
126 * }
127 * ~~~
128 *
129 * Alternatively:
130 * - Reactor pattern: Usually built on top of an OS-supplied blocking polling method of some kind -- in POSIX
131 * nowadays usually at least `poll()`, in Linux possibly using the more advanced `epoll_*()` -- which centers
132 * on an "FD-set" (native handle set), where one describes `Native_handle`s (FDs) and the events one awaits
133 * (readable, writable) for each; and in each event loop iteration one runs a poll-wait operation
134 * (like `poll()` or `epoll_wait()`). This blocks until 1 or more events-of-interest are active; then wakes up
135 * and reports which ones they were. User code then synchronously invokes handling for each event of interest;
136 * such handling might modify the events-of-interest set, etc., until all such work is done, and the
137 * next poll-wait op executes. Thus in the reactor pattern methods like `on_async_op_done()` are invoked
138 * in a flow-control setup inverted versus the proactor pattern; but ultimately they're both doing the same thing.
139 * To integrate such a loop with this async-I/O pattern in Flow-IPC, a little extra work is required:
140 * - Sometimes event-loop libraries provide this as a built-in feature: a *task queue*. So a facility similar to
141 * `post()` above is supplied. Hence the code above would be adapted to a reactor-pattern loop and end up
142 * fairly similar: In `F()` do some equivalent to the `post()` in the snippet above. Internally it'll set up
143 * some "interrupter" handle in the central `[e]poll*()` handle-set and cause -- from thread W -- for thread U's
144 * poll-wait to wake up. Otherwise:
145 * - This task-queue facility can be written with relatively little difficulty. Essentially it involves
146 * a simple IPC mechanism, perhaps an anonymous pipe, through which thread-W-invoked `F()`s can
147 * inform the thread-U poll-wait that it must wake up and handle events, among any others that the poll-wait
148 * covers.
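 *
 *   A minimal sketch of such a facility (hypothetical names, not part of Flow-IPC; assumes `<mutex>`, `<deque>`,
 *   `<functional>`, `<unistd.h>`, and a non-blocking pipe read end) might be:
 *
 *   ~~~
 *   // Hypothetical task queue + pipe-based wakeup for a poll()-centered loop.
 *   class Poll_task_queue
 *   {
 *   public:
 *     Poll_task_queue() { ::pipe(m_pipe_fds); } // Error checking elided; make m_pipe_fds[0] non-blocking.
 *     int wakeup_fd() const { return m_pipe_fds[0]; } // Add this FD (readable) to your central poll set.
 *
 *     void post(std::function<void ()>&& task) // Callable from any thread (e.g., from F() in thread W).
 *     {
 *       { std::lock_guard<std::mutex> lock(m_mutex); m_tasks.push_back(std::move(task)); }
 *       const char wakeup_byte = 0;
 *       ::write(m_pipe_fds[1], &wakeup_byte, 1); // Nudge the thread-U poll-wait awake.
 *     }
 *
 *     void drain() // Call from thread U when poll() reports wakeup_fd() readable.
 *     {
 *       char buf[64];
 *       while (::read(m_pipe_fds[0], buf, sizeof buf) > 0) {} // Swallow the nudge byte(s).
 *       std::deque<std::function<void ()>> tasks;
 *       { std::lock_guard<std::mutex> lock(m_mutex); tasks.swap(m_tasks); }
 *       for (auto& task : tasks) { task(); }
 *     }
 *
 *   private:
 *     int m_pipe_fds[2]; // [0] = read end, [1] = write end.
 *     std::mutex m_mutex;
 *     std::deque<std::function<void ()>> m_tasks;
 *   };
 *   ~~~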
149 *
150 * So that's the async-I/O (default) pattern in Flow-IPC. Generally it is easy to work with -- especially
151 * in a proactor-pattern event loop, but otherwise also not hard. It cleanly separates Flow-IPC's internal needs
152 * from the rest of the application's: Flow-IPC needs to do background work? It takes care of its own needs:
153 * it starts and ends threads without your participation. Moreover this may
154 * well help performance of the user's own event loop: Flow-IPC's
155 * cycles are mostly spent in separate threads, reducing the length of your loop's single iteration and thus
156 * helping reduce your latency. The processors' context-switching is automatic and usually efficient; and it
157 * automatically makes use of multiple hardware cores.
158 *
159 * ### Rationale for the `sync_io` pattern ###
160 * So why might the default pattern described above be insufficient? A detailed study of this is outside our scope
161 * here; but basically it is a matter of control. The way it starts threads, in a way that cannot be specified
162 * by you (the user), and switches between them may be helpful in 95% of cases; but some applications want complete
163 * control of any such thing. For instance suppose I'd like to start with doing all that background work
164 * of `Native_socket_stream::async_receive_blob()` directly in thread U. It should be possible, right? Whatever
165 * events it waits on -- in reality, internally, principally it waits for readability of a Unix domain socket --
166 * I could just wait-on in my own thread-U `epoll_wait()`. When an active event is detected, I could do the
167 * resulting non-blocking-reads -- that normally would be done in the background thread W -- directly after
168 * the poll-wait.
169 *
170 * Maybe that would be good, reducing context-switching overhead. Or maybe it wouldn't be good, as a big fat
171 * loop iteration could cause latency in serving the next batch of work. If so, and I *did* want
172 * to do some of the work in some other thread for parallelization, maybe I want to share that other thread with
173 * some other processing. Or... or.... Point is: perhaps I want to explicitly structure what threads do what, whether
174 * or not I want multi-threaded processing.
175 *
176 * If that is the case, then the `sync_io` pattern will serve that need. In this pattern, for example
177 * in transport::sync_io::Native_socket_stream, you'll notice completion handlers are still used as part of the API.
178 * However, they are *never* invoked in the background: *you* call into a `sync_io::X` API, and it might
179 * *synchronously only* and at very specific points invoke a completion handler `F()` that you supplied it earlier.
180 *
181 * We'll get into details below, but to summarize how this is integrated with the 2 above-covered user event loop
182 * patterns:
183 * - Proactor pattern: The `sync_io` pattern in Flow-IPC has first-class support for a boost.asio event loop
184 * on the user's part. So -- basically -- if you've got a `boost::asio::io_context E` (`flow::util::Task_engine E`)
185 * `run()`ning over 1+ threads U, then `sync_io::X` shall give you boost.asio `descriptor` objects associated
186 * with `E` and ask *you* -- yourself, albeit on its behalf -- to perform `.async_wait(write, G)` or
187 * `.async_wait(read, G)` on those `descriptor`s; and inform the `sync_io::X` when they've completed, inside `G()`.
188 * As a result, `sync_io::X` internals might inform you of the completion of an `async_...()` op earlier requested
189 * by you.
190 * - Reactor pattern: If your loop isn't boost.asio-based, then `sync_io::X` will similarly ask you to perform
191 * async-waits on this or that handle for read or write or both, just over a slightly different API -- one
192 * conducive to `poll()`, `epoll_ctl()`/`epoll_wait()`, and the like. In short it'll tell you what FD and what
193 * event it wants you to wait-on (and again give you function `G()` to call when it is indeed active).
194 *
195 * Thus, *you* control what happens in what thread -- and everything can happen in *your* single thread, if you
196 * so desire. *You* can create the other threads, arrange necessary synchronization -- including of access to
197 * the `sync_io::X` in question -- as opposed to rely on whatever we've internally designed inside non-`sync_io` `X`.
198 * *You* control when a `sync_io::X` does something. In particular, it is only in a couple of
199 * specific `sync_io::X::*` APIs that a completion handler you gave it can actually be called. *If* it is called,
200 * it is always called synchronously right then and there, not from some unknown background thread.
201 *
202 * ### So `sync_io`-pattern-implementing APIs will never start threads? ###
203 * Well, they might. In some cases it's an internal need that cannot be avoided. However, when *both* (1) it can be
204 * avoided, *and* (2) performance could possibly be affected, then correct: Flow-IPC will avoid starting a thread and
205 * performing context-switching. If it's immaterial for performance in practice, then it absolutely reserves the
206 * right to make background threads, whether for ease of internal implementation or some other reason. And,
207 * of course, if there's some blocking API that must be used internally -- and there is simply no choice but to
208 * use that API -- then a thread will need to be started behind the scenes. We can't very well block your thread U, so
209 * at that point we do what we must.
210 *
211 * However, even in *that* case, a `sync_io` *API* is still supplied. This may be helpful to more easily integrate
212 * with your reactor-pattern event loop. (However, if you have a proactor like boost.asio as your event loop, then
213 * in our view it is unlikely to be helpful in that sense. At that point you might as well use the async-I/O
214 * alternative API -- unless, again, there is some performance benefit to maintaining greater control of what
215 * part of Flow-IPC executes when from what thread.)
216 *
217 * ### Using the `sync_io` pattern: design rationale ###
218 * Though it will look different and perhaps complex, it is actually at its core similar to other third-party
219 * APIs that require the user to perform async-waits on their behalf. The most well known example of such an API
220 * is perhaps OpenSSL. Take `SSL_read()` -- quite similar in spirit to `sync_io`-pattern `x.async_receive_blob()`.
221 * When invoked, barring connection-hosing errors, one of 2 things will happen:
222 * - It will succeed and read 1 or more bytes and return this fact ("no error, I have read N bytes").
223 * - It will fail due to would-block, because it either needs the underlying stream socket to be
224 * readable, or to be writable. (The latter -- needing writability -- may seem strange for `SSL_read()`,
225 * but it's normal: perhaps the connection is in the middle of a cert negotiation, and at this stage needs
226 * to write something *out* first; and the connection happens to be in such a state as to not be able to write
227 * bytes to the kernel send buffer at that moment.)
228 * So it will return either:
229 * - `SSL_ERROR_WANT_READ` (meaning, "the underlying stream-socket handle (FD) needs to be readable -- call me
230 * again after you've *async-waited* successfully on this *event-of-interest*, and I will try again"); or
231 * - `SSL_ERROR_WANT_WRITE` (same but needs writability instead).
232 *
233 * Your application would then internally register interest in FD so-and-so to be readable or writable. Perhaps
234 * some `SSL_write()` would be interested in another such event simultaneously too. So then the next time the
235 * event loop came up to the next `poll()` or `epoll_wait()`, you'd indeed wait on these registered events.
236 * If the `SSL_read()`-related event-of-interest was indeed returned as active, your program would know that
237 * fact, based on its own data structures, and know to try `SSL_read()` again. That time `SSL_read()` might
238 * succeed; or it might require writability again, or readability this time, and would return
239 * `SSL_ERROR_WANT_*` again. Eventually it'd get what it needs and return success.
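 *
 * For concreteness, the classic OpenSSL flow being referenced is roughly as follows (a sketch only; error
 * handling and the surrounding loop are elided):
 *
 * ~~~
 * const int n_or_err = SSL_read(ssl, buf, sizeof buf);
 * if (n_or_err > 0)
 * {
 *   // Success: n_or_err bytes are in buf; handle them now.
 * }
 * else switch (SSL_get_error(ssl, n_or_err))
 * {
 * case SSL_ERROR_WANT_READ:  // Register the underlying FD for readability; retry SSL_read() once it fires.
 *   break;
 * case SSL_ERROR_WANT_WRITE: // Register the underlying FD for writability; retry SSL_read() once it fires.
 *   break;
 * default:
 *   break;                   // Connection-hosing error; handle/close.
 * }
 * ~~~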
240 *
241 * `sync_io` pattern in Flow-IPC is not much different. It is arguably more complex to use, but there are good
242 * reasons for it. Namely there are some differences as to our requirements compared to OpenSSL's. To wit:
243 * - For a given operation -- whether it's `Native_socket_stream::async_receive_blob()` or the even more
244 * internally complex transport::struc::Channel::expect_msg() -- there will usually be more than 1
245 * event of interest at a time, in fact spread out over more than 1 handle (FD) at that. Namely
246 * in addition to readability/writability of the underlying low-level transport, there are timer(s)
247 * (such as the idle timer -- `idle_timer_run()`); and `struc::Channel` can be configured to have
248 * 2 in-pipes and thus will be async-waiting on 2 FDs' read events.
249 * - Hence we need to be able to express the desired async-waits to the user in a more flexible way than
250 * a simple "can't do it, tell me when my socket is Xable and call me again." We have to communicate
251 * an entire set-of-events-of-interest as well as changes in it over time.
252 * - OpenSSL is targeted at old-school event loops -- ~all written in the reactor-pattern style.
253 * Flow-IPC is meant to be usable by modern proactor-pattern applications *and* old-school reactor-pattern
254 * ones as well. It would be unfortunate, in particular, if a given `X` is written in the modern handler-based way,
255 * while its `sync_io::X` counterpart is just entirely different. In fact, suppose `sync_io::X` is available;
256 * *our* `X` internally "wants" to be written around a boost.asio loop *and* reuse `sync_io::X` internally
257 * as well. So in other words, both aesthetically and practically, an OpenSSL-style old-school API would be
258 * a pain to use in a boost.asio-based application (our own async-I/O-pattern classes being a good test case).
259 *
260 * Therefore the `sync_io` pattern is strongly inspired by the boost.asio proactor pattern API. As a corollary it
261 * provides special support for the case when the user's event loop *is* boost.asio-based. If your event loop
262 * is old-school, you will lose nothing however -- just don't use that feature.
263 *
264 * ### Using the `sync_io` pattern: how-to ###
265 * Take `x.async_receive_blob(..., F)`. If `x` is an `X`, we've already described it. Now let `x` be a `sync_io::X`.
266 * The good news is initiating the same operation uses *almost* the exact same signature. It takes the same arguments
267 * including a completion handler `F()` with the exact same signature itself. There is however one difference:
268 * if `F()` takes 1+ args (1st one usually `const Error_code& err_code`; followed at times by something like
269 * `size_t sz` indicating how many bytes were transferred) -- then the `sync_io` form of the `async_...()` method
270 * shall take `sync_`-prefixed *out-arg* counterparts to those args directly as well. Thus specifically:
271 * - If async-I/O sig is `x.async_receive_blob(..., F)`,
272 * where `F()` is of form `void (const Error_code& err_code, size_t sz)`, then:
273 * - `sync_io` sig is: `x.async_receive_blob(..., Error_code* sync_err_code, size_t* sync_sz, F)`,
274 * where `F()` is of the same form as before. You shall provide `&sync_err_code, &sync_sz`, and your local
275 * variables shall be set to certain values upon return.
276 * - You may also leave `sync_err_code` (if applicable) null. In that case standard Flow error-reporting
277 * semantics are active: if (and only if) `*sync_err_code` *would* be set to truthy (non-success) value,
278 * but `sync_err_code` is a null pointer, then a `flow::error::Runtime_error` is thrown with
279 * the would-be `*sync_err_code` stored inside the exception object (and a message in its `.what()`).
280 *
281 * So let's say you call the `sync_io` form of `x.async_receive_blob()`, providing it `&sync_err_code, &sync_sz` args, otherwise
282 * using the same arg values as with async-I/O. (For simplicity of discussion let's assume you did not pass
283 * null pointer for the sync-err-code arg.) Then: `F()` will no longer
284 * execute from some unspecified thread at some unknown future time. Instead there are 2 possibilities.
285 * - If `x.async_receive_blob()` was able to receive a message (more generally -- complete the operation)
286 * synchronously (immediately), then:
287 * - The method *itself* shall emit (via `sync_err_code` out-arg) either *no* error (falsy value)
288 * or an error (truthy value) that is specifically *not* ipc::transport::error::Code::S_SYNC_IO_WOULD_BLOCK.
289 * - This is quite analogous to `SSL_read()` succeeding immediately, without `SSL_ERROR_WANT_*`.
290 * - The method *itself* shall synchronously emit other `sync_` out-arg values indicating the result
291 * of the operation. In this case that is `sync_sz == 0` on error, or `sync_sz > 0` indicating how many
292 * bytes long the successfully, *synchronously* received blob is.
293 * - Again: This is quite analogous to `SSL_read()` succeeding immediately, without `SSL_ERROR_WANT_*`.
294 * - The async completion handler, `F`, shall be utterly and completely ignored. It will *not* be saved.
295 * It will *not* be called. The operation already finished: you get to deal with it right then and there
296 * like a normal synchronous human being.
297 * - Conversely, if `x.async_receive_blob()` can only complete the op after some events-of-interest (to `x`)
298 * become active (we'll discuss how this works in a moment) -- then:
299 * - The method *itself* shall emit (via `sync_err_code` out-arg) the specific code
300 * ipc::transport::error::Code::S_SYNC_IO_WOULD_BLOCK.
301 * - This is analogous to `SSL_read()` returning `SSL_ERROR_WANT_*`.
302 * - It shall save `F` internally inside `x`.
303 * - Later, when your loop *does* detect an event-of-interest (to `x`) is active, it shall explicitly
304 * *call into another `x` API method* -- we call it `(*on_active_ev_func)()` -- and *that* method may
305 * *synchronously* invoke the memorized completion handler `F()`. In the case of `x.async_receive_blob()`
306 * this `F()` call will be just as you're used-to with async-I/O pattern:
307 * `err_code` truthy on error, `sz == 0`; or `err_code` falsy on success, `sz > 0` indicating size of
308 * received blob.
309 * - Since `x.async_receive_blob()` has one-off completion handler semantics, at this point it'll forget `F`.
310 * - However, with the `sync_io` pattern, `x` shall *never* issue an operation-aborted call to `F()` upon
311 * `x` destruction or anything like that. That's an async-I/O thing.
312 *
313 * The "later, when your loop" step is analogous to: your loop awaiting an event as asked by the would-block
314 * `SSL_read()`, then once active calling `SSL_read()` again; and the latter, this time, returning success.
315 * Note, however, that you do *not* re-call `x.async_receive_blob()` on the desired active event. Conceptually
316 * you're doing the same thing -- you're saying, "the event you wanted is ready; if you can get what I wanted
317 * then do it now" -- but you're doing it by using a separate API.
318 *
319 * That leaves the other major piece of the API: how to in fact be informed of a desired event-of-interest
320 * and subsequently indicate that event-of-interest is indeed active. In terms of the API, this procedure is
321 * decoupled from the actual `x.async_receive_blob()` API. Moreover it is not expressed as some kind of big
322 * set-of-handles-and-events-in-which-`x`-has-interest-at-a-given-time either. Instead, conceptually, it is
323 * expressed similarly to boost.asio: `x` says to itself: I want to do
324 * `handle_so_and_so.async_wait(readable, F)`; or: I want to do
325 * `handle_so_and_so.async_wait(writable, F)`. But since `handle_so_and_so` is not a boost.asio I/O object, it has
326 * to express the same thing to the user of `x`. How? Answer: sync_io::Event_wait_func. Its doc header explains
327 * the details. We summarize for our chosen example of `x.async_receive_blob()`:
328 * - Just after constructing the `x`, set up the async-wait function: `x.start_receive_blob_ops(E)`,
329 * where `E` is a function with signature a-la `Event_wait_func`.
330 * - (Note the `receive_blob` fragment of the method name. This is replaced with other fragments for other
331 * operations. In some cases an object -- ipc::transport::Native_socket_stream being a prime example -- supports
332 * 2+ sets of operations. For example `Native_socket_stream::start_send_blob_ops()` is for the *independent*
333 * outgoing-direction operations that can occur *concurrently to* incoming-direction `start_receive_blob_ops()`
334 * ops.)
335 * No need to worry about that for now though; we continue with our `receive_blob` example use case.
336 * - *If* (and only if!) your event loop is boost.asio-based, then precede this call with one that supplies
337 * the boost.asio `Task_engine` (or strand or...) where you do your async work: `.replace_event_wait_handles()`.
338 *
339 * ~~~
340 * using ipc::util::sync_io::Task_ptr;
341 * using ipc::util::sync_io::Asio_waitable_native_handle;
342 *
343 * x.replace_event_wait_handles
344 * ([this]() -> auto { return Asio_waitable_native_handle(m_my_task_engine); });
345 * x.start_receive_blob_ops([this](Asio_waitable_native_handle* hndl_of_interest,
346 * bool ev_of_interest_snd_else_rcv,
347 * Task_ptr&& on_active_ev_func)
348 * {
349 * my_rcv_blob_sync_io_ev_wait(hndl_of_interest, ev_of_interest_snd_else_rcv, std::move(on_active_ev_func));
350 * });
351 * ~~~
352 *
353 * We are now ready for the last sub-piece: named `my_rcv_blob_sync_io_ev_wait()` in that snippet. This key piece
354 * responds to the instruction: async-wait for the specified event for me please, and let me know when that event
355 * is active by calling `(*on_active_ev_func)()` at that time. The details of how exactly to do this can be
356 * found in the sync_io::Event_wait_func doc header -- including tips on integrating with `poll()` and `epoll_*()`.
357 * Here we will assume you have a boost.asio loop, continuing from the above snippet. The simplest possible
358 * continuation:
359 *
360 * ~~~
361 * void my_rcv_blob_sync_io_ev_wait(Asio_waitable_native_handle* hndl_of_interest,
362 * bool ev_of_interest_snd_else_rcv,
363 * Task_ptr&& on_active_ev_func)
364 * {
365 * // sync_io wants us to async-wait. Oblige.
366 * hndl_of_interest->async_wait(ev_of_interest_snd_else_rcv
367 * ? Asio_waitable_native_handle::Base::wait_write
368 * : Asio_waitable_native_handle::Base::wait_read,
369 * [this, on_active_ev_func = std::move(on_active_ev_func)]
370 * (const Error_code& err_code)
371 * {
372 * if (err_code == boost::asio::error::operation_aborted) { return; } // Shutting down? Get out.
373 *
374 * // Event is active. sync_io wants us to inform it of this. Oblige.
375 * (*on_active_ev_func)();
376 * // ^-- THAT POTENTIALLY INVOKED COMPLETION HANDLER F() WE PASS IN TO x.async_receive_blob(F)!!!
377 * });
378 * }
379 * ~~~
380 *
381 * That's it. Note well: `(*on_active_ev_func)()` is (a) informing `sync_io`-pattern-implementing `x` of the
382 * event it (`x`) wanted; and (b) possibly getting the result *you* wanted in the original `async_receive_*()` call.
383 * Thus it is the equivalent of the 2nd `SSL_read()` call, after satisfying the `SSL_ERROR_WANT_READ` or
384 * `SSL_ERROR_WANT_WRITE` result. The only difference, really, is the mechanic of getting the result is through
385 * a *synchronous* call to your completion handler.
386 *
387 * There is, however, an extremely important subtlety to consider additionally. The key point:
388 * - `(*on_active_ev_func)()` should be thought of as an API -- a method! -- of `x`, the
389 * `sync_io`-pattern-implementing Flow-IPC object...
390 * - ...right alongside (in our use case here) `.async_receive_blob()`.
391 *
392 * That means it is *not* thread-safe to call `(*on_active_ev_func)()` concurrently with
393 * `x.async_receive_blob()`. What does this mean in practice? Answer:
394 * - If you execute all these things in a single-threaded event loop -- nothing. You need not worry about
395 * synchronization.
396 * - If you perform some of your work on `x` (perhaps the `.async_*()` calls) in one thread but
397 * the async-wait handling -- and the potentially resulting *synchronous* invocation of your own completion
398 * handlers -- in another...
399 * - ...then you, at a minimum, need a mutex-lock (or use of a strand, or ...) around the call sites into
400 * `x.method(...)`, for all related `method`s of `x`...
401 * - ...*including* the call to `(*on_active_ev_func)()`.
402 *
403 * Chances are, though, that if you're operating in multiple threads, then anyway you'll need to protect *your own*
404 * data structures against concurrent writing. Remember: `(*on_active_ev_func)()` may well invoke
405 * *your own completion handler* from the original `x.async_receive_blob(..., F)` call. That code (*your* code)
406 * is more than likely to react to the received payload in some way, and that might be touching data structures
407 * accessed from 2+ threads. It would therefore make sense that the relevant mutex be already locked. In this
408 * example we presuppose your application might invoke async-receives from a thread U while placing handlers
409 * onto a thread W:
410 *
411 * ~~~
412 * using flow::util::Lock_guard;
413 *
414 * ...
415 * // In your thread U:
416 *
417 * Lock_guard<decltype(m_my_rcv_mutex)> lock(m_my_rcv_mutex); // <-- ATTN! Protects x receive ops at least.
418 * ...
419 * ipc::Error_code sync_err_code;
420 * size_t sync_sz;
421 * x.async_receive_blob(m_target_msg,
422 * &sync_err_code, &sync_sz,
423 * [this](const ipc::Error_code& err_code, size_t sz) { on_msg_in(err_code, sz); });
424 * if (sync_err_code != ipc::transport::error::Code::S_SYNC_IO_WOULD_BLOCK)
425 * {
426 * // ...Handle contents of m_target_msg, or sync_err_code indicating error -- perhaps ~like on_msg_in()!
427 * // Note m_my_rcv_mutex is locked... as it would be in on_msg_in() (which won't run in this case)!
428 * }
429 * // else { on_msg_in() will run later. x.receive_...() probably invoked my_rcv_blob_sync_io_ev_wait(). }
430 * ...
431 *
432 * void my_rcv_blob_sync_io_ev_wait(Asio_waitable_native_handle* hndl_of_interest,
433 * bool ev_of_interest_snd_else_rcv,
434 * Task_ptr&& on_active_ev_func)
435 * {
436 * // In your thread U:
437 *
438 * // sync_io wants us to async-wait. Oblige. (Do not lock m_my_rcv_mutex!)
439 * hndl_of_interest->async_wait(ev_of_interest_snd_else_rcv
440 * ? Asio_waitable_native_handle::Base::wait_write
441 * : Asio_waitable_native_handle::Base::wait_read,
442 * [this, on_active_ev_func = std::move(on_active_ev_func)]
443 * (const Error_code& err_code)
444 * {
445 * if (err_code == boost::asio::error::operation_aborted) { return; } // Shutting down? Get out.
446 *
447 * // In your thread W:
448 *
449 * // Event is active. sync_io wants us to inform it of this. Oblige.
450 *
451 * flow::util::Lock_guard<decltype(m_my_rcv_mutex)> lock(m_my_rcv_mutex); // <-- ATTN!
452 *
453 * (*on_active_ev_func)();
454 * // ^-- THAT POTENTIALLY INVOKED on_msg_in()!!!
455 * });
456 * }
457 *
458 * void on_msg_in(const Error_code& err_code, size_t sz)
459 * {
460 * // In your thread W:
461 * // m_my_rcv_mutex is locked!
462 *
463 * ... // Handle contents of m_target_msg, or err_code indicating error.
464 * }
465 * ~~~
466 *
467 * Just remember: *You* choose when `x` does anything that might touch your data, or itself. This happens
468 * in exactly three possible places and always synchronously:
469 * - The `x.async_receive_blob()` initiating call itself (and ones like it, all having async-I/O almost-identical-sig
470 * counterparts in `X::` for any given `sync_io::X::`).
471 * - OpenSSL analogy: initial `SSL_read()`.
472 * - Your code reacting to non-would-block return from `x.async_receive_blob()`, right after it (and ones like it).
473 * - OpenSSL analogy: reacting to initial `SSL_read()` succeeding right away.
474 * - `(*on_active_ev_func)()`.
475 * - OpenSSL analogy: subsequent `SSL_read()`, once you've detected its `SSL_ERROR_WANT_*` has been satisfied.
476 *
477 * ### Is there always a completion handler? ###
478 * Answer: No. An async op might not have a completion handler, but it is still an async op and may need to
479 * ask you to async-wait for some handle to be readable and/or writable to work properly. The most prominent
480 * case of this is sending items as exemplified by ipc::transport::sync_io::Native_socket_stream::send_blob().
481 * How to handle it? Well, it's the same as any `async_*()` op (which shall always take a completion handler) --
482 * but simpler. Simply put: Such an op shall be initiated by a method that takes no completion handler arg, so
483 * you simply don't provide one. (By convention it will also lack an `async_` prefix in the method name.)
484 * Beyond that, everything is the same:
485 * - It may (or may not) initiate an async-wait by calling the `Event_wait_func` you supplied via
486 * `start_*_ops()`.
487 * - Your `Event_wait_func` should indeed -- on successful async-wait -- invoke `(*on_active_ev_func)()`.
488 * Internally that may continue, or complete, whatever async processing `x` needs to do.
489 * - The difference: it won't invoke some completion handler of yours... as you didn't (and couldn't) provide one.
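 *
 * For example, with ipc::transport::sync_io::Native_socket_stream as a blob sender (a sketch; we assume the
 * `Blob_sender`-concept-style call `send_blob(blob, &err_code)` here -- consult the class docs for the exact form):
 *
 * ~~~
 * // In your thread U; x.start_send_blob_ops(...) was done earlier; blob is your util::Blob_const buffer.
 * ipc::Error_code err_code;
 * x.send_blob(blob, &err_code);
 * if (err_code)
 * {
 *   // The out-pipe itself is hosed; handle that error.
 * }
 * // Done, from our perspective -- there is no completion handler. If send_blob() hit would-block internally,
 * // x has already asked us, via the Event_wait_func we gave start_send_blob_ops(), to async-wait for
 * // writability; once we report that event via (*on_active_ev_func)(), x quietly finishes pushing out
 * // the queued payload.
 * ~~~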
490 *
491 * ### Multiple op-types in a given `sync_io`-pattern-implementing object ###
492 * Consider, first, ipc::session::sync_io::Session_server. It has only one operation, really:
493 * session::sync_io::Session_server::async_accept(). To set it up, you'll do (as explained above by another example
494 * use case): `x.start_accept_ops()`, possibly preceded by `x.replace_event_wait_handles()` (but that guy is beside
495 * the point).
496 *
497 * Easy enough. The `x` can't do any other "type of thing." Same is true of, say,
498 * ipc::transport::sync_io::Blob_stream_mq_sender: it can only `x.send_blob()` (and `x.auto_ping()`,
499 * `x.end_sending()`, `x.async_end_sending()` -- all of which deal with sending messages *out*); and needs only
500 * `x.start_send_blob_ops()` for setup.
501 *
502 * What if an object can do multiple things though? `Native_socket_stream` (operating as a `Blob_sender` and
503 * `Blob_receiver`) can do at least 2: it can
504 * - send (`.send_blob()`, `.end_sending()`, `.async_end_sending()`, `.auto_ping()`),
505 * - and receive (`.async_receive_blob()`, `.idle_timer_run()`).
506 *
507 * These are 2 distinct *op-types*, and each one has its own independent API started via
508 * `.start_send_blob_ops()` and `.start_receive_blob_ops()` respectively. (If it were networked -- this might
509 * be in the works -- it would probably gain a 3rd op-type via `.start_connect_ops()`.)
510 *
511 * Not that interesting in and of itself; but what about concurrency? Answer:
512 *
513 * Things only get interesting once 2+ op-types (in practice, as of this writing, it's
514 * 2 at most in all known use cases) can occur concurrently. Namely that's
515 * sending (started via `.start_send_blob_ops()` in this case) and receiving (`.start_receive_blob_ops()`).
516 * How does it work? Answer: Formally speaking, it's described in the appropriate class's doc header; in this case
517 * ipc::transport::sync_io::Native_socket_stream. Informally the intuition behind it is as follows:
518 * - In a full-duplex object (like this one), you can indeed send and receive at the same time.
519 * That is, e.g., you can interleave an async-receive-blob op and its eventual completion with
520 * invoking something send-related and its eventual completion (if applicable).
521 * - Moreover you can use the API of op-type "receive" *actually concurrently* with the API of op-type "send."
522 * It *is* thread-safe, even though you're ostensibly mutating the same object `x`.
523 * This is analogous to being able to concurrently `read(fd, ...)` and `write(fd, ...)` on the same OS handle
524 * `fd` for many socket types including TCP and Unix-domain-socket. This may, all in all, lead to significant
525 * perf savings in your application, if you have multiple threads available to handle sending and receiving.
526 * If you *do* need a mutex, you can have 1 mutex for send ops and 1 mutex for receive ops (assuming of course
527 * your algorithm doesn't involve the 2 directions' algorithms concurrently mutating the same outside-`x` data).
528 *
529 * Do keep in mind: This applies to `sync_io::X` only -- not its async-I/O counterpart `X`. E.g., with the latter you
530 * cannot invoke `x.async_receive_blob()` and `x.send_blob()` concurrently, whereas with the former you can.
531 * With plain `X` you'd need to mutex-protect (or use strands or ...) `x`. However you can of course still have 2+
532 * async operations outstanding simultaneously (e.g., initiating a send while an `.async_receive_*(..., F)` has not yet
533 * finished as signified by `F()`): you just cannot literally call mutating parts of `x` API concurrently.
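 *
 * In sketch form (hypothetical member names; `std::mutex` used for simplicity), a locking discipline for a
 * full-duplex `sync_io::X` shared by a sending thread and a receiving thread might be:
 *
 * ~~~
 * std::mutex m_snd_mutex; // Serializes x's send-op API: send_blob(), *end_sending(), auto_ping(),
 *                         // and (*on_active_ev_func)() calls stemming from start_send_blob_ops().
 * std::mutex m_rcv_mutex; // Serializes x's receive-op API: async_receive_blob(), idle_timer_run(),
 *                         // and (*on_active_ev_func)() calls stemming from start_receive_blob_ops().
 *
 * // Thread 1 may do this...
 * { flow::util::Lock_guard<std::mutex> lock(m_snd_mutex); x.send_blob(...); }
 * // ...while thread 2 concurrently does this -- fine, as they are distinct op-types of the same sync_io::X:
 * { flow::util::Lock_guard<std::mutex> lock(m_rcv_mutex);
 *   x.async_receive_blob(..., &sync_err_code, &sync_sz, on_msg_in_func); }
 * ~~~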
534 *
535 * @see ipc::util::sync_io::Event_wait_func -- details about hooking up your event loop to a
536 * `sync_io`-pattern-implementing Flow-IPC object.
537 * @see ipc::util::sync_io::Asio_waitable_native_handle -- take a look particularly if your event loop is built on
538 * boost.asio.
539 * @see ipc::transport::Native_socket_stream internal source code: This exemplifies a fairly advanced
540 * "eat-our-own-dog-food" usage of a `sync_io`-pattern-implementing API
541 * (ipc::transport::sync_io::Native_socket_stream in this case) in a multi-threaded setting
542 * (user's thread U and internal worker thread W in this case). In this case the event loop is a boost.asio
543 * one.
544 *
545 * @todo Write an example of `sync_io`-pattern use with an old-school reactor-pattern event loop, using
546 * `poll()` and/or `epoll_*()`.
547 *
548 * @internal
549 *
550 * @see ipc::util::sync_io::Timer_event_emitter -- how to hook up a boost.asio `Timer` inside
551 * a `sync_io`-pattern-implementing Flow-IPC object.
552 */
553namespace ipc::util::sync_io
554{
555
556// Types.
557
558// Find doc headers near the bodies of these compound types.
559
560class Asio_waitable_native_handle;
561
562/**
563 * Short-hand for ref-counted pointer to a `Function<>` that takes no arguments and returns nothing;
564 * in particular used for `on_active_ev_func` arg of sync_io::Event_wait_func.
565 *
566 * ### Rationale ###
567 * This is defined here because of its central role in sync_io::Event_wait_func (see that doc header).
568 *
569 * Why wrap it in a smart pointer at all as opposed to passing around `Function<>`s as objects (particularly
570 * as arg to #Event_wait_func)? Answer: Performance. Our `sync_io` pattern is intended for the highly
571 * perf-conscious user, to the extent they'd forego the significantly easier to use async-I/O pattern
572 * just because that would involve involuntary (from the user's point of view) thread creation and context
573 * switching; copying or moving polymorphic functors, including all their captures, is an unnecessary expense.
574 *
575 * In that case why `shared_ptr`, not `unique_ptr`, given that it adds potential ref-counting behind the scenes?
576 * Answer: `unique_ptr` would have been nice; however it is likely the user (and/or internal Flow-IPC code)
577 * will want to lambda-capture the wrapped `Task`, and capturing movable-but-not-copyable types like `unique_ptr`
578 * does not compile (as of C++17), even if one never copies the capturing lambda. One would need to upgrade
579 * to `shared_ptr` to capture, and that is annoying.
580 *
581 * @note That said it is recommended that one `std::move()` any #Task_ptr whenever possible, such as when
582 * capturing it in a lambda (e.g., `[task_ptr = std::move(task_ptr)]`). This advice applies generally
583 * to all `shared_ptr` captures (*"whenever possible"* being important), but this is just a reminder.
584 */
585using Task_ptr = boost::shared_ptr<Task>;
586
587/**
588 * In `sync_io` pattern, concrete type storing user-supplied function invoked by pattern-implementing
589 * ipc::transport and ipc::session object to indicate interest in an I/O status event (writable, readable)
590 * for a particular Native_handle.
591 *
592 * @see ipc::util::sync_io doc header first. It explains the pattern in detail including example code for
593 * setting up an `Event_wait_func`.
594 *
595 * Use in `sync_io` pattern
596 * ------------------------
597 * Suppose `T` is an ipc::transport or ipc::session object type, always in a `"sync_io"` sub-namespace, that
598 * operates according to the `sync_io` pattern. (For example, `T` might be transport::sync_io::Native_socket_stream.)
599 * Then `T::start_X_ops(Event_wait_func&&)`, and/or a compatible template, must be invoked to begin unidirectional work
600 * of type X (e.g., `start_X_ops()` might be `start_send_blob_ops()` or
601 * `start_receive_native_handle_ops()`) on a given `T`; the `T` memorizes the function until its destruction.
602 *
603 * From that point on, the `T` might at times be unable to complete an operation (for example
604 * transport::sync_io::Native_socket_stream::send_blob()) synchronously due to a would-block condition on some
605 * internal Native_handle (for example, internally, the Unix domain stream socket in this case encountering
606 * would-block on a `"::sendmsg()"` attempt). It shall then -- synchronously, inside
607 * `Native_socket_stream::send_blob()` -- invoke the saved #Event_wait_func and pass to it the following arguments:
608 * - `hndl_of_interest`: The native handle on which the user's own event loop must wait for an I/O event.
609 * In-depth tips on how to do so are below; but for now:
610 * - If your event loop is built on boost.asio, you may use `hndl_of_interest->async_wait()` directly.
611 * - Otherwise (if you're using `[e]poll*()` perhaps), obtain the raw handle as follows:
612 * `hndl_of_interest->native_handle().m_native_handle`. It can be input to `poll()`, `epoll_ctl()`, etc.
613 * - `ev_of_interest_snd_else_rcv`: Which event it must await: `true` means "writable"; `false` means "readable."
614 * - What if the user's wait (such as `epoll_wait()`) encounters an error-occurred event instead
615 * (`EPOLLERR`)?
616 * Answer: They must in fact report this as-if the requested event (whether writable or readable) is active.
617 * - `on_active_ev_func`: If (and, to avoid pointless perf loss, only if) the above-specified event is active,
618 * the user must invoke `(*on_active_ev_func)()` (without args and expecting no return value).
619 * - In terms of thread safety, and generally, one should consider this function a non-`const` member of `T`'s
620 * sub-API. (The sub-API in question is the set of methods that correspond to unidirectional-operation
621 * of type X, where `T::start_X_ops()` was invoked to kick things off. For example, in `Native_socket_stream`
622 * as used as a Blob_sender, that's its `send_blob()` and `*end_sending()` methods.)
623 * That is, `(*on_active_ev_func)()` may not be called concurrently to any `T` sub-API method
624 * (`Native_socket_stream::send_blob()`, `*end_sending()` in the recent example) or other
625 * `(*on_active_ev_func)()`. `(*on_active_ev_func)()` may well, itself, synchronously invoke
626 * `Event_wait_func` to indicate interest in a new event.
627 *
628 * Naturally the key question arises: what, specifically, should a particular #Event_wait_func (as passed-into
629 * a `T::start_X_ops()`) *do*? Indeed, this requires great care on the `sync_io` pattern user's part.
630 *
631 * Formally speaking the contract is as follows. Let `F` be the particular `Event_wait_func`.
632 * -# Upon `F()` being called, it shall register -- through a technique of the user's choice (a couple are
633 * explained below for your convenience) -- the specified event as one of interest. The sooner this registration
634 * occurs, the more responsively `T` will behave.
635 * -# It shall arrange, via that same technique, to do the following upon detecting the
636 * event (or `hndl_of_interest` becoming hosed, i.e., the error event). The sooner it does so, the more
637 * responsively `T` will behave.
638 * -# *Deregister* the specified event. Each `Event_wait_func` invocation indicates a **one-off** wait.
639 * -# Call `(*on_active_ev_func)()`. (It is best, for a little perf bump, for the user
640 * to save `std::move(on_active_ev_func)` as opposed to a smart-pointer copy.)
641 *
642 * @warning Do not forget to deregister the event before `(*on_active_ev_func)()`. Failure to do so can easily
643 * result in processor pegging at best; or undefined behavior/assertion tripping.
644 * Worse still, if it's mere processor pegging, you might not notice it happening.
645 * E.g., if `Native_socket_stream::send*()` encounters would-block internally,
646 * it will register interest in writability of an internal handle; suppose when you report writability
647 * it is able to push-through any queued internal payload. Now it no longer needs writability; if
648 * informed of writability anyway, it will at best do nothing -- leading to an infinite loop
649 * of user reporting writability and `Native_socket_stream` ignoring it. Or, if that is how
650 * `Native_socket_stream` is written, it will detect that a write event is being reported despite it
651 * not asking for this and log WARNING or even FATAL/abort program. With `epoll_*()`, `EPOLLONESHOT` and/or
652 * `EPOLLET` may be of aid, but be very careful.
653 *
654 * ### Integrating with reactor-pattern `poll()` and similar ###
655 * Suppose your application is using POSIX `poll()`. Typically a data structure will be maintained mirroring
656 * the `fds[].events` (events of interest) sub-argument to `poll()`; for example an `unordered_map<>` from FD
657 * (Native_handle::m_native_handle) to an `enum { S_NONE, S_RD, S_WR, S_RD_WR }`; or simply the `fds[]` array itself.
658 * (In the latter case lookup by FD may not be constant-time. However `fds[].events` does not need to be built
659 * from a mirroring structure ahead of each `poll()`.)
660 *
661 * When `Event_wait_func F(hndl, snd_else_rcv, on_ev_func)` is invoked, you will register the event:
662 * -# If necessary, insert `raw_hndl = hndl->native_handle().m_native_handle` into the data structure,
663 * with no events of interest (which will change in the next step).
664 * If not necessary (already inserted, hence with a (different) event of interest already), continue to next step.
665 * -# Depending on `snd_else_rcv`, change the events-of-interest `enum` NONE->RD, NONE->WR, WR->RD_WR, or
666 * RD->RD_WR. (If operating on an `fd[].events` directly, that's: `fd.events = fd.events | POLLIN` or
667 * `fd.events = fd.events | POLLOUT`.)
668 * -# In some data structure keyed on, conceptually, the pair `(raw_hndl, bool snd_else_rcv)`,
669 * record `std::move(on_ev_func)`. Call it, say, `ipc_events_map`.
670 *
671 * At some point in your reactor-pattern event loop, you are ready to call `poll()`. Construct `fds[].events` array
672 * if needed, or just use the long-term-maintained one, depending on how you've set this up. Call `poll()`.
673 * For each *individual* active event in an `fds[].revents`:
674 * -# Construct the pair `(raw_hndl, snd_else_rcv)`.
675 * -# If it's not in `ipc_events_map`, no IPC-relevant event fired; it must be something relevant to other
676 * parts of your application (such as network traffic). Exit algorithm for now (until next `poll()`);
677 * the IPC-registered event is still being waited-on.
678 * -# If it *is* in `ipc_events_map`:
679 * -# Remove it from there.
680 * -# Deregister the event w/r/t next `poll()`:
681 * -# Depending on `snd_else_rcv`: Change the events-of-interest `enum` RD->NONE, WR->NONE, RD_WR->WR, or
682 * RD_WR->RD. (If operating on an `fd[].events` directly, that's `fd.events = fd.events & ~POLLIN` or
683 * `fd.events = fd.events & ~POLLOUT`.)
684 * -# If the events-of-interest for the FD have become NONE (or `fd.events == 0` if tracking it directly),
685 * delete the handle's entry from the `events`-mirroring structure (or `events` itself if tracking it
686 * directly).
687 * -# Invoke the saved `(*on_ev_func)()`.
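 *
 * Pulling the above steps together into one sketch (hypothetical names; your loop's own FDs, error handling,
 * and the `fds[]` (re)construction are elided):
 *
 * ~~~
 * // Mirror of events-of-interest (FD -> POLLIN and/or POLLOUT), plus the ipc_events_map described above.
 * std::unordered_map<int, short> m_poll_events;
 * std::map<std::pair<int, bool>, ipc::util::sync_io::Task_ptr> m_ipc_events_map; // Key: (raw FD, snd_else_rcv).
 *
 * // The Event_wait_func we hand to x.start_..._ops():
 * void ipc_ev_wait(ipc::util::sync_io::Asio_waitable_native_handle* hndl, bool snd_else_rcv,
 *                  ipc::util::sync_io::Task_ptr&& on_ev_func)
 * {
 *   const auto raw_hndl = hndl->native_handle().m_native_handle;
 *   m_poll_events[raw_hndl] |= (snd_else_rcv ? POLLOUT : POLLIN);       // Register the one-off interest.
 *   m_ipc_events_map[{raw_hndl, snd_else_rcv}] = std::move(on_ev_func); // Remember whom to inform.
 * }
 *
 * // After each ::poll(): for each individual active event (readable/writable; report error events as-if
 * // the awaited event fired):
 * void on_poll_event(int raw_hndl, bool snd_else_rcv)
 * {
 *   const auto it = m_ipc_events_map.find({raw_hndl, snd_else_rcv});
 *   if (it == m_ipc_events_map.end()) { return; } // Not IPC-related; it is one of your loop's own events.
 *   auto on_ev_func = std::move(it->second);
 *   m_ipc_events_map.erase(it);                   // Deregister first: each wait is strictly one-off!
 *   auto& events = m_poll_events[raw_hndl];
 *   events &= ~(snd_else_rcv ? POLLOUT : POLLIN);
 *   if (events == 0) { m_poll_events.erase(raw_hndl); }
 *   (*on_ev_func)(); // May fire your completion handler; may re-register interest by calling ipc_ev_wait().
 * }
 * ~~~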
688 *
689 * @note While each `F()` invocation indicates one-off event interest, the handler `(*on_ev_func)()` will sometimes,
690 * or possibly frequently, re-indicate interest in the same event within its own handler. If done very
691 * carefully, it *might* be possible to detect this situation by deferring the editing of `events` or
692 * the mirroring structure until `(*on_ev_func)()` finishes; possibly this would net to making no change to
693 * the events-of-interest structure. This could save some processor cycles. I (ygoldfel) would recommend
694 * only getting into that if compelling perf savings evidence appears.
695 *
696 * @note You might notice `hndl` is a pointer to Asio_waitable_native_handle, but you simply get `raw_hndl`
697 * out of it and forget about the rest of `*hndl`. Why not just provide `raw_hndl` to you? Answer:
698 * It is useful when *not* integrating with a reactor-pattern event loop a-la `poll()` or similar but
699 * rather when integrating with a boost.asio event loop. See "Integrating with boost.asio" below.
700 *
701 * ### What about `epoll_*()`? ###
702 * In Linux `epoll_*()` is considered superior (its `man` page at least says that, when used in non-`EPOLLET`
703 * mode, it is a "faster" `poll()`). We leave the exercise of how to apply the above suggestions (for `poll()`)
704 * to `epoll_*()` to the reader. Briefly: Essentially `epoll_ctl()` lets the kernel track a long-running `fds[].events`
705 * sub-array, with add/remove/modify operations specified by the user as syscalls. However, it's not possible to simply
706 * say "I am interested in FD X, event writable"; the events-of-interest per FD are still specified as
707 * an ORing of `EPOLLIN` and/or `EPOLLOUT` in one per-FD entry, whereas `Event_wait_func` is finer-grained than that.
708 * It is not possible to iterate through the kernel-stored events-of-interest set or obtain the existing
709 * `events` bit-mask so as to then `|=` or `&=` it. Therefore a mirroring data structure (such as the
710 * aforementioned `unordered_map<>` from FD to rd/wr/rd-wr) may be necessary in practice. In my (ygoldfel)
711 * experience using such a mirroring thing is typical in any case.
712 *
713 * One tip: `EPOLLONESHOT` may be quite useful: It means you can limit your code to just doing
714 * `epoll_ctl(...ADD)` without requiring the counterpart `epoll_ctl(...DEL)` once the event does fire.
715 * (`EPOLLET` may be usable to decrease the number of epoll-ctl calls needed further, but as of this writing
716 * we haven't investigated this sufficiently to make a statement.)
717 *
718 * However `EPOLLONESHOT` pertains to the entire descriptor, not one specific waited-on event (read or write).
719 * Therefore this optimization is helpful only if `.events` is being set to `EPOLLIN` or `EPOLLOUT` -- not
720 * `EPOLLIN | EPOLLOUT`. Be careful.
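 *
 * For instance, when a given FD only ever has one event-of-interest at a time, the registration side of the
 * `Event_wait_func` might reduce to the following (a sketch; assumes an `epoll_create1()`-produced `m_epoll_fd`
 * and the `m_ipc_events_map` from the `poll()` discussion above):
 *
 * ~~~
 * void ipc_ev_wait(ipc::util::sync_io::Asio_waitable_native_handle* hndl, bool snd_else_rcv,
 *                  ipc::util::sync_io::Task_ptr&& on_ev_func)
 * {
 *   const int raw_hndl = hndl->native_handle().m_native_handle;
 *   ::epoll_event ev = {};
 *   ev.events = (snd_else_rcv ? EPOLLOUT : EPOLLIN) | EPOLLONESHOT; // Kernel disarms the FD after 1 event.
 *   ev.data.fd = raw_hndl;
 *   if ((::epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, raw_hndl, &ev) == -1) && (errno == EEXIST))
 *   {
 *     ::epoll_ctl(m_epoll_fd, EPOLL_CTL_MOD, raw_hndl, &ev); // Added (and since disarmed) earlier: re-arm.
 *   }
 *   m_ipc_events_map[{raw_hndl, snd_else_rcv}] = std::move(on_ev_func);
 * }
 * // On epoll_wait() reporting raw_hndl: no EPOLL_CTL_DEL is needed (EPOLLONESHOT already disarmed it);
 * // just erase the m_ipc_events_map entry and invoke (*on_ev_func)().
 * ~~~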
721 *
722 * ### Integrating with boost.asio ###
723 * Suppose your application is using boost.asio (possibly with flow.async to manage threads),
724 * with a `flow::util::Task_engine::run()` comprising the event loop, in proactor pattern fashion.
725 * (We would recommend just that, preferring it to an old-school reactor-pattern via `[e]poll*()` directly.)
726 * *Your* loop is asynchronously expressed, but you can still use the `sync_io` pattern to graft
727 * Flow-IPC operations into it, so that they are invoked synchronously, when you want, in the exact thread you
728 * want. In fact, doing so is *significantly* simpler than integrating with a reactor-style `[e]poll*()`.
729 * That is because, conceptually, each `Event_wait_func` invocation is essentially expressing the following
730 * boost.asio operation:
731 *
732 * ~~~
733 * some_low_level_transport.async_wait(wait_write, // (or `wait_read`.)
734 * [...](const Error_code& err_code)
735 * {
736 * // Can do I/O, or error? Do I/O, or handle error.
737 * // ...If no error then possibly even at some point continue the async-op chain...
738 * same_or_other_low_level_transport.async_wait(wait_write, // (or `wait_read`.)
739 * ...);
740 * });
741 * ~~~
742 *
743 * Because of that you have to code essentially none of the stuff above (pertaining to a reactor-style `[e]poll*()`
744 * loop): no manual registering/deregistering, wondering how to handle error-event, etc. etc.
745 *
746 * So what to do, specifically? It is quite straightforward. Suppose you've got `Task_engine E` doing
747 * `E.run()` as your event loop. (Any `flow::async::*_loop` does that too, just internally once you do
748 * `loop.start()`.) E.g., if you've got TCP sockets attached to `E`, you might be doing
749 * `m_tcp_sock.async_read_some(..., [](const Error_code& err_code) { ... });` and so on.
750 *
751 * When `Event_wait_func F(hndl, snd_else_rcv, on_ev_func)` is invoked, you will simply do:
752 *
753 * ~~~
754 * hndl->async_wait(snd_else_rcv ? Asio_waitable_native_handle::Base::wait_write
755 * : Asio_waitable_native_handle::Base::wait_read,
756 * [on_ev_func = std::move(on_ev_func)](const Error_code& err_code)
757 * {
758 * if (err_code != boost::asio::error::operation_aborted)
759 * {
760 * (*on_ev_func)(); // Note: err_code is disregarded. Whether it's readable/writable or hosed, must invoke.
761 * }
762 * // else { Stuff is shutting down; do absolutely nothing! SOP on operation_aborted generally. }
763 * });
764 * ~~~
765 *
766 * Basically, Flow-IPC wants to do an `async_wait()`... so you do it for Flow-IPC, being the master of your
767 * (boost.asio) domain (so to speak).
768 *
769 * `hndl` is an `Asio_waitable_native_handle*`. Asio_waitable_native_handle is a razor-thin wrapper around
770 * a boost.asio `posix::descriptor` which itself is a thin wrapper around a native handle (FD). It has
771 * boost.asio-supplied `.async_wait()`. However, and this is a key point:
772 *
773 * To make it work, before invoking `T::start_X_ops()`, you must supply your execution context/executor -- usually
774 * a boost.asio `Task_engine` (a/k/a `boost::asio::io_context`) or strand (`boost::asio::io_context::strand`) --
775 * w/r/t which you plan to `.async_wait()` down the line. This is done via `T::replace_event_wait_handles()`,
776 * an otherwise optional call. If using flow.async, this might be (e.g.):
777 *
778 * ~~~
779 * flow::async::Single_thread_task_loop m_your_single_threaded_event_loop;
780 * ipc::transport::sync_io::Native_socket_stream m_sock_stream;
781 * // ...
782 * m_sock_stream.replace_event_wait_handles([this]() -> auto
783 * {
784 * return ipc::util::sync_io::Asio_waitable_native_handle
785 * (*(m_your_single_threaded_event_loop.task_engine()));
786 * });
787 * m_sock_stream.start_send_blob_ops(F); // F is your Event_wait_func.
788 * ~~~
789 *
790 * @note As explained in "Integrating with reactor-pattern..." above, `hndl` is
791 * a boost.asio I/O object, as opposed to just a Native_handle or even Native_handle::handle_t,
792 * specifically for the boost.asio integration use case. If *not* integrating with boost.asio,
793 * `start_X_ops()` is to be used without preceding it by `replace_event_wait_handles()`,
794 * and `hndl->native_handle()` is the only meaningful part of `*hndl`, with `.async_wait()` being meaningless
795 * and unused. Conversely, if integrating with boost.asio, `hndl->native_handle()` itself should not be
796 * required in your code, while `.async_wait()` is the only meaningful aspect of `*hndl`.
797 */
798using Event_wait_func = Function<void (Asio_waitable_native_handle* hndl_of_interest,
799                                        bool ev_of_interest_snd_else_rcv,
800                                        Task_ptr&& on_active_ev_func)>;
801
802} // namespace ipc::util::sync_io