Flow 1.0.1
Flow project: Full implementation reference.
single_thread_task_loop.hpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License.  You may obtain a copy
 * of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include "flow/util/util.hpp"

namespace flow::async
{

/**
 * A `Concurrent_task_loop`-related adapter-style class that represents a single-thread task loop; essentially
 * it is pleasant syntactic sugar around a Concurrent_task_loop impl that cuts out concepts that become irrelevant
 * with only one thread involved.
 *
 * ### Thread safety ###
 * All methods are thread-safe for read-write on a shared Single_thread_task_loop, after its ctor returns, unless
 * otherwise specified (but read on).  This is highly significant, just as it is highly significant that boost.asio's
 * `Task_engine::post()` is similarly thread-safe.  However, it is *not* safe to call either stop() or start()
 * concurrently with itself or the other of the two, on the same Single_thread_task_loop.
 *
 * ### Rationale ###
 * A single-thread task/event loop is very common; arguably more common than multi-threaded.  While
 * the Concurrent_task_loop hierarchy's value is things like async::Op (strand/etc.) support, which are multi-threaded
 * by definition, it also provides significant value via setup/teardown of a thread in a consistent way and other
 * features that aren't related to concurrency.  The author (ygoldfel) therefore kept wanting to use
 * Concurrent_task_loop hierarchy for single-thread loops; and immediately realized in particular that the 2 choices
 * of impl -- Cross_thread_task_loop and Segregated_thread_task_loop -- become almost bit-wise-identical to each other
 * internally when only 1 thread is involved.  Therefore the choice of which one to use felt silly.  Moreover
 * both of those classes have internal optimizations when `n_threads_or_zero == 1`.
 *
 * So then the author decided to make this class to eliminate having to make this silly choice, when all the needs
 * are already met regardless of the choice made.  It is similar to how `std::queue<T>` is a limited `deque<T>` or
 * `list<T>`; though arguably even more straightforward, because in our case it makes no difference which underlying
 * class is used (with `queue<T>` it does).  In any case, that's why there's no template parameter for the
 * underlying impl class in our case (could change in the future).
 *
 * @internal
 * ### Implementation notes / rationale ###
 * In the past Single_thread_task_loop `private`ly inherited from Cross_thread_task_loop.  Now it simply stores one
 * as a member (composition).  In both cases it's a HAS-A relationship, so the difference is a question of syntactic
 * sugar and not conceptual.  The reason I (ygoldfel) originally used `private` inheritance was purely for
 * brevity: one could, e.g., write `using Task_loop_impl::stop;` instead of writing a trivial body that just
 * forwarded to that super-method.  The reason I switched to composition was Timed_single_thread_task_loop appeared,
 * which made it desirable to make methods like post() `virtual` -- meaning to *start* a `virtual` hierarchy
 * of (some) similarly-named methods, so that Single_thread_task_loop::post() would be `virtual`, and
 * Timed_single_thread_task_loop::post() would `override` it.  This is when trouble started: because the class
 * inherited from a Concurrent_task_loop with a `virtual` method also named `post()`, a call to
 * `static_cast<Concurrent_task_loop*>(this)->post()` -- necessary in Timed_single_thread_task_loop for certain
 * internal reasons -- went through `virtual` resolution, which followed the chain down through the `private`
 * inheritance and ended up invoking Single_thread_task_loop::post() and hence
 * Timed_single_thread_task_loop::post() again, leading to infinite recursion.  In other words, making
 * Single_thread_task_loop::post() `virtual` *continued* the `virtual` chain of `post()` methods, instead of
 * starting a new chain as planned, even though the inheritance is `private`.
 * At that point it seemed better -- if a little less pithy -- to give up on that and just use composition.
 */
class Single_thread_task_loop :
  public log::Log_context, // We log a bit.
  public util::Null_interface // We have some virtual things.
{
public:
  // Constructors/destructor.

  /**
   * Constructs object, making it available for post() and similar work, but without starting any thread and hence
   * without the ability to perform any work; call start() to spawn 1 thread and perform work.
   *
   * @param logger_ptr
   *        Logger to use for subsequent logging.
   * @param nickname
   *        Brief, human-readable nickname of the new loop/thread, as of this writing for logging only.
   */
  explicit Single_thread_task_loop(log::Logger* logger_ptr, util::String_view nickname);

  /**
   * Executes stop() -- see its doc header please -- and then cleans up any resources.
   * The behavior of stop() has subtle implications, so please be sure to understand what it does.
   *
   * It is fine if stop() has already been called and returned.
   */
  ~Single_thread_task_loop() override;

  // Methods.

  /**
   * Starts the 1 thread in the thread pool; any queued `post()`ed (and similar) tasks may begin executing immediately;
   * and any future posted work may execute in this thread.  Calling start() after start() is discouraged and may
   * log a WARNING but is a harmless no-op.  See also stop().
   *
   * The optional `init_task_or_empty` arg is a convenience.  It's equivalent to
   * `post(init_task_or_empty, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION)` executed upon return.
   * `init_task_or_empty()` will run in the new thread; and only once it `return`s, start() will `return`.
   * Rationale: It has come up in our experience several times that one wants to execute something in the new thread
   * to initialize things, synchronously, before the main work -- various async `post()`ing and other calls -- can begin
   * in earnest.  Do note that any tasks enqueued before this start() but after the last stop() or constructor
   * may run first.
   *
   * The worker thread started will exit upon any uncaught exception by one
   * of the user-supplied tasks posted onto it subsequently.  If this occurs, the handler will `exit()` the entire
   * program with a non-zero code after logging the exception message.  (It is informally recommended that all other
   * threads in the application do the same.)
   *
   * Assuming no such uncaught exception is thrown, the thread will run until stop() or the destructor runs and
   * returns.
   *
   * ### Thread safety ###
   * All methods are thread-safe on a common `*this` unless noted otherwise.
   * To wit: it is not safe to call `X.start()` concurrently with `X.start()` or with `X.stop()`.
   *
   * ### Implementation ###
   * In this implementation start() essentially involves a single `Task_engine::restart()`,
   * followed by starting 1 thread, that thread executing a long-running `Task_engine::run()`.
   *
   * ### Rationale for omitting counterpart to Concurrent_task_loop::start() `thread_init_func_or_empty` arg ###
   * `init_task_or_empty` is sufficient, as there is only one thread.  The `thread_init_func_or_empty` guarantee
   * that it run *first-thing* in the thread is too subtle to worry about in practice; `init_task_or_empty` is fine.
   *
   * @param init_task_or_empty
   *        Ignored if `.empty()` (the default).  Otherwise `init_task_or_empty()` shall execute in the
   *        thread started by this method, delaying the method's return to the caller until `init_task_or_empty()`
   *        returns in said spawned thread.
   */
  void start(Task&& init_task_or_empty = Task());

  /**
   * Waits for the ongoing task/completion handler -- if one is running -- to return; then prevents any further-queued
   * such tasks from running; then gracefully stops/joins the worker thread; and then returns.
   * The post-condition is that the worker thread has fully and gracefully exited.
   *
   * Upon return from this method, any further `post()` or more complex async ops can safely be invoked -- but they
   * will not do any actual work, and no tasks or completion handlers will run until start().  In particular
   * task_engine() will still return a util::Task_engine, and one can still invoke `post()` and async I/O ops on it:
   * doing so won't crash, but it won't do the requested work until start().  (Recall that there are no more
   * threads in which to do this work.)  The destructor can then be invoked, at which point obviously one cannot
   * `post()` (or anything else like it) either.
   *
   * This condition is reversible via start().  In fact, `*this` starts in the stopped state, and start() is required
   * to make posted tasks actually execute.
   *
   * Lastly, calling stop() after stop() returns is a harmless no-op.  Also note the destructor shall call stop().
   *
   * ### Thread safety ###
   * As noted in the class doc header, all methods are thread-safe on a common `*this` unless noted otherwise.
   * To wit: it is not safe to call `X.stop()` concurrently with `X.stop()` or with `X.start()`.
   *
   * You may call stop() from within a task/completion handler executing within `*this` thread.  Of course
   * you may also do this from another thread.
   *
   * ### Rationale ###
   * See Concurrent_task_loop::stop() doc header's Rationale section.  It also lists some useful tip(s) as of
   * this writing.
   */
  void stop();

  /**
   * Cause the given `Task` (function) to execute within the worker thread as soon as the thread is free of other queued
   * tasks/completion handlers, in first-come-first-served fashion.  `task` may not execute concurrently with some other
   * `Task`.
   *
   * `synchronicity` controls the precise behavior of the "post" operation.  Read #Synchronicity `enum` docs carefully.
   * That said: if left defaulted, `post()` works in the `Task_engine::post()` manner: return immediately; then
   * execute either concurrently in another thread (if called *not* from within another in-thread task) or later in the
   * same thread (otherwise).
   *
   * This is safe to call after stop(), but `task()` will not run until start() (see stop() doc header).
   * Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION and Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_START modes
   * will, therefore, block infinitely in that case; so don't do that after stop().
   *
   * Reminder: This is thread-safe as explained in class doc header.
   *
   * ### Rationale notes ###
   * The callback arg would normally be the last arg, by Flow coding style.  In this case it isn't, because
   * it is more valuable to make `synchronicity` optional (which it can only be if it's the last arg).
   *
   * Also, `using` would be nice for brevity, but we don't want to expose the other (irrelevant) `post()` overload
   * of the `private` superclass.
   *
   * @param task
   *        Task to execute.  `task` object itself may be `move`d and saved.
   * @param synchronicity
   *        Controls when `task()` will execute particularly in relation to when this `post()` call returns.
   */
  virtual void post(Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC);

  /**
   * Equivalent to post() but execution is scheduled for later, after the given time period passes.
   *
   * The semantics are, in all ways in which this differs from post(), those of
   * util::schedule_task_from_now().  This includes the meaning of the returned value and the nature of
   * util::Scheduled_task.  Also, in particular, one can perform actions like canceling, short-firing, and
   * info-access by passing the returned handle into util::scheduled_task_cancel() and others.
   *
   * @warning You *must* not call any `util::scheduled_task_*()` function on the returned
   *          handle except from within `*this` loop's tasks.
   *
   * @todo Deal with the scheduled-tasks-affected-from-outside-loop corner case of the
   *       `Single_thread_task_loop::schedule_*()` APIs.  Perhaps add `bool in_loop_use_only` arg
   *       which, if `false`, will always disable the `single_threaded` optimization internally.
   *       At this time it always enables it which will cause thread un-safety if
   *       the returned handle is touched from outside an in-loop task.  `void` versions of the `schedule_*()` APIs
   *       should be added which would lack this, as in that case there is no handle to misuse outside the loop.
   *
   * @param from_now
   *        See util::schedule_task_from_now().
   * @param task
   *        The task to execute within `*this` unless successfully canceled.
   *        `task` object itself may be `move`d and saved.
   * @return See util::schedule_task_from_now().
   */
  virtual util::Scheduled_task_handle schedule_from_now(const Fine_duration& from_now, Scheduled_task&& task);

  /**
   * Equivalent to schedule_from_now() except one specifies an absolute time point instead of wait duration.
   *
   * @warning See schedule_from_now() warning.
   *
   * @param at
   *        See util::schedule_task_at().
   * @param task
   *        See schedule_from_now().
   * @return See schedule_from_now().
   */
  virtual util::Scheduled_task_handle schedule_at(const Fine_time_pt& at, Scheduled_task&& task);

  /**
   * Returns a pointer to *the* internal util::Task_engine (a/k/a boost.asio `io_service`) for the purpose of
   * performing a boost.asio `async_*()` action on some boost.asio I/O object in the immediate near future.
   *
   * @return A mutable util::Task_engine to use soon.  It is *allowed* to use it as long as
   *         `*this` exists (pre-destructor) and even beyond that, though any use beyond that point would pass the
   *         responsibility of providing thread(s) to `run()` in to the user.
   */
  Task_engine_ptr task_engine();

  /**
   * Returns `true` if and only if the thread executing this call is the thread started by start().
   * Returns `false` otherwise, including if start() has not yet been called.
   *
   * This may be useful if a task may be executed from an outside thread or posted through `*this` loop.
   * E.g., if `true` it can synchronously execute some other task; if `false` `post()` it
   * with Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_START or Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION
   * to ensure it is not stoppable (by stop() or dtor) or even fully finishes synchronously, respectively.
   * Using either of those `Synchronicity` values while in_thread() returns `true` assures
   * a thread hang instead.
   *
   * ### Thread safety ###
   * For a given `*this` it is safe to execute concurrently with any method except start().  It is also safe to
   * execute in the optional init-task given to start().
   *
   * @return See above.
   */
  bool in_thread() const;

protected:
  // Methods.

  /**
   * Returns the underlying work-horse Concurrent_task_loop.
   * @return See above.
   */
  Concurrent_task_loop* underlying_loop();

private:
  // Types.

  /// The class an instance of which we hold (in HAS-A fashion, not IS-A fashion).
  using Task_loop_impl = Cross_thread_task_loop;

  // Data.

  /// See underlying_loop().
  Task_loop_impl m_underlying_loop;

  /// Before start() it is default-cted (not-a-thread); from start() on it's the started thread's ID.
  util::Thread_id m_started_thread_id_or_none;
}; // class Single_thread_task_loop

/**
 * Identical to Single_thread_task_loop, but all tasks posted through it are automatically timed, with the result
 * accessible via accumulated_time().
 *
 * Using a Timed_single_thread_task_loop is identical to using Single_thread_task_loop; except -- as with
 * a Timed_concurrent_task_loop vs. plain Concurrent_task_loop -- any "naked" handler `F` passed directly to
 * a boost.asio `async_...(..., F)` operation should be wrapped by asio_handler_timed(), so that it too is timed.
 * Hence it would become: `async_...(..., L->asio_handler_timed(F))`.
 *
 * To access timing results use added API accumulated_time().
 */
class Timed_single_thread_task_loop : public Single_thread_task_loop
{
public:
  // Constructors/destructor.

  /**
   * Constructs object, similarly to Single_thread_task_loop ctor; but with timing capabilities
   * a-la Timed_concurrent_task_loop ctor.
   *
   * @param logger_ptr
   *        See Single_thread_task_loop ctor.
   * @param nickname
   *        See Single_thread_task_loop ctor.
   * @param clock_type
   *        See Timed_concurrent_task_loop_impl ctor.
   */
  explicit Timed_single_thread_task_loop(log::Logger* logger_ptr, util::String_view nickname,
                                         perf::Clock_type clock_type = perf::Clock_type::S_CPU_THREAD_TOTAL_HI_RES);

  // Methods.

  /**
   * Implements superclass method but with the addition of timing of `task`, accessible through
   * accumulated_time().
   *
   * @see Single_thread_task_loop::post() doc header for the essential documentation outside of timing.
   *
   * @param task
   *        See superclass method.
   * @param synchronicity
   *        See superclass method.
   */
  void post(Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC) override;

  /**
   * Implements superclass method but with the addition of timing of `task`, accessible through
   * accumulated_time().
   *
   * @see Single_thread_task_loop::schedule_from_now() doc header for the essential documentation outside of timing.
   *
   * @param from_now
   *        See superclass method.
   * @param task
   *        See superclass method.
   * @return See superclass method.
   */
  util::Scheduled_task_handle schedule_from_now(const Fine_duration& from_now, Scheduled_task&& task) override;

  /**
   * Implements superclass method but with the addition of timing of `task`, accessible through
   * accumulated_time().
   *
   * @see Single_thread_task_loop::schedule_at() doc header for the essential documentation outside of timing.
   *
   * @param at
   *        See superclass method.
   * @param task
   *        See superclass method.
   * @return See superclass method.
   */
  util::Scheduled_task_handle schedule_at(const Fine_time_pt& at, Scheduled_task&& task) override;

  /**
   * See Timed_concurrent_task_loop_impl::accumulated_time().
   * @return See above.
   */
  perf::Duration accumulated_time();

  /**
   * See Timed_concurrent_task_loop_impl::asio_handler_timed().
   *
   * @tparam Handler
   *         See Timed_concurrent_task_loop_impl::asio_handler_timed().
   * @param handler_moved
   *        See Timed_concurrent_task_loop_impl::asio_handler_timed().
   * @return See Timed_concurrent_task_loop_impl::asio_handler_timed().
   */
  template<typename Handler>
  auto asio_handler_timed(Handler&& handler_moved);

private:
  // Data.

  /**
   * The task-timing decorator of our superclass Single_thread_task_loop's `protected` superclass.
   * As required, its lifetime (being a base class) is >= that of this decorating #m_timed_loop.
   */
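  Timed_concurrent_task_loop_impl<perf::duration_rep_t> m_timed_loop;
}; // class Timed_single_thread_task_loop

// Template implementations.

template<typename Handler>
auto Timed_single_thread_task_loop::asio_handler_timed(Handler&& handler_moved)
{
  return m_timed_loop.asio_handler_timed(std::move(handler_moved));
}

} // namespace flow::async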