Flow 1.0.2
Flow project: Full implementation reference.
The core flow::async interface, providing an optionally multi-threaded thread pool onto which runnable Tasks, optionally arranged into concurrency-avoiding Ops, can be boost.asio-posted for subsequent execution.
#include <concurrent_task_loop.hpp>
Public Types
using Thread_init_func = Function<void (size_t thread_idx)>
    Short-hand for the thread-initializer-function optional arg type to start().
Public Member Functions
~Concurrent_task_loop() override
    Any implementing subclass's destructor shall execute stop() – see its doc header please – and then clean up any resources.
virtual void start(Task&& init_task_or_empty = Task(), const Thread_init_func& thread_init_func_or_empty = Thread_init_func()) = 0
    Starts all threads in the thread pool; any queued post()ed (and similar) tasks may begin executing immediately; and any future posted work may execute in these threads.
virtual void stop() = 0
    Waits for any ongoing task(s)/completion handler(s) to return; then prevents any further-queued such tasks from running; then gracefully stops/joins all threads in the pool; and then returns.
virtual size_t n_threads() const = 0
    How many threads does start() start?
virtual Op create_op() = 0
    Returns a new Op which can bundle together an arbitrary set of post()s that would result in the provided task functions executing non-concurrently.
virtual const Op_list& per_thread_ops() = 0
    Returns the optional-use, pre-created collection of per-thread async::Op objects, such that the i-th Op therein corresponds to the i-th (of N, where N = # of threads in this pool) thread.
virtual void post(Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC) = 0
    Causes the given Task (function) to execute within the thread pool as soon as possible, in the first thread available, in otherwise first-come-first-served fashion.
virtual void post(const Op& op, Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC) = 0
    Identical to the other post() with the added constraint that no other Task also similarly posted with the equivalent async::Op may execute concurrently.
virtual util::Scheduled_task_handle schedule_from_now(const Fine_duration& from_now, Scheduled_task&& task) = 0
    Equivalent to 2-argument post() but execution is scheduled for later, after the given time period passes.
virtual util::Scheduled_task_handle schedule_at(const Fine_time_pt& at, Scheduled_task&& task) = 0
    Equivalent to 2-argument schedule_from_now() except one specifies an absolute time point instead of a wait duration.
virtual util::Scheduled_task_handle schedule_from_now(const Op& op, const Fine_duration& from_now, Scheduled_task&& task) = 0
    Equivalent to 3-argument post() but execution is scheduled for later, after the given time period passes.
virtual util::Scheduled_task_handle schedule_at(const Op& op, const Fine_time_pt& at, Scheduled_task&& task) = 0
    Equivalent to 3-argument schedule_from_now() except one specifies an absolute time point instead of a wait duration.
virtual Task_engine_ptr task_engine() = 0
    Returns a pointer to an internal util::Task_engine (a/k/a boost.asio io_service) for the purpose of performing a boost.asio async_*() action on some boost.asio I/O object in the immediate near future.
Public Member Functions inherited from flow::util::Null_interface
virtual ~Null_interface() = 0
    Boring virtual destructor.
Related Functions
(Note that these are not member functions.)
template<typename Exec_ctx_ptr>
Exec_ctx_ptr op_to_exec_ctx(Concurrent_task_loop* loop, const Op& op)
    Template specialization model for an operation that obtains the underlying execution context, such as a util::Task_engine or util::Strand, stored in an async::Op generated by the given Concurrent_task_loop.
The core flow::async interface, providing an optionally multi-threaded thread pool onto which runnable Tasks, optionally arranged into concurrency-avoiding Ops, can be boost.asio-posted for subsequent execution.
All methods are thread-safe for read-write on a shared Concurrent_task_loop, after its ctor returns, unless otherwise specified (but read on). This is highly significant, just as it is highly significant that boost.asio's Task_engine::post() is similarly thread-safe. However, it is not safe to call either stop() or start() concurrently with itself or the other of the two, on the same Concurrent_task_loop.
Whenever the user needs a pool of task-executing threads – meaning threads awaiting user-supplied work, to be post()ed (etc.) in the boost.asio sense – they'll create a concrete subclass of this interface (the choice perhaps based on configuration, e.g.). The choice of subclass determines how tasks will be scheduled internally across threads, but the user need not worry about that after construction.
If your task loop is fundamentally single-threaded – which is extremely common and typically does not generalize to a multi-threaded one easily – then instead use the adapter Single_thread_task_loop which is not a part of Concurrent_task_loop hierarchy but does use the relevant parts of it internally.
If you choose Single_thread_task_loop then it is not necessary to read further.
start() starts the actual threads in the thread pool. Hence a subsequent post(F) (etc.) will cause F() to be able to run in one of the threads. Some advanced points:
- After stop(), any tasks post()ed (etc.) that hadn't yet run will remain queued. Any post() (etc.) until the next start() will (again) do no work and (again) remain queued until start().
- The util::Task_engine returned by task_engine() can be retained past the time *this is destroyed. It is then your responsibility to start thread(s) in which to actually execute its tasks; any queued tasks on the Task_engine will remain queued until then. (The same applies to direct boost::asio::io_context users.)
One can post() a task (in the same way one would simply Task_engine::post()). If one wants to execute an async op with 2+ non-concurrent tasks, they would pass the same async::Op to post() for each of the aforementioned 2+ Tasks (which are simply void no-arg functions, basically). An async::Op can be created via create_op(); or, if the task must be pinned to a specific pre-made per-software-thread async::Op, these are accessible via per_thread_ops().
New applications should strive to use only create_op() and not touch the advanced-yet-legacy-ish per_thread_ops() facility. In a classic async-task-based-event-loop algorithm, it should be sufficient to execute Tasks – sprinkling in Op tags when certain tasks together comprise multi-async-step ops – via one of the post() overloads. Hence simply make an Op via create_op() to associate Tasks with each other, and that should be enough for most async algorithms.
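The pattern just described might be sketched as follows (a hedged sketch only: it assumes a concrete Concurrent_task_loop subclass instance has been constructed and start()ed; ctor args are elided, and the lambdas' contents are placeholders):

```cpp
// Sketch: basic post() usage, with and without an async::Op.
void basic_pattern(flow::async::Concurrent_task_loop& loop)
{
  // A lone task: runs ASAP in the first available pool thread.
  loop.post([]() { /* ...one-off work... */ });

  // A multi-step async op: the 2 tasks below shall never execute concurrently
  // with each other, though each may run on any thread (subclass-dependent).
  const flow::async::Op op = loop.create_op();
  loop.post(op, []() { /* ...step 1 of the op... */ });
  loop.post(op, []() { /* ...step 2: never concurrent with step 1... */ });
}
```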
Note also the optional Synchronicity synchronicity argument to the post() methods. By default this acts like regular Task_engine::post(), but you can also access Task_engine::dispatch()-type behavior; and you can wait for the task to complete using yet another mode. The latter feature, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION, may be particularly helpful at initialization time, such as if one needs to perform some startup tasks in the new thread(s) before continuing to general work on the loop. E.g., subclasses might, for convenience, wrap this ability in their constructors, so that the user can optionally provide an initializing task to run before the ctor returns.
Lastly, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_START may be quite helpful in ensuring a certain task definitely does run – without waiting for it to actually finish, but merely begin. More specifically, `L.post(F, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_START); L.stop();` will ensure F() will run at some point – but not wait for its completion. `L.post(F); L.stop();` will not ensure this at all; whether F() runs is a matter of luck. `L.post(F, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION); L.stop();` will ensure it but also force a wait until F() finishes, which may not be necessary and hence increases latency / reduces responsiveness in the calling thread.
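For instance, a clean-shutdown helper relying on that guarantee might be sketched like so (hedged; the function name is illustrative, not part of the API):

```cpp
// Ensure final_task() fully executes before shutdown completes – without
// making the calling thread wait for its completion, only for its start.
void post_then_stop(flow::async::Concurrent_task_loop& loop,
                    flow::async::Task&& final_task)
{
  using flow::async::Synchronicity;
  // Returns once final_task() has *begun* executing in a pool thread...
  loop.post(std::move(final_task), Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_START);
  // ...and stop() waits for in-progress tasks to return; hence final_task()
  // has fully executed by the time stop() returns.
  loop.stop();
}
```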
Associating Tasks with specific threads

There are cases when the simple, conceptual approach just described (using create_op() only, if an Op is even desired at all) is not sufficient. Another approach is to pre-create N Ops, where N is the number of threads in the thread pool, and instead of create_op() one can randomly choose one of those N Ops and post() onto that one (an Op is an Op regardless of how it was obtained). Informally, there are 2 known categories of use cases for this, with overlap:
- Legacy designs wherein each Op corresponds to a worker thread specifically: then, internally, 2 Tasks being assigned to that 1 Op would just mean executing them on that Op's corresponding worker thread; threads are serial by definition, so the Op semantics are trivially satisfied. So, to support such legacy designs, the per-thread pre-created Ops in per_thread_ops() allow the user to obtain either a randomly chosen or a specifically indexed 1 of the N per-thread async::Ops.
- Per-thread resources: e.g., a per-thread object-store copy, with a given request – and hence all the Tasks comprising that request's handling – assigned to one per-thread Op: Since all tasks operating in Op (therefore thread) #3 by definition execute non-concurrently, no locks are necessary when working with thread #3's thread-local object-store copy. Simply, only thread i of N will ever touch object store i of N, to the extent it can be explicitly declared thread-local (thread_local in C++11, or similar). Similarly, consider N IPC queues between two daemons, with thread i in either process accessing (writing and reading, respectively) only queue i. Then – assuming the queue itself is safe against 1 read occurring concurrently with 1 write – no further locking is required. Basically, thread i in daemon 1 deals with thread i in daemon 2, using a dedicated lock-free thread-i-access-only IPC queue.

Scheduling tasks

post() is absolutely core, of course. However, sometimes one needs to wait asynchronously for some kind of event and THEN execute a task on that event. In particular, executing it simply after some specific time period passes is common and has a dedicated API. This is called scheduling a task in our parlance.
If you want to schedule a task, first decide whether you need certain advanced capabilities. This is explained in the doc header for util::schedule_task_from_now(). If you decide you need advanced capabilities, then skip to the next subsection below, about general boost.asio I/O objects. Most of the time you won't, in which case read on:
schedule_from_now() and schedule_at() in this Concurrent_task_loop interface provide all the capabilities of the util::schedule[d]_task*() API. (Reminder: This includes canceling and short-firing the task with ease, more ease than if using the full-on I/O util::Timer, which is – again – explained below.) Just as with post(), there is 1 version of each method for single tasks; and 1 for operating within an async::Op, meaning the timer completion handler.
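A scheduling sketch follows (hedged: the exact Scheduled_task callback signature, and the cancel-API signatures, are per the util::schedule_task_from_now() docs rather than stated here):

```cpp
// Sketch: run a task ~5 seconds from now; note the handle-use restriction.
void sched_pattern(flow::async::Concurrent_task_loop& loop)
{
  auto task_handle
    = loop.schedule_from_now(boost::chrono::seconds(5), // Fine_duration-compatible
                             [](bool /* fired-vs-short-fired; per util docs */)
  {
    /* ...timer fired (or was short-fired)... */
  });

  // Canceling / short-firing / info-access go through util::scheduled_task_cancel()
  // and friends, given task_handle. Per the thread-safety warning in
  // schedule_from_now()'s docs, touch the handle only from within this
  // loop's own tasks.
}
```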
Finally, there's general boost.asio "I/O object" work. An I/O object is usually a class – within boost.asio itself or a custom one – with 1 or more asynchronous action methods, always named in the style async_*(). To show how one would do this with Concurrent_task_loop, let's do it in somewhat formal fashion:
Suppose you have boost.asio object X, for example boost::asio::ip::tcp::socket (in boost.asio itself) or flow::net_flow::asio::Peer_socket (a custom one), and we want to perform an async_A() action, which waits asynchronously for some event (e.g., a successful tcp::socket::async_receive() or net_flow::asio::Peer_socket::async_send()), and then executes a completion handler task F:
That's the setup and should be familiar to boost.asio I/O object users. (Note that util::Timer is a (relatively simple) I/O object itself; it lacks A_settings (one makes a call like expires_at() separately before the actual async action) and A_more_result_info (as err_code is sufficient) in particular. It also lacks any A_target. It's clearly the degenerate example of an I/O object action.) So how does one write the above when working with a Concurrent_task_loop instead of a Task_engine?
Almost everything is the same! One just needs to call that API – task_engine() – to obtain a Task_engine to use. As a result:
- The wait itself – what in a synchronous world would be the blocking part of an X.sync_A() – will be performed by some unspecified code in some unknown thread. We don't care how with boost.asio directly, and we don't care how with Concurrent_task_loop either.
- As X completes async_A(), it will modify A_target. This, too, will be done by an unspecified thread with no locking guarantees. Hence, one must not access A_target from application threads. As with boost.asio direct use, this is typical. For example, no app code would access the target data buffer of a receive operation.
- F() will be posted on completion via the Task_engine L.task_engine(). Of course we guarantee it will be in some thread in the thread pool L.
Finally, then, suppose the original snippet above is modified to use a Strand, to guarantee non-concurrency with some other boost.asio handler(s). This would look like:
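A representative plain-boost.asio sketch (hypothetical object X, action async_A(), and handler F, per the prose above; argument placeholders elided):

```cpp
// Plain boost.asio (no Concurrent_task_loop): a Strand guarantees F() will not
// run concurrently with any other handler wrap()ped by the same Strand.
util::Task_engine task_engine;     // a/k/a boost::asio::io_service
util::Strand strand(task_engine);  // a/k/a boost::asio::io_service::strand

X.async_A(/* ...A_settings... */,
          strand.wrap([](const Error_code& err_code /* , A_more_result_info... */)
{
  // F(): the completion handler task.
}));
```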
To accomplish this with a Concurrent_task_loop L:
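This might be sketched as follows (hedged: see asio_handler_via_op()'s own docs for its exact signature; X, async_A(), and F are the hypothetical names from the prose):

```cpp
// Concurrent_task_loop version: an async::Op plays the Strand-like role;
// asio_handler_via_op() binds the completion handler to it.
const flow::async::Op op = L.create_op();

X.async_A(/* ...A_settings... */,
          flow::async::asio_handler_via_op(&L, op, [](const Error_code& err_code)
{
  // F(): will not run concurrently with other handlers/tasks bound to `op`.
}));
```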
However, now that you're working with an I/O object directly, you must be careful. Memorizing a Task_engine at construction has different effects depending on which concrete subclass of Concurrent_task_loop L is. Cross_thread_task_loop in particular will assign it to whichever thread is best. Segregated_thread_task_loop will keep using the same random-ish thread chosen when L.task_engine() is called. If you need particular behavior, you will need to strongly consider what to do: It is no longer totally generic behavior independent of the subclass, as it generally is when it comes to the post() and schedule_*() APIs.
Lastly, if you are going down that road (which may be fully necessary) then consider the free function template op_to_exec_ctx() which is specialized for each concrete Concurrent_task_loop; it takes a loop and an async::Op as input; and returns a boost.asio "execution context" which can be passed – much like a Task_engine in the above example – to I/O object constructors. See the specializations – as of this writing near Cross_thread_task_loop (returns util::Strand) and Segregated_thread_task_loop (returns util::Task_engine) at least. Choose between the above technique and op_to_exec_ctx() when working directly with a boost.asio-compatible I/O object.
I am self-conscious at the length and seeming complexity of this formal writeup but must emphasize: This is using the same patterns as boost.asio users use. It's just a matter of mapping them to the flow::async Flow module's generalized Concurrent_task_loop and async::Op APIs. Reminder: The benefit of this is that one uses boost.asio-equivalent semantics; yet the Concurrent_task_loop concrete subclass can implement it internally in various ways that are or aren't what a direct use of Task_engine would do. However, when using I/O objects – as opposed to post() – the approach becomes less generic. That is sometimes necessary.
TL;DR: Any boost.asio-style (whether from boost.asio itself or custom) I/O object is to be used as normal, but: To get a Task_engine, use task_engine(). To get the Strand-like thing, async::Op, use create_op(). To use that Strand-like async::Op, use asio_handler_via_op(). Alternatively use actual Task_engines or Strands directly if necessary; see op_to_exec_ctx() specializations for that purpose. Lastly, make sure you understand the exact boost.asio behavior when using task_engine() (yields util::Task_engine), asio_handler_via_op() (yields a util::Strand-bound callback), and/or op_to_exec_ctx() (yields a util::Task_engine, util::Strand, or something else depending on subclass, to be passed as an "execution context" to an I/O object ctor).
boost.asio async_*() actions are supported by the flow::async module. What about synchronous and non-blocking operations? Well, sure, they're supported. This module is just not about them, hence the name. Just for perspective though:
- Non-blocking operations (something.non_blocking(true); and then some_op(something, ...) or something.some_op()) can certainly be used whenever you want, in a task or outside of it, assuming of course you're not breaking thread-safety rules on concurrent access to something. The only "connection" to Concurrent_task_loop is that the something may be associated with *(this->task_engine()). That's fine.
- Blocking operations (something.non_blocking(false); and then same as in the previous bullet) can also be used whenever. In some ways they're even less connected to Concurrent_task_loop, as blocking ops are only tangentially related to Task_engine in the first place; they don't participate in the internal event loop and usually simply call some blocking OS API or similar. However, a blocking call does block the thread; so if you do this inside a *this-posted task, then that task will block. A classic example is a loop dedicated to blocking getaddrinfo() calls, since an async DNS API is not available; and a separate async-only loop/thread kicking off result-emitting handlers to the user, with the former blocking-only loop posting result-emitting tasks onto the async-only loop/thread. (boost.asio's out-of-the-box resolver provides an async API but is internally single-threaded and therefore unsuitable at scale.)

Internally, a concrete loop is essentially a vector<Task_qing_thread>, arranged in various potential ways of working with each other. The implementation-level docs also include an intro to the question of how to choose Cross_thread_task_loop vs. Segregated_thread_task_loop, at a lower level. While it is a clean interface, realistically speaking the entire existing hierarchy is perhaps best explained by immediately discussing the 2 concrete classes implementing the API as of this writing. (More classes may well be added, but as of this writing, and probably for posterity, it makes sense to discuss these 2 specific ones.) The available pool types are:
- Cross_thread_task_loop: a single shared post()-capable engine, with N Task_qing_threads working together off that engine. N can be specified directly, or it can be auto-determined based on available hardware. The latter will also enable automatically pinning the threads in such a way as to attempt to minimize latency, namely avoiding hyper-threading or other physical-core sharing by several hardware threads; this is done by making N = # of physical cores and pinning each software thread to a distinct group of logical cores (hardware threads), so that each software thread gets its own physical core, avoiding latency-increasing "competition." An attempt is also made (achievable in Linux) to pin them in a consistent way, so that if another pool elsewhere uses the same code and config, they will arrange their same N threads in the same order. This can help if thread i from pool 1 is a producer writing to some area in memory, while thread i from pool 2 is a consumer of same, reading there. The Cross_thread part means that each multi-task sequence of callbacks constituting an async::Op, such that those callbacks must not execute concurrently, may use more than 1 thread (internally, via the boost.asio util::Strand mechanism), which theoretically can improve use of thread time with asymmetrical load. It might also negate per-thread cache locality, etc., and counteract the effectiveness of the aforementioned pinning. Cross_thread means cross-thread.
- Segregated_thread_task_loop: N Task_qing_threads working together, each with its own post()-capable Task_engine queue, meaning – by contrast with Cross_thread_task_loop – a given Op always executes its tasks on the same thread. Otherwise it works the same. Under asymmetrical load it might not use all available cross-thread time; however, it arguably also works straightforwardly "synergistically" with any attempts at per-processor-core pinning.

The use is, of course, identical via the common API, Concurrent_task_loop.
Apologies for the conversational comment. The internal subtleties are encapsulated and hidden from the user, yet there is considerable flexibility available. One can think of this as a convenient wrapper around various functionality typically used manually and separately from each other – simplifying the core interface to just async::Op and post(), while providing automatic flexibility as to what functionality is in fact used, and when, as a result. The functionality accessible: Task_engine::post(); scheduling via util::Strand; scheduling on a specific thread; non-concurrency guarantees of 2+ tasks in one async op; and thread-count selection and pinning based on available processor architecture (hardware threads, physical cores).
Definition at line 322 of file concurrent_task_loop.hpp.
using flow::async::Concurrent_task_loop::Thread_init_func = Function<void (size_t thread_idx)> |
Short-hand for the thread-initializer-function optional arg type to start().
Definition at line 329 of file concurrent_task_loop.hpp.
~Concurrent_task_loop() – override, default
pure virtual
Return a new Op which can bundle together an arbitrary set of post()s that would result in the provided task functions executing non-concurrently.
That's informal; the formal semantics of what async::Op means are in the async::Op doc header. Informally: please recall that a copy (of a copy, of a copy, ...) of an Op is an equivalent Op, and copying them is light-weight (at worst like copying a shared_ptr).
All Ops shall remain valid throughout the lifetime of *this.
This is the more general method of obtaining an async::Op, vs. going through per_thread_ops(). It should be used unless you specifically need to access some per-thread resource in the associated Tasks. See the class doc header for more discussion on this dichotomy. TL;DR: Use create_op() by default, unless the Tasks you plan to execute are working on some per-thread resource.
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
pure virtual
How many threads does start() start?
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
pure virtual
Returns the optional-use, pre-created collection of per-thread async::Op objects, such that the i-th Op therein corresponds to the i-th (of N, where N = # of threads in this pool) thread.
All Ops and this Op_list& shall remain valid throughout the lifetime of *this.
This is an advanced/legacy-ish feature. Please see class doc header for discussion on when one should use this as opposed to the simpler create_op().
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
pure virtual
Identical to the other post() with the added constraint that no other Task also similarly posted with the equivalent async::Op may execute concurrently.
See doc header for async::Op for a formal definition of what this call does w/r/t async::Op.
Reminder: This is thread-safe as explained in class doc header.
Parameters:
op – The (presumably) multi-async-step operation to which task belongs, such that no Tasks associated with op may execute concurrently with task. If op.empty() (a/k/a op == Op(), recalling that Op() is null/sentinel), then an assert() trips.
task – See other post().
synchronicity – See other post().
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
pure virtual
Cause the given Task (function) to execute within the thread pool as soon as possible, in the first thread available, in otherwise first-come-first-served fashion.
task may execute concurrently with some other Task if there are 2+ threads in *this pool. The meanings of "as soon as possible" and "available" are to be determined by the concrete method implementation. That is, the interface does not promise it'll use literally the first thread to be idle, but informally – all else being equal – that's a great goal.
synchronicity controls the precise behavior of the "post" operation. Read the Synchronicity enum docs carefully. That said: if left defaulted, post() works in the Task_engine::post() manner: return immediately; then execute either concurrently in another thread or later in the same thread.
This is safe to call after stop(), but task() will not run until start() (see stop() doc header). Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION and Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_START modes will, therefore, block infinitely in that case; so don't do that after stop().
Reminder: This is thread-safe as explained in class doc header.
The callback arg would normally be the last arg, by Flow coding style. In this case it isn't, because it is more valuable to make synchronicity optional (which it can only be if it is the last arg).
Parameters:
task – Task to execute. The task object itself may be move()d and saved.
synchronicity – Controls when task() will execute, particularly in relation to when this post() call returns.
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
Referenced by flow::async::asio_handler_via_op().
pure virtual
Equivalent to 2-argument schedule_from_now() except one specifies an absolute time point instead of wait duration.
Parameters:
at – See util::schedule_task_at().
task – See schedule_from_now().
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
pure virtual
Equivalent to 3-argument schedule_from_now() except one specifies an absolute time point instead of wait duration.
Parameters:
op – See 3-argument post().
at – See util::schedule_task_at().
task – See schedule_from_now().
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
pure virtual
Equivalent to 2-argument post() but execution is scheduled for later, after the given time period passes.
The semantics are, in all ways in which this differs from 2-argument post(), those of util::schedule_task_from_now(). This includes the meaning of the returned value and the nature of util::Scheduled_task. Also, in particular, one can perform actions like canceling, short-firing, and info-access by passing the returned handle into util::scheduled_task_cancel() and others.
Warning: Do not call any util::scheduled_task_*() function on the returned handle except from within *this loop's tasks.
Todo: Reconsider the thread safety of the Concurrent_task_loop::schedule_*() APIs. Perhaps add a bool in_loop_use_only arg which, if false, will always disable the single_threaded optimization internally. At this time it always enables it if n_threads() == 1, which will cause thread un-safety if the returned handle is touched from outside an in-loop task. void versions of the schedule_*() APIs should be added which would lack this, as in that case there is no handle to misuse outside the loop.
Parameters:
from_now – See util::schedule_task_from_now().
task – The task to execute within *this unless successfully canceled. The task object itself may be move()d and saved.
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
pure virtual
Equivalent to 3-argument post() but execution is scheduled for later, after the given time period passes.
The semantics are, in all ways in which this differs from 3-argument post(), those of util::schedule_task_from_now(). This includes the meaning of the returned value and the nature of util::Scheduled_task.
Parameters:
op – See 3-argument post().
from_now – See util::schedule_task_from_now().
task – The task to execute within *this, subject to op constraints, unless successfully canceled. The task object itself may be move()d and saved.
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
pure virtual
Starts all threads in the thread pool; any queued post()ed (and similar) tasks may begin executing immediately; and any future posted work may execute in these threads.
Calling start() after start() is discouraged and may log a WARNING but is a harmless no-op. See also stop().
The optional init_task_or_empty arg is a convenience thing. It's equivalent to post(init_task_or_empty, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION) executed upon return. init_task_or_empty() will run in the new thread pool; and only once it returns will start() return. Rationale: It has come up in our experience several times that one wants to execute something in the new thread(s) to initialize things, synchronously, before the main work – various async post()ing and other calls – can begin in earnest. Do note that any tasks enqueued before this start() but after the last stop() or the constructor may run first.
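For example (a hedged sketch; loop stands for some constructed concrete subclass instance, and the lambdas' contents are placeholders):

```cpp
// Synchronous in-pool initialization before general work begins.
loop.start([]()
{
  // Runs in the freshly started pool; start() does not return until this does.
  /* ...one-time setup that must happen in a pool thread... */
});
// The init task has completed by now; general post()ing can begin in earnest.
loop.post([]() { /* ...general work... */ });
```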
Suppose a user-supplied task posted onto a worker thread throws an uncaught exception. This will be handled the same as if that occurred directly in that thread; in other words we don't catch it in any way, not even to re-throw it or manually std::abort() or anything of that nature. We informally recommend you handle uncaught exceptions in a program-wide SIGABRT handler or an equally program-wide custom std::terminate() (via std::set_terminate()). We informally recommend that all other threads similarly let any uncaught exception fall through and deal with the fallout at the global program-wide level (to avoid losing precious stack-trace information).
Assuming no such uncaught exception is thrown, all threads will run until stop() or the destructor runs and returns.
As noted in the class doc header, all methods are thread-safe on a common *this unless noted otherwise. To wit: it is not safe to call X.start() concurrently with X.start() or with X.stop().
Each thread start() starts shall be, soon, blocked by running an event loop (or part of a multi-thread event loop); meaning it will be either blocking waiting for posted tasks/active events or executing posted tasks/event handlers. It is, however, sometimes required to perform setup (usually of a low-level variety) in the thread before the event loop proper begins. (The use case that triggered this feature was wanting to execute Linux setns(CLONE_NEWNET) to affect the subsequent socket-create calls in that thread.) If needed: pass in a non-empty function as the thread_init_func_or_empty arg; it will receive the thread index 0, 1, ... as its arg. It will run first-thing in the new thread. Subtlety: As of this writing "first-thing" means literally first-thing; it will run before any of the implementing start()'s own code begins. (This may be relaxed in the future to merely "before the event loop is ready for tasks." Then this comment shall be updated.)
Note: start() shall block until all thread_init_func_or_empty() invocations (if the arg is not .empty()) have completed. This can be important: for example, if the actions those callbacks take require elevated privileges, this guarantee means one can drop privileges once start() returns. Informally, we recommend against blocking in this callback, although perhaps some use case might require it. Just be careful.
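As an illustration, here is a hedged sketch (not from the Flow sources; the loop instance and the setup body are assumptions) of wiring a thread-initializer into start():

```cpp
// Sketch: per-thread setup via start()'s thread_init_func_or_empty arg.
// `loop` is assumed to be a concrete Concurrent_task_loop subclass instance
// (e.g., Cross_thread_task_loop) constructed elsewhere but not yet started.
void start_with_thread_setup(flow::async::Concurrent_task_loop& loop)
{
  loop.start(flow::async::Task(), // No init task in this example.
             [](size_t thread_idx)
  {
    // Runs first-thing in each spawned thread, before the event loop accepts tasks.
    // E.g., the setns(CLONE_NEWNET) use case mentioned above would go here, so that
    // subsequent socket-create calls in this thread occur in the desired namespace.
    // (The setns() call, its fd, and error handling are omitted/assumed.)
    (void)thread_idx;
  });
  // start() has returned: every thread-init callback has completed. If those
  // callbacks performed privileged actions, it is now safe to drop privileges here.
}
```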
Consider the specific implementation of the present interface, Segregated_thread_task_loop. Something similar to this feature is possible without this start() optional arg: One can simply post() onto each of the per_thread_ops(), with Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION. It'll run as the first task in each thread, as opposed to strictly-speaking first-*thing*, but it's close enough. So why add this to the interface? Well, consider the other implementation, Cross_thread_task_loop. By definition it's not possible to target individual threads in that guy (per_thread_ops() exists, but its Ops are per-thread, not in-thread; they are Strands, not threads). So then some other, Cross_thread_task_loop-*only* API would be necessary to get what we need. Hence it made sense to add this as an interface-level feature. Then these asymmetries go away naturally.
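For concreteness, the Segregated_thread_task_loop-only alternative just described might look like this hedged sketch (Op_list subscripting via operator[] is an assumption about that class's interface):

```cpp
// Sketch: run a setup task as the first *task* (not first-thing) in each thread of a
// Segregated_thread_task_loop, without start()'s thread_init_func_or_empty arg.
void per_thread_setup(flow::async::Segregated_thread_task_loop& loop)
{
  for (size_t idx = 0; idx != loop.n_threads(); ++idx)
  {
    // Target thread `idx` via its pre-created Op; await completion before continuing.
    loop.post(loop.per_thread_ops()[idx],
              [idx]() { /* per-thread setup for thread idx goes here */ },
              flow::async::Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION);
  }
}
```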
init_task_or_empty | Ignored if .empty() (the default). Otherwise init_task_or_empty() shall execute in one of the threads started by this method, delaying the method's return to the caller until init_task_or_empty() returns in said spawned thread. |
thread_init_func_or_empty | Ignored if .empty() (the default). Otherwise thread_init_func_or_empty(thread_idx) shall be executed first-thing in each spawned thread, for each thread_idx in [0, n_threads()). start() will return no sooner than when each such callback has finished. |
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
|
pure virtual |
Waits for any ongoing task(s)/completion handler(s) to return; then prevents any further-queued such tasks from running; then gracefully stops/joins all threads in pool; and then returns.
The post-condition is that the worker threads have fully and gracefully exited.
Upon return from this method, any further post() or more complex async ops can safely be invoked, but they will not do any actual work, and no tasks or completion handlers will run until start(). In particular, task_engine() will still return a util::Task_engine, and one can still invoke post() and async I/O ops on it: doing so won't crash, but it won't do the requested work until start(). (Recall that there are no more threads in which to do this work.) The destructor can then be invoked, at which point obviously one cannot post() (or anything else like it) either.
This condition is reversible via start(). In fact, *this
starts in the stopped state, and start() is required to make posted tasks actually execute.
Lastly, calling stop() after stop() returns is a harmless no-op. Also note the destructor shall call stop().
As noted in the class doc header, all methods are thread-safe on a common *this unless noted otherwise. To wit: it is not safe to call X.stop() concurrently with X.stop() or with X.start().
You may call stop() from within a task/completion handler executing within *this thread pool. Of course you may also do this from another thread.
This is similar to boost.asio Task_engine::stop(). At a minimum it is useful, when shutting down the app or module, in the situation where 2+ Concurrent_task_loops routinely post work onto each other (or at least in 1 direction). To safely stop all 2+ loops, first invoke this stop() method on each Concurrent_task_loop, in any order; having done that, destroy (invoke the dtor of) each Concurrent_task_loop, also in any order. This way any cross-posting will safely work during the stop() phase (but do nothing on the already-stopped loops); and by the time the destructor-invoking phase begins, no more cross-posting tasks can possibly be executing (as their threads don't even exist by then).
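The stop-then-destroy ordering just described can be sketched as follows (a hedged illustration; the two loop objects and their ownership via unique_ptr are assumptions):

```cpp
// Sketch: safely shut down two Concurrent_task_loops that post work onto each other.
void shutdown_cross_posting_loops(std::unique_ptr<flow::async::Concurrent_task_loop>& loop_a,
                                  std::unique_ptr<flow::async::Concurrent_task_loop>& loop_b)
{
  // Phase 1: stop() both, in any order. In-flight tasks finish; any cross-posted
  // work arriving at an already-stopped loop is harmlessly accepted but never runs.
  loop_a->stop();
  loop_b->stop();

  // Phase 2: only now destroy, in any order. No task can still be executing, so no
  // task can touch a destroyed peer loop.
  loop_a.reset();
  loop_b.reset();
}
```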
Note, however, that this is as graceful as we can generically guarantee – in that it won't crash/lead to undefined behavior on our account – but it is up to you to ensure your algorithm is robust, in that nothing bad will happen if tasks are suddenly prevented from running. For example, if task A locks some file, while task B later unlocks it, you are the one who must ensure you don't invoke stop() "between" task A and task B. (E.g., invoking it while A runs will let A complete; but it will very possibly prevent B from starting subsequently.) We have no way of knowing to let task B run first and only then stop the thread(s).
Lastly, the stop() and start() mechanism is amenable to dynamically configuring thread behavior such as the number of threads in the pool.
See also the boost.asio io_context doc page.
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
|
pure virtual |
Returns a pointer to an internal util::Task_engine (a/k/a boost.asio io_service) for the purpose of performing a boost.asio async_*() action on some boost.asio I/O object in the immediate near future.
The mechanics of using this are explained in the Concurrent_task_loop doc header. Using this in any other fashion may lead to undefined behavior, while *this exists.
async_*() action on it, the more effective the internal load-balancing. Formally, it is allowed to use it as long as *this exists (pre-destructor) and even beyond that, though beyond that point the responsibility of providing thread(s) in which to run() it becomes the user's.
Implemented in flow::async::Segregated_thread_task_loop, flow::async::Timed_concurrent_task_loop_impl< Time_accumulator >, flow::async::Timed_concurrent_task_loop_impl< std::atomic< perf::duration_rep_t > >, flow::async::Timed_concurrent_task_loop_impl< perf::duration_rep_t >, and flow::async::Cross_thread_task_loop.
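To make the intended use concrete, here is a hedged sketch of driving a raw boost.asio I/O object via task_engine() (the timer type and the 1-second delay are illustrative; util::Task_engine is the boost.asio io_service/io_context type per the text above):

```cpp
// Sketch: perform a boost.asio async_*() action on the loop's internal Task_engine.
void fire_timer_on_loop(flow::async::Concurrent_task_loop& loop) // Assumed started.
{
  // Construct the I/O object against the loop's engine; its handler will therefore
  // run in one of the loop's threads.
  auto timer = std::make_shared<boost::asio::deadline_timer>(*loop.task_engine());
  timer->expires_from_now(boost::posix_time::seconds(1));
  timer->async_wait([timer](const boost::system::error_code& err) // Capture keeps timer alive.
  {
    if (!err)
    {
      // Executes in a thread of `loop`, roughly 1 second after the async_wait().
    }
  });
}
```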
|
related |
Template specialization model for operation that obtains the underlying execution context, such as a util::Task_engine or util::Strand, stored in an async::Op generated by the given Concurrent_task_loop.
Each subclass (impl) of Concurrent_task_loop shall provide a specialization of this template, with the Exec_ctx_ptr template param being the appropriate boost.asio-compatible execution-context type for the Ops that loop type's create_op() generates.
The mechanics of using this are explained in Concurrent_task_loop doc header. Beyond that please see the particular specialization's doc header.
Exec_ctx_ptr | A pointer type (raw or smart) pointing to an execution-context type satisfying boost.asio's "execution context" concept. As of this writing the known values are pointers to util::Task_engine and util::Strand, but really it depends on the particular Concurrent_task_loop subclass passed as the loop arg. See the doc header near the particular Concurrent_task_loop subclass. |
loop | Loop object that, one way or another, generated and returned op. |
op | async::Op from *loop from which to extract the execution-context object on which you'd like to perform custom boost.asio work. |
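A hedged usage sketch follows. The free-function name op_to_exec_ctx(), the Strand-pointer result type for Cross_thread_task_loop, and the exact way the loop is passed are assumptions inferred from the parameters documented above (Exec_ctx_ptr, loop, op); consult the particular specialization's doc header for the real signature.

```cpp
// Sketch: extract the execution context behind an Op for custom boost.asio work.
void custom_work_on_op(flow::async::Cross_thread_task_loop& loop)
{
  flow::async::Op op = loop.create_op();

  // For Cross_thread_task_loop, the context behind an Op is a util::Strand.
  // The pointer type name Strand_ptr is an assumption standing in for the
  // specialization's documented Exec_ctx_ptr.
  auto strand_ptr = flow::async::op_to_exec_ctx<flow::async::Strand_ptr>(&loop, op);

  // Handlers posted through this strand will not run concurrently with each other:
  // the same guarantee Op gives to post()ed Tasks.
  strand_ptr->post([]() { /* non-concurrent custom boost.asio work */ });
}
```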