Flow 1.0.0
concurrent_task_loop.hpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include "flow/async/op.hpp"
#include "flow/util/util.hpp"

namespace flow::async
{

// Types.

/**
 * The core flow::async interface, providing an optionally multi-threaded thread pool onto which runnable `Task`s,
 * optionally arranged into concurrency-avoiding `Op`s, can be boost.asio-posted for subsequent execution.
 *
 * ### Thread safety ###
 * All methods are thread-safe for read-write on a shared Concurrent_task_loop, after its ctor returns, unless
 * otherwise specified (but read on). This is highly significant, just as it is highly significant that boost.asio's
 * `Task_engine::post()` is similarly thread-safe. However, it is *not* safe to call either stop() or start()
 * concurrently with itself or the other of the two, on the same Concurrent_task_loop.
 *
 * ### First, select subclass to instantiate ###
 * Whenever the user needs a pool of task-executing threads -- meaning threads awaiting user-supplied work to be
 * `post()`ed (etc.) in the boost.asio sense -- they'll create a concrete subclass of this interface
 * (the choice perhaps based on configuration, e.g.). The choice of subclass determines how tasks will be scheduled
 * internally across threads, but the user need not worry about that after construction.
 *
 * If your task loop is fundamentally single-threaded -- which is *extremely* common and *typically* does *not*
 * generalize to a multi-threaded one easily -- then instead use the adapter Single_thread_task_loop, which is *not*
 * a part of the Concurrent_task_loop hierarchy but *does* use the relevant parts of it internally.
 *
 * If you choose Single_thread_task_loop, then it is not necessary to read further.
 *
 * ### Next, call start() ###
 * This starts the actual threads in the thread pool. Hence a subsequent `post(F)` (etc.) will cause `F()` to
 * be able to run in one of the threads. Some advanced points:
 * - You may post() (etc.) before start(). This will do no work but queue it up until start().
 * - You may call stop() to synchronously, gracefully exit the threads. Anything `post()`ed (etc.) that hadn't
 *   yet run will remain queued. Any post() (etc.) until the next start() will (again) do no work and (again)
 *   remain queued until start().
 * - If you save the (ref-counted) util::Task_engine from task_engine() or from an async::Op via
 *   op_to_exec_ctx(), you may use it even after `*this` is destroyed. It is then your responsibility to
 *   start thread(s) in which to actually execute its tasks. Any queued tasks on the `Task_engine` will remain
 *   queued until then.
 * - (This start/stop/run/post paradigm may be familiar to boost.asio (particularly `boost::asio::io_context`) users.)
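 *
 * To illustrate that lifecycle, a sketch (the concrete subclass and its ctor args are placeholders here, and
 * `F` stands for any `Task`):
 *
 * ~~~
 * flow::async::Cross_thread_task_loop L(...); // Or another concrete subclass; ctor args omitted in this sketch.
 * L.post(F); // No threads yet: F() is merely queued.
 * L.start(); // Threads spin up; the queued F() may now run in one of them.
 * ...
 * L.stop(); // Synchronous, graceful join; any not-yet-run tasks remain queued.
 * L.start(); // Threads spin up again; remaining queued tasks may now run.
 * ~~~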
 *
 * ### Next, post() tasks on it; create_op() to group tasks into operations ###
 * One can post() a task (in the same way one would simply `Task_engine::post()`). If one wants to execute an async
 * op with 2+ non-concurrent tasks, they would pass the same async::Op to post() for each of the aforementioned 2+
 * `Task`s (which are simply `void` no-arg functions, basically). An async::Op can be created
 * via create_op(); or, if the task must be pinned to a specific pre-made per-software-thread async::Op,
 * these are accessible via per_thread_ops().
 *
 * New applications should strive to use only create_op() and not touch the advanced-yet-legacy-ish
 * per_thread_ops() facility. In a classic async-task-based-event-loop algorithm, it should be sufficient to
 * execute `Task`s -- sprinkling in `Op` tags when certain tasks together comprise multi-async-step ops --
 * via one of the post() overloads. Hence simply make an `Op` via create_op() to associate `Task`s with each other,
 * and that should be enough for most async algorithms.
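 *
 * For instance, a create_op()-based sketch (`request_step_1`/`request_step_2` are hypothetical `Task`s, and the
 * `post(Op, Task)` argument order shown is an assumption; see the post() overloads' doc headers for exact
 * signatures):
 *
 * ~~~
 * flow::async::Op op = L.create_op(); // Tag associating the following tasks into one async op.
 * L.post(op, request_step_1); // Task 1 of the op.
 * L.post(op, request_step_2); // Task 2: shall never run concurrently with task 1.
 * ~~~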
 *
 * Note also the optional `Synchronicity synchronicity` argument to the `post()` methods. By default this acts
 * like regular `Task_engine::post()`, but you can also access `Task_engine::dispatch()`-type behavior;
 * and you can wait for the task to complete using yet another mode. The latter feature,
 * Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION, may be particularly helpful at initialization time, such
 * as if one needs to perform some startup tasks in the new thread(s) before continuing to general work on
 * the loop. E.g., subclasses might, for convenience, wrap this ability in their constructors, so that the user can
 * optionally provide an initializing task to run before the ctor returns.
 *
 * Lastly, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_START may be quite helpful in ensuring a certain
 * task definitely does run -- without waiting for it to actually complete, but merely to *begin*. More specifically,
 * `L.post(F, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_START); L.stop();` will ensure `F()` runs at some point --
 * but will not wait for its completion. `L.post(F); L.stop();` will not ensure this at all; whether `F()` runs
 * is a matter of luck. `L.post(F, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION); L.stop();` *will* ensure
 * it but also force a wait until `F()` finishes, which may not be necessary and hence hurts latency/responsiveness
 * in the calling thread.
 *
 * ### And/or: Use per_thread_ops() to associate `Task`s with specific threads ###
 * There are cases when the simple, conceptual approach just described (using create_op() only, if an `Op` is even
 * desired at all) is not sufficient. Another approach is to pre-create N `Op`s, where N is the number of threads
 * in the thread pool; instead of create_op() one can randomly choose one of those N `Op`s and `post()` onto that
 * one (an `Op` is an `Op` regardless of how it was obtained). Informally, there are 2 known categories of use cases
 * for this, with overlap:
 * - The legacy use case: Major daemons exist that were architected decades before flow::async was even conceived,
 *   so they understandably achieve some of the same aims (to wit, non-concurrency of tasks in an op) by working with
 *   the lower-level/less abstract notion of a thread in a pool. Basically, if an `Op` corresponds to a worker
 *   thread specifically, then internally 2 `Task`s being assigned to that 1 `Op` would just mean executing them
 *   on that `Op`'s corresponding worker thread; threads are serial by definition, so the `Op` semantics are trivially
 *   satisfied. So, to support such legacy designs, the per-thread pre-created `Op`s in per_thread_ops() allow the
 *   user to obtain either a randomly chosen or a specifically indexed 1 of the N per-thread async::Op objects.
 *   - Example: Suppose a multi-threaded memory-caching server has a central data store and explicitly
 *     thread-local copies (periodically synchronized with the central one) thereof. With such a setup, it's
 *     a no-brainer to throw all work on a request that was originally assigned to thread 3 (random) of N *also* onto
 *     per-thread Op #3 of N, for all `Task`s comprising that request's handling: Since all tasks operating in
 *     `Op` (therefore thread) #3 by definition execute non-concurrently, no locks are necessary when working
 *     with thread #3's thread-local object-store copy. Simply, only thread `i` of N will ever touch object store
 *     `i` of N, to the extent it can be explicitly declared thread-local (`thread_local` in C++11, or similar).
 * - The thread-to-thread communication use case: This has come up in practice: If daemon 1 has N producer threads,
 *   and daemon 2 has N consumer threads, then one can set up N IPC queues, with thread `i` in either process
 *   accessing (writing and reading, respectively) only queue `i`. Then -- assuming the queue itself is
 *   safe against 1 read occurring concurrently with 1 write -- no further locking is required. Basically,
 *   thread `i` in daemon 1 deals with thread `i` in daemon 2, using a dedicated lock-free thread-`i`-access-only
 *   IPC queue.
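 *
 * A sketch of that pinned-per-thread pattern (the accessor details and `update_thread_local_store` are
 * illustrative placeholders; see the per_thread_ops() doc header for the actual API):
 *
 * ~~~
 * // Suppose the request was assigned slot i in [0, N); pin *all* its tasks to per-thread Op #i.
 * const flow::async::Op& op_i = ...the i-th of the N pre-made Ops from per_thread_ops()...;
 * L.post(op_i, update_thread_local_store); // Only thread #i ever touches store copy #i => no locking needed.
 * ~~~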
 *
 * ### Timers have a similarly streamlined API: schedule_task_from_now() ###
 * post() is absolutely core, of course. However, obviously sometimes one needs to wait *asynchronously* for some kind
 * of event and THEN execute a task on that event. In particular, executing it simply after some specific time period
 * passes is common and has a dedicated API. This is called *scheduling a task* in our parlance.
 *
 * If you want to schedule a task, first decide whether you need certain advanced capabilities. This is explained in
 * the doc header for util::schedule_task_from_now(). If you decide you need advanced capabilities, then skip to the
 * next subsection below, about general boost.asio I/O objects. Most of the time you won't, in which case read on:
 *
 * schedule_from_now() and schedule_at() in this Concurrent_task_loop interface provide all the capabilities of the
 * `util::schedule[d]_task*()` API. (Reminder: This includes canceling and short-firing the task with ease, more
 * ease than if using the full-on I/O util::Timer, which is -- again -- explained below.) Just as with post(), there
 * is 1 version of each method for single tasks; and 1 for operating within an async::Op, meaning the timer
 * completion handler.
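 *
 * A scheduling sketch (the duration type, handler form, and `on_timeout` are illustrative assumptions; see the
 * schedule_from_now() doc header for the exact signature, including canceling/short-firing via the returned handle):
 *
 * ~~~
 * // Fire on_timeout ~2 seconds from now, unless canceled or short-fired first via the returned handle.
 * auto timer_handle = L.schedule_from_now(boost::chrono::seconds(2), on_timeout);
 * ~~~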
 *
 * ### General boost.asio objects can be fully used with Concurrent_task_loop ###
 * Finally, there's general boost.asio "I/O object" work. An I/O object is usually a class -- within boost.asio itself
 * or a custom object -- with 1 or more *asynchronous action* methods, always named in the style `async_*()`. To show
 * how one would do this with Concurrent_task_loop, let's do it in somewhat formal fashion:
 *
 * Suppose you have boost.asio object X, for example `boost::asio::ip::tcp::socket` (in boost.asio itself) or
 * flow::net_flow::asio::Peer_socket (a custom one), and we want to perform an `async_A()` action, which waits
 * asynchronously for some event (e.g., a successful `tcp::socket::async_receive()` or
 * `net_flow::asio::Peer_socket::async_send()`) and then executes a *completion handler* task F:
 *
 * ~~~
 * flow::util::Task_engine E; // boost.asio user works directly with a Task_engine E running in 1+ threads.
 * ...
 * X_type X(&E);
 * // A_target represents 0 or more mutable data structures (e.g., received-data target buffer) that sync_A() would
 * // modify in the background.
 * // A_settings represents 0 or more immutable/by-value args controlling behavior of the background sync_A() action.
 * X.async_A(&A_target, A_settings, F);
 * // Equivalent of X.sync_A(&A_target, A_settings) is now executing in background via Task_engine E!
 * // Once done, it will act as if one called, from an unspecified thread: F(err_code, A_more_result_info).
 * // - Error_code err_code is the success/failure result code. In particular, `!err_code == true` on success.
 * // - A_more_result_info represents 0 or more other bits of immutable/by-copy info indicating the results of
 * //   the background action. For example, both `tcp::socket::async_receive()` and
 * //   `net_flow::Peer_socket::async_send()` will call `F(err_code, n)`, where `size_t n` is the # of bytes
 * //   received or sent respectively.
 * ~~~
 *
 * That's the setup and should be familiar to boost.asio I/O object users. (Note that util::Timer is a (relatively
 * simple) I/O object itself; it lacks `A_settings` (one makes a call like `expires_at()` separately before the
 * actual async action) and `A_more_result_info` (as `err_code` is sufficient) in particular. It also lacks
 * any `A_target`. It's clearly the degenerate example of an I/O object action.) So how to write the
 * above when working with a Concurrent_task_loop instead of a `Task_engine`?
 *
 * ~~~
 * flow::async::Concurrent_task_loop L; // Work with generalized thread pool L, not a Task_engine E.
 * ...
 * // ATTN! A Task_engine is needed by boost.asio? Use our API -- task_engine() -- to get one.
 * X_type X(L.task_engine());
 * X.async_A(&A_target, A_settings, F);
 * ~~~
 *
 * Almost everything is the same! Just need to call that API to obtain a `Task_engine` to use. As a result:
 * - The background action, the equivalent of `X.sync_A()`, will be performed by some unspecified code in some
 *   unknown thread. We don't care how with boost.asio directly, and we don't care how with Concurrent_task_loop
 *   either.
 * - Then, for some `async_A()`s, it will modify `A_target`. This, too, will be done by an unspecified thread
 *   *with no locking guarantees*. Hence one must not access `A_target` from application threads. As with
 *   boost.asio direct use, this is typical. For example no app code would access the target data buffer of a
 *   receive operation.
 * - Finally, `F()` will be posted on completion via the `Task_engine L.task_engine()`. Of course we guarantee it
 *   will be in some thread in the thread pool `L`.
 *
 * Finally, then, suppose the original snippet above is modified to use a `Strand`, to guarantee non-concurrency with
 * some other boost.asio handler(s). This would look like:
 *
 * ~~~
 * flow::util::Task_engine E;
 * flow::util::Strand S(&E); // Create a Strand that guarantees non-concurrency of handlers posted onto Task_engine E.
 * ...
 * X_type X(&E);
 * X.async_A(&A_target, A_settings, boost::asio::bind_executor(S, F));
 * ...
 * Y_type Y(&E);
 * Y.async_B(&B_target, B_settings, boost::asio::bind_executor(S, G));
 * // X.sync_A() and Y.sync_B() are executing in background; F() and G() will run on respective completion;
 * // but F() and G() shall run non-concurrently by virtue of being wrapped by the same Strand: S.
 * ~~~
 *
 * To accomplish this with a `Concurrent_task_loop L`:
 *
 * ~~~
 * flow::async::Concurrent_task_loop L;
 * auto J = L.create_op(); // ATTN! The syntax is different from Strands, but the idea is identical.
 * ...
 * X_type X(L.task_engine());
 * // ATTN! The syntax is again somewhat different from bind_executor(S, F), but the idea is equivalent.
 * X.async_A(&A_target, A_settings, flow::async::asio_handler_via_op(&L, J, F));
 * ...
 * Y_type Y(L.task_engine());
 * Y.async_B(&B_target, B_settings, flow::async::asio_handler_via_op(&L, J, G));
 * // X.sync_A() and Y.sync_B() are executing in background; F and G will run on respective completion;
 * // but F() and G() shall run non-concurrently by virtue of being wrapped by the same Op: J.
 * ~~~
 *
 * However, now that you're working with an I/O object directly, you must be careful. Memorizing a `Task_engine`
 * at construction has different effects depending on which concrete subclass of Concurrent_task_loop `L` is.
 * Cross_thread_task_loop in particular will assign it to whichever thread is best. Segregated_thread_task_loop
 * will keep using the same random-ish thread chosen when `L.task_engine()` is called. If you need particular
 * behavior, you will need to strongly consider what to do: It is no longer totally generic behavior, independent
 * of the subclass, as it generally is when it comes to the post() and `schedule_*()` APIs.
 *
 * Lastly, if you are going down that road (which may be fully necessary), then consider
 * the free function template op_to_exec_ctx(), which is specialized
 * for each concrete Concurrent_task_loop; it takes a loop and an async::Op as input and returns a boost.asio
 * "execution context" which can be passed -- much like a `Task_engine` in the above example -- to I/O object
 * constructors. See the specializations -- as of this writing near Cross_thread_task_loop (returns util::Strand) and
 * Segregated_thread_task_loop (returns util::Task_engine) at least. Choose between the above technique
 * and op_to_exec_ctx() when working directly with a boost.asio-compatible I/O object.
 *
 * I am self-conscious at the length and seeming complexity of this formal writeup but must emphasize: This is
 * using the same patterns as boost.asio users use. It's just a matter of mapping them to the `flow::async` Flow
 * module's generalized Concurrent_task_loop and async::Op APIs. Reminder: The benefit of this is that one uses
 * boost.asio-equivalent semantics; yet the Concurrent_task_loop concrete subclass can implement it internally in
 * various ways that are or *aren't* what a direct use of `Task_engine` would do. However, when using
 * I/O objects -- as opposed to post() -- the genericness will be less generic. That is sometimes necessary.
 *
 * TL;DR: Any boost.asio-style (whether from boost.asio itself or custom) I/O object is to be used as normal, but:
 * To get a `Task_engine`, use task_engine(). To get a `Strand`-like thing, async::Op, use create_op().
 * To *use* the `Strand`-like async::Op, use asio_handler_via_op(). Alternatively use actual `Task_engine`s or
 * `Strand`s directly if necessary; see op_to_exec_ctx() specializations for that purpose.
 * Lastly, make sure you understand the exact boost.asio behavior when using task_engine() (yields util::Task_engine),
 * asio_handler_via_op() (yields a util::Strand-bound callback), and/or op_to_exec_ctx() (yields a
 * util::Task_engine, util::Strand, or something else depending on subclass, to be passed as an "execution context"
 * to the I/O object ctor).
 *
 * ### Are non-async boost.asio actions supported? ###
 * boost.asio `async_*()` actions are supported by the `flow::async` module. What about synchronous and non-blocking
 * operations? Well, sure, they're supported. This module is just not *about* them, hence the name.
 * Just for perspective, though:
 * - A non-blocking op (achieved in boost.asio normally by calling `something.non_blocking(true);` and then
 *   `some_op(something, ...)` or `something.some_op()`) can certainly be used whenever you want, in a task or
 *   outside of it, assuming of course you're not breaking thread-safety rules on concurrent access to `something`.
 *   The only "connection" to Concurrent_task_loop is that the `something` may be associated with
 *   `*(this->task_engine())`. That's fine.
 * - A blocking op (achieved by calling `something.non_blocking(false);` and then same as in the previous bullet)
 *   can also be used whenever. In some ways it's even less connected to Concurrent_task_loop, as blocking ops
 *   are only tangentially related to `Task_engine` in the first place; they don't participate in the
 *   internal event loop and usually simply call some blocking OS API or similar. However, a blocking call
 *   does *block* the thread; so if you do this inside a `*this`-posted task, then that task will block.
 *   - Informally: It is best not to mix blocking and non-blocking tasks in the same Concurrent_task_loop.
 *     The blocking ones will harm the scheduler's ability to efficiently schedule the quick (non-blocking) tasks.
 *   - Informally: However, it is entirely reasonable and practical to limit a given Concurrent_task_loop
 *     (even, or especially, a multi-thread one) to blocking tasks exclusively.
 *     - One practical example is a multi-threaded
 *       DNS host resolver that maintains many threads performing blocking `getaddrinfo()` calls, since an async
 *       DNS API is not available; and a separate async-only loop/thread kicking off result-emitting handlers to the
 *       user, with the former blocking-only loop posting result-emitting tasks onto the async-only loop/thread.
 *       (boost.asio's out-of-the-box `resolver` provides an async API but is internally single-threaded and therefore
 *       unsuitable at scale.)
 *
 * @internal
 *
 * ### Design discussion re. Concurrent_task_loop hierarchy ###
 *
 * @see Task_qing_thread, particularly the constructor Task_qing_thread::Task_qing_thread() doc header, explaining
 *      how the Concurrent_task_loop hierarchy is an encapsulation of `vector<Task_qing_thread>`, arranged in various
 *      potential ways of working with each other. It also includes an intro to the question of how to choose
 *      Cross_thread_task_loop vs. Segregated_thread_task_loop, at a lower level.
 *
 * While it is a clean interface, realistically speaking the entire existing hierarchy is perhaps best explained
 * by immediately discussing the 2 concrete classes implementing the API as of this writing. (More classes may well
 * be added, but as of this writing -- and probably for posterity -- it makes sense to discuss these 2 specific
 * ones.) The available pool types are:
 *
 * - Cross_thread_task_loop: Internally, a single shared Task_engine and N `Task_qing_thread`s working together
 *   off that `post()`-capable engine. N can be specified directly, or it can be auto-determined based on available
 *   hardware. The latter will also enable automatically pinning the threads in such a way as to attempt to minimize
 *   latency, namely avoiding hyper-threading or other physical-core sharing by several hardware threads; this is
 *   done by making N = # of physical cores and pinning each software thread to a distinct group of logical cores
 *   (hardware threads), so that each software thread gets its own physical core, avoiding latency-increasing
 *   "competition." An attempt is also made (achievable in Linux) to pin them in a consistent way, so that if another
 *   pool elsewhere uses the same code and config, they will arrange their same N threads in the same order. This can
 *   help if thread i from pool 1 is a producer writing to some area in memory, while thread i from pool 2 is a
 *   consumer of same, reading there. The `Cross_thread` part means that each multi-task sequence of callbacks
 *   constituting an async::Op -- such that those callbacks must not execute concurrently -- may use more than 1
 *   thread (internally, via the boost.asio util::Strand mechanism), which theoretically can improve use of thread
 *   time under asymmetrical load. It might also negate per-thread cache locality, etc., and counteract the
 *   effectiveness of the aforementioned pinning.
 *   - `Cross_thread` means cross-thread.
 *
 * - Segregated_thread_task_loop: Internally, N `Task_qing_thread`s working together, each with its *own*
 *   `post()`-capable Task_engine queue, meaning -- by contrast with Cross_thread_task_loop -- a given `Op` always
 *   executes its tasks on the same thread. Otherwise it works the same.
 *   Under asymmetrical load it might not use all available cross-thread time; however, it arguably also works
 *   straightforwardly "synergistically" with any attempts at per-processor-core pinning.
 *
 * The use is, of course, identical via the common API, Concurrent_task_loop.
 *
 * Apologies for the conversational comment. The internal subtleties are encapsulated and hidden from the user. Yet
 * there is considerable flexibility available. One can think of this as a convenient wrapper around various
 * functionality typically used manually and separately from each other -- simplifying the core interface to just
 * async::Op and post() and providing automatic flexibility as to what functionality is in fact used and when, as a
 * result. The functionality accessible: `Task_engine::post()`; scheduling via util::Strand; scheduling on a specific
 * thread; non-concurrency guarantees of 2+ tasks in one async op; and thread-count selection and pinning based on
 * available processor architecture (hardware threads, physical cores).
 */
class Concurrent_task_loop
{
public:
  // Types.

  /// Short-hand for the thread-initializer-function optional arg type to start().
  using Thread_init_func = Function<void (size_t thread_idx)>;

  // Constructors/destructor.

  /**
   * Any implementing subclass's destructor shall execute stop() -- see its doc header please -- and then clean up
   * any resources. The behavior of stop() has subtle implications, so please be sure to understand what it does.
   *
   * It is fine if stop() has already been called and returned.
   */
  virtual ~Concurrent_task_loop();

  // Methods.

  /**
   * Starts all threads in the thread pool; any queued `post()`ed (and similar) tasks may begin executing
   * immediately; and any future posted work may execute in these threads. Calling start() after start() is
   * discouraged and may log a WARNING, but it is a harmless no-op. See also stop().
   *
   * The optional `init_task_or_empty` arg is a convenience thing. It's equivalent to
   * `post(init_task_or_empty, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION)` executed upon return.
   * `init_task_or_empty()` will run in the new thread pool; and only once it `return`s will start() `return`.
   * Rationale: It has come up in our experience several times that one wants to execute something in the new
   * thread(s) to initialize things, synchronously, before the main work -- various async `post()`ing and other
   * calls -- can begin in earnest. Do note that any tasks enqueued before this start() but after the last stop()
   * or constructor may run first.
   *
   * Suppose a user-supplied task posted onto a worker thread throws an uncaught exception. This will be handled
   * the same as if that occurred directly in that thread; in other words we don't catch it in any way, not even
   * to re-throw it or manually `std::abort()` or anything of that nature. We informally recommend you handle
   * uncaught exceptions in a program-wide SIGABRT handler or an equally program-wide custom `std::terminate()` (via
   * `std::set_terminate()`). We informally recommend that all other threads similarly let any uncaught exception
   * fall through and deal with the fallout at the global program-wide level (to avoid losing precious stack-trace
   * information).
   *
   * Assuming no such uncaught exception is thrown, all threads will run until stop() or the destructor runs and
   * returns.
   *
   * ### Thread safety ###
   * As noted in the class doc header, all methods are thread-safe on a common `*this` unless noted otherwise.
   * To wit: it is not safe to call `X.start()` concurrently with `X.start()` or with `X.stop()`.
   *
   * ### Thread initialization ###
   * Each thread start() starts shall soon be blocked running an event loop (or part of a multi-thread event loop),
   * meaning it will be either blocking-waiting for posted tasks/active events or executing posted tasks/event
   * handlers. It is, however, sometimes required to perform setup (usually of a low-level variety) in the thread
   * before the event loop proper begins. (The use case that triggered this feature was wanting to execute Linux
   * `setns(CLONE_NEWNET)` to affect the subsequent socket-create calls in that thread.) If needed: pass in
   * a non-empty function as the `thread_init_func_or_empty` arg; it will receive the thread index 0, 1, ... as the
   * arg. It will run first-thing in the new thread. Subtlety: As of this writing "first-thing" means literally
   * first-thing; it will run before any of the implementing start()'s own code begins. (This may be relaxed in the
   * future to merely "before the event loop is ready for tasks." Then this comment shall be updated.)
   *
   * Note: start() shall *block* until *all* `thread_init_func_or_empty()` invocations (if the arg is not `.empty()`)
   * have completed. This can be important: for example, if the actions they take require elevated privileges,
   * then this guarantee means one can drop privileges after that. Informally: we intuitively recommend against
   * blocking in this callback, although perhaps some use case might require it. Just be careful.
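   *
   * For instance, a sketch (the initializer body is a placeholder for whatever low-level per-thread setup is
   * needed):
   *
   * ~~~
   * L.start(Task(), [](size_t thread_idx)
   * {
   *   // Runs literally first-thing in new worker thread #thread_idx, before its event loop accepts tasks.
   *   // E.g., setns(CLONE_NEWNET), privilege-sensitive setup, etc.
   *   // start() returns only once every thread's initializer has finished.
   * });
   * ~~~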
   *
   * ### Design rationale (thread initialization arg) ###
   * Consider the specific implementation of the present interface, Segregated_thread_task_loop. Something similar
   * to this feature is possible without this start() optional arg: One can simply post() onto each of the
   * per_thread_ops(), with Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION. It'll run as the first *task*
   * in each thread, as opposed to strictly-speaking first-*thing*, but it's close enough. So why add this to the
   * interface? Well, consider the other implementation, Cross_thread_task_loop. By definition it's not possible
   * to target individual threads in that guy (per_thread_ops() exists, but its `Op`s are *per*-thread, not
   * *in*-thread; they are `Strand`s, not threads). So then some other, Cross_thread_task_loop-*only* API would be
   * necessary to get what we need. Hence it made sense to add this as an interface-level feature. Then these
   * asymmetries go away naturally.
   *
   * @todo Concurrent_task_loop::start() has an optional thread-initializer-function arg; it could be reasonable to
   *       add a thread-finalizer-function arg symmetrically. As of this writing there is no use case, but it's
   *       certainly conceivable.
   *
   * @param init_task_or_empty
   *        Ignored if `.empty()` (the default). Otherwise `init_task_or_empty()` shall execute in one of the
   *        threads started by this method, delaying the method's return to the caller until `init_task_or_empty()`
   *        returns in said spawned thread.
   * @param thread_init_func_or_empty
   *        If not `.empty() == true`, `thread_init_func_or_empty(thread_idx)` shall be executed first-thing
   *        in each thread, for all `thread_idx` in [0, n_threads()). start() will return no sooner than
   *        when each such callback has finished.
   */
  virtual void start(Task&& init_task_or_empty = Task(),
                     const Thread_init_func& thread_init_func_or_empty = Thread_init_func()) = 0;

414 /**
415 * Waits for any ongoing task(s)/completion handler(s) to return; then prevents any further-queued such tasks
416 * from running; then gracefully stops/joins all threads in pool; and then returns. The post-condition is that
417 * the worker threads have fully and gracefully exited.
418 *
419 * Upon return from this method, any further `post()` or more complex async ops can safely be invoked -- but they
420 * will not do any actual work, and no tasks or completion handlers will run until start(). In particular
421 * task_engine() will still return a util::Task_engine, and one can still invoke `post()` and async I/O ops on it:
422 * doing so won't crash, but it won't do the requested work until start(). (Recall that there are no more
423 * threads in which to do this work.) The destructor can then be invoked, at which point obviously one cannot
424 * `post()` (or anything else like it) either.
425 *
426 * This condition is reversible via start(). In fact, `*this` starts in the stopped state, and start() is required
427 * to make posted tasks actually execute.
428 *
429 * Lastly, calling stop() after stop() returns is a harmless no-op. Also note the destructor shall call stop().
430 *
431 * ### Thread safety ###
432 * As noted in the class doc header, all methods are thread-safe on a common `*this` unless noted otherwise.
433 * To wit: it is not safe to call `X.stop()` concurrently with `X.stop()` or with `X.start()`.
434 *
435 * You may call stop() from within a task/completion handler executing within `*this` thread pool. Of course
436 * you may also do this from another thread.
437 *
438 * ### Rationale ###
439 * This is similar to boost.asio `Task_engine::stop()`. At a minimum it is useful, when shutting down the app
440 * or module, in the situation where 2+ `Concurrent_task_loop`s routinely post work onto each other (or in at least 1
441 * direction). To safely stop all 2+ loops, one would first invoke this stop() method on each
442 * Concurrent_task_loop, in any order; having done that, destroy (invoke dtor on) each Concurrent_task_loop, also in
443 * any order. This way any cross-posting will safely work during the stop() phase (but do nothing on the
444 * already-stopped loops); and by the time the destructor-invoking phase begins, no more cross-posting tasks can
445 * possibly be executing (as their threads don't even exist by then).
446 *
447 * Note, however, that this is as graceful as we can generically guarantee -- in that it won't crash/lead to undefined
448 * behavior on our account -- but it is up to you to ensure your algorithm is robust, in that nothing bad will happen
449 * if tasks are suddenly prevented from running. For example, if task A locks some file, while task B later unlocks
450 * it, *you* are the one who must ensure you don't invoke stop() "between" task A and task B. (E.g., invoking it
451 * while A runs will let A complete; but it will very possibly prevent B from starting subsequently.) We have no way
452 * of knowing to let task B run first and only then stop the thread(s).
453 *
454 * Lastly, the stop() and start() mechanism is amenable to dynamically configuring thread behavior such as the
455 * number of threads in the pool.
456 *
457 * @todo boost.asio has advanced features that might help to mark certain tasks as "must-run if already queued, even
458 * if one `stop()`s"; consider providing user-friendly access to these features, perhaps in the context of the
459 * existing Concurrent_task_loop::stop() API. These features are documented as of this writing at least in the
460 * `io_context` doc page.
461 */
462 virtual void stop() = 0;
463
464 /**
465 * How many threads does start() start?
466 * @return See above.
467 */
468 virtual size_t n_threads() const = 0;
469
470 /**
471 * Return a new Op which can bundle together an arbitrary set of `post()`s that would result in the
472 * provided task functions executing non-concurrently. That's informal; the formal semantics of what async::Op
473 * means are in async::Op doc header. Informally: please recall that a copy (of a copy, of a copy, ...) of
474 * an `Op` is an equivalent `Op`, and copying them is light-weight (at worst like copying `shared_ptr`).
475 *
476 * All `Op`s shall remain valid throughout the lifetime of `*this`.
477 *
478 * This is the more general method of obtaining an async::Op, vs. going through per_thread_ops().
479 * It should be used *unless* you specifically need to access some
480 * per-thread resource in the associated `Task`s. See class doc header for more discussion on this dichotomy.
481 * TL;DR: Use create_op() by default, *unless* the `Task`s you plan to execute are working on some
482 * per-thread resource.
483 *
484 * @return See above.
485 */
486 virtual Op create_op() = 0;
487
488 /**
489 * Returns the optional-use, pre-created collection of per-thread async::Op objects, such that
490 * the i-th `Op` therein corresponds to the i-th (of N, where N = # of threads in this pool) thread.
491 *
492 * All `Op`s and this `Op_list&` shall remain valid throughout the lifetime of `*this`.
493 *
494 * This is an advanced/legacy-ish feature. Please see class doc header for discussion on when one should use this
495 * as opposed to the simpler create_op().
496 *
497 * @return See above.
498 */
499 virtual const Op_list& per_thread_ops() = 0;
500
501 /**
502 * Cause the given `Task` (function) to execute within the thread pool as soon as possible, in the first thread
503 * available, in otherwise first-come-first-served fashion. `task` may execute concurrently with some other `Task` if
504 * there are 2+ threads in `*this` pool. Meanings of "as soon as possible" and "available" are to be determined by
505 * the concrete method implementation. That is, the interface does not promise it'll use literally the first
506 * thread to be idle, but informally -- all else being equal -- that's a great goal.
507 *
508 * `synchronicity` controls the precise behavior of the "post" operation. Read #Synchronicity `enum` docs carefully.
509 * That said: if left defaulted, `post()` works in the `Task_engine::post()` manner: return immediately; then
510 * execute either concurrently in another thread or later in the same thread.
511 *
512 * This is safe to call after stop(), but `task()` will not run until start() (see stop() doc header).
513 * Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION and Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_START modes
514 * will, therefore, block infinitely in that case; so don't do that after stop().
515 *
516 * Reminder: This is thread-safe as explained in class doc header.
517 *
518 * ### Rationale note ###
519 * The callback arg would normally be the last arg, by Flow coding style. In this case it isn't, because
520 * it is more valuable to make `synchronicity` optional (which it can only be if it's the last arg).
521 *
522 * @param task
523 * Task to execute. `task` object itself may be `move`d and saved.
524 * @param synchronicity
525 * Controls when `task()` will execute particularly in relation to when this `post()` call returns.
526 */
527 virtual void post(Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC) = 0;
528
529 /**
530 * Identical to the other post() with the added constraint that no other `Task` *also* similarly posted with the
531 * equivalent async::Op may execute concurrently. See doc header for async::Op for a formal definition of
532 * what this call does w/r/t async::Op.
533 *
534 * Reminder: This is thread-safe as explained in class doc header.
535 *
536 * @param op
537 * The (presumably) multi-async-step operation to which `task` belongs, such that no `Task`s associated with
538 * `op` may execute concurrently with `task`. If `op.empty()` (a/k/a `op == Op()`, recalling that `Op()`
539 * is null/sentinel), then `assert()` trips.
540 * @param task
541 * See other post().
542 * @param synchronicity
543 * See other post().
544 */
545 virtual void post(const Op& op, Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC) = 0;
546
547 /**
548 * Equivalent to 2-argument post() but execution is scheduled for later, after the given time period passes.
549 *
550 * The semantics are, in all ways in which this differs from 2-argument post(), those of
551 * util::schedule_task_from_now(). This includes the meaning of the returned value and the nature of
552 * util::Scheduled_task. Also, in particular, one can perform actions like canceling, short-firing, and
553 * info-access by passing the returned handle into util::scheduled_task_cancel() and others.
554 *
555 * @warning If n_threads() is 1, then you *must* not call any `util::scheduled_task_*()` function on the returned
556 * handle except from within `*this` loop's tasks.
557 *
558 * @todo Deal with the scheduled-tasks-affected-from-outside-loop corner case of the
559 * `Concurrent_task_loop::schedule_*()` APIs. Perhaps add `bool in_loop_use_only` arg
560 * which, if `false`, will always disable the `single_threaded` optimization internally.
561 * At this time it always enables it if `n_threads() == 1` which will cause thread un-safety if
562 * the returned handle is touched from outside an in-loop task. `void` versions of the `schedule_*()` APIs
563 * should be added which would lack this, as in that case there is no handle to misuse outside the loop.
564 *
565 * @param from_now
566 * See util::schedule_task_from_now().
567 * @param task
568 * The task to execute within `*this` unless successfully canceled.
569 * `task` object itself may be `move`d and saved.
570 * @return See util::schedule_task_from_now().
571 */
572 virtual util::Scheduled_task_handle schedule_from_now(const Fine_duration& from_now, Scheduled_task&& task) = 0;
573
574 /**
575 * Equivalent to 2-argument schedule_from_now() except one specifies an absolute time point instead of wait duration.
576 *
577 * @warning See schedule_from_now() warning.
578 *
579 * @param at
580 * See util::schedule_task_at().
581 * @param task
582 * See schedule_from_now().
583 * @return See schedule_from_now().
584 */
585 virtual util::Scheduled_task_handle schedule_at(const Fine_time_pt& at, Scheduled_task&& task) = 0;
586
587 /**
588 * Equivalent to 3-argument post() but execution is scheduled for later, after the given time period passes.
589 *
590 * The semantics are, in all ways in which this differs from 3-argument post(), those of
591 * util::schedule_task_from_now(). This includes the meaning of the returned value and the nature of
592 * util::Scheduled_task.
593 *
594 * @warning See other schedule_from_now() warning.
595 *
596 * @param op
597 * See 3-argument post().
598 * @param from_now
599 * See util::schedule_task_from_now().
600 * @param task
601 * The task to execute within `*this`, subject to `op` constraints, unless successfully canceled.
602 * `task` object itself may be `move`d and saved.
603 * @return See util::schedule_task_from_now().
604 */
605 virtual util::Scheduled_task_handle schedule_from_now(const Op& op,
606 const Fine_duration& from_now, Scheduled_task&& task) = 0;
607
608 /**
609 * Equivalent to 3-argument schedule_from_now() except one specifies an absolute time point instead of wait duration.
610 *
611 * @warning See schedule_from_now() warning.
612 *
613 * @param op
614 * See 3-argument post().
615 * @param at
616 * See util::schedule_task_at().
617 * @param task
618 * See schedule_from_now().
619 * @return See schedule_from_now().
620 */
621 virtual util::Scheduled_task_handle schedule_at(const Op& op,
622 const Fine_time_pt& at, Scheduled_task&& task) = 0;
623
624 /**
625 * Returns a pointer to *an* internal util::Task_engine (a/k/a boost.asio `io_service`) for the purpose of
626 * performing a boost.asio `async_*()` action on some boost.asio I/O object in the immediate near future.
627 *
628 * The mechanics of using this are explained in Concurrent_task_loop doc header. Using this in any other
629 * fashion may lead to undefined behavior, while `*this` exists.
630 *
631 * @return A mutable util::Task_engine to use soon. Informally, the sooner one calls the intended `async_*()` action
632 * on it, the more effective the internal load-balancing. Formally, it is *allowed* to use it as long as
633 `*this` exists (pre-destructor) and even beyond that; though beyond that point the responsibility of
634 providing thread(s) in which to `run()` it becomes the user's.
635 */
636 virtual Task_engine_ptr task_engine() = 0;
637}; // class Concurrent_task_loop
638
639// Free functions: in *_fwd.hpp.
640
641} // namespace flow::async
642
643// Macros.
644
645#ifdef FLOW_DOXYGEN_ONLY // Compiler ignores; Doxygen sees.
646
647/**
648 * Macro set to `1` (else `0`) if and only if natively the pthread API allows one to set thread-to-core
649 * affinity. This API, if available, is an extension of POSIX and not always available.
650 *
651 * The macro conceptually belongs to the flow::async namespace,
652 * hence the prefix.
653 *
654 * @see #FLOW_ASYNC_HW_THREAD_AFFINITY_MACH_VIA_POLICY_TAG
655 */
656# define FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX
657
658/**
659 * Macro set to `1` (else `0`) if and only if natively there is a Mach kernel API that allows one to set
660 * thread-to-core affinity using the policy-tag mechanism. This is specific to Mach kernels (used in
661 * Darwin/Mac), and not all of them have it.
662 *
663 * The macro conceptually belongs to the flow::async namespace,
664 * hence the prefix.
665 *
666 * @see #FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX
667 */
668# define FLOW_ASYNC_HW_THREAD_AFFINITY_MACH_VIA_POLICY_TAG
669
670#else // if !defined(FLOW_DOXYGEN_ONLY)
671
672// Now the actual definitions compiler sees (Doxygen ignores).
673
674# ifdef FLOW_OS_MAC
675# define FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX 0
676# define FLOW_ASYNC_HW_THREAD_AFFINITY_MACH_VIA_POLICY_TAG 1
677# elif defined(FLOW_OS_LINUX)
678# define FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX 1
679# define FLOW_ASYNC_HW_THREAD_AFFINITY_MACH_VIA_POLICY_TAG 0
680# else
681# error "We only know how to deal with thread-core affinities in Darwin/Mac and Linux."
682# endif
683
684#endif // elif !defined(FLOW_DOXYGEN_ONLY)
685
686namespace flow::async
687{
688// Template implementations.
689
690template<typename Handler>
691auto asio_handler_via_op(Concurrent_task_loop* loop, const Op& op, Handler&& handler)
692{
693 /* To understand this -- though it is rather basic boiler-plate if one is familiar with lambdas and boost.asio --
694 * in context, read the relevant section of Concurrent_task_loop doc header. */
695 return [loop, op, handler = std::move(handler)](auto... params) mutable
696 {
697 loop->post(op,
698 [handler = std::move(handler), // Avoid copying handler again. Move it instead.
699 params...]()
700 {
701 handler(params...);
702 }, Synchronicity::S_OPPORTUNISTIC_SYNC_ELSE_ASYNC);
703 /* Attn: That last argument means if we're already in the proper thread, then run `handler(args...);`
704 * synchronously right now. This is a perf win -- particularly when *loop is really a Segregated_thread_task_loop,
705 * when this win will be in effect every single time -- but is it safe? Yes: The entirety of the intended task
706 * *is* `handler(args...)`, and the present { body } has already been post()ed in its entirety and is
707 * executing, hence we're adding nothing unsafe by executing the handler synchronously now. */
708 };
709
710 /* Subtleties regarding `params...` and `auto`:
711 * Firstly, if not familiar with this crazy particular use of `auto` (on `params`), it's something like this
712 * (cppreference.com, in its big page on lambdas, explains this is called a "generic lambda"):
713 * The params... pack expands to the arg types based on what Handler really is, in that instantiation of the
714 * asio_handler_via_op() template; meaning if Handler is, say, Function<void (int a, int b)>, then
715 * params... = int, int -- because of the invocation of `handler()` inside the *body* of the returned lambda.
716 * Hence the returned object is a function object that takes the same types of arguments, magically.
717 * Secondly, and this is where it gets subtle, notice what we do with params... within the returned lambda:
718 * we *capture* params..., so that we can post() a no-arg handler onto *loop; and within the body of that no-arg
719 * handler we will finally (when boost.asio actually executes the post()ed thing at some point) invoke the handler,
720 * passing it `params...`. The subtlety is that each actual arg in the params... pack is *copied* when thus
721 * captured. Ideally, for performance, we'd write something like `params = std::move(params)...`, but this is not
722 * supported (at least in C++17); only capturing a pack by copy is supported -- using a pack in an init-capture
723 * ([blah = blah]) isn't supported. To that end a standards edit was proposed:
724 * http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0780r0.html
725 * Apparently C++20 accepted this in slightly modified (syntactically) form, and indeed on coliru.com one can write
726 * `...params = std::move(params)` and build it with --std=c++20. In the meantime (C++14, C++17) one *could* get the
727 * desired behavior by using a strained tuple<> construction (see the above link which suggests it as a work-around).
728 * However, I (ygoldfel) did not do it; I am just copying it. Reason: Well, it's much easier to read.
729 * Perf-wise, however, the reason is: boost.asio out-of-the-box handler types all take little scalars (size_t,
730 * Error_code) which are fine to copy (std::move() would just copy them anyway). That said, boost.asio can be used
731 * with custom types; and indeed for example flow::net_flow::asio::Server_socket::async_accept() works with handlers
732 * that take a shared_ptr<> arg: copying that would increment a ref-count/later decrement it... whereas a move()
733 * would do neither. So it's not quite free in all cases -- close enough though in my opinion.
734 *
735 * @todo As of this writing we've recently moved to C++17 (months after this code was written originally);
736 * which gains std::apply(). The tuple<>+apply() trick isn't *so* hard to read/write. Maybe do so.
737 * @todo With C++20 -- if that ever happens -- can just write it so nicely: [...params = std::move(params)];
738 * best of all worlds. */
739
740 // ---
741
742 /* Lastly let's deal with the possibility that *loop is a Timed_concurrent_task_loop decorator of an actual
743 * Concurrent_task_loop. The good news is we've already ensured `handler()` itself gets timed: we L->post()ed,
744 * and L->post() does the right thing by definition when L is indeed Timed_.... However the loop->post() thingie
745 * we executed to make that happen -- that won't be timed, if we simply return posting_handler; after all
746 * they'll just do async_<something>(..., posting_handler), and that'll be that. One *could* try to
747 * conditionally (via dynamic_cast<> RTTI check) wrap the above in asio_handler_timed() if *loop is
748 * really a Timed_concurrent_task_loop; otherwise just return the above. However then `auto` can fail to compile,
749 * as the returned lambda function object type could be A in one case but B in another, for the same `Handler`;
750 * and we're screwed. So for now it's a @todo:
751 *
752 * @todo Enable the timing of the post()ing code above, not just handler() itself. How? Suppose the RTTI
753 * check idea were workable (thought experiment). Then:
754 * This kind of use of RTTI to conditionally do or not do something immediately sets off alarms; the point
755 * of interfaces is to do everything conditional through virtual. But the whole point of
756 * Timed_concurrent_task_loop's design is to *not* modify the core Concurrent_task_loop API but only decorate
757 * its behavior. To really solve this, I (ygoldfel) believe that a good-anyway rewrite of the
758 * Concurrent_task_loop hierarchy is in order. It really should have been a set of concept-implementing
759 * classes (where Concurrent_task_loop would cease to exist as a class and be merely a duck-typing concept).
760 * This would be a breaking change, but it would resolve a few issues. 1, asio_handler_via_op() could be coded
761 * separately for each type of _loop, which would make it possible to not even add this intermediate post()
762 * but instead simply do the right thing for each actual type of _loop: Cross_thread_task_loop would
763 * fish out the Strand from the Op and bind that executor to `handler`; Segregated_thread_task_loop would
764 * be a no-op (as the user must select the Op-appropriate Task_engine in the first place). The perf
765 * implications of such an approach would be pleasant. And 2,
766 * Timed_concurrent_task_loop's overload of asio_handler_via_op() would simply forward to the contained
767 * Concurrent_task_loop's asio_handler_via_op() -- and then wrap the result in asio_handler_timed() and return
768 * that. Nice and clean. Obviously then the way one uses the whole hierarchy would change and be a breaking
769 * change -- but even that wouldn't be such a big deal; a virtual hierarchy could be built on top of these
770 * concept-satisfying classes, if needed (and I doubt it's needed all that frequently in practice). */
771} // asio_handler_via_op()
772
773} // namespace flow::async