Flow 1.0.0
Flow project: Full implementation reference.
task_qing_thread.hpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include "flow/log/log.hpp"
#include <boost/move/unique_ptr.hpp>
#include <boost/thread/future.hpp>

namespace flow::async
{

/**
 * Internally used building block of various concrete Concurrent_task_loop subclasses that encapsulates a thread
 * that spawns at construction time and a dedicated-or-shared util::Task_engine (a/k/a boost.asio `io_service`)
 * `run()`ning in that new thread.
 *
 * This class:
 *   - At construction:
 *     - Takes a ref-counted reference to an existing util::Task_engine, which may be fresh of
 *       posted tasks or not; shared with another Task_qing_thread or not.
 *     - Starts a thread, ultimately executing an indefinitely-running `Task_engine::run()`, thus
 *       either comprising the single thread executing tasks from that `Task_engine`; or joining a pool
 *       of such threads.
 *   - `post(*(this->task_engine()), F)` can be used to queue up tasks to execute; and any already-queued
 *     work on `*(this->task_engine())` may execute in this new thread as well. (This is all thread-safe due to
 *     the thread-safe nature of boost.asio util::Task_engine posting, including across thread creation and
 *     destruction.)
 *   - Ends* the thread and "joins" it in destructor.
 *     - Can also do this earlier (but still irreversibly) via the stop() API.
 *     - *If the `Task_engine` is shared with other `Task_qing_thread`s, per `own_task_engine` ctor arg,
 *       then the `Task_engine::stop()` call must be executed by outside code instead.
 *   - Exits the program (!) if a task posted via task_engine() throws an uncaught exception.
 *   - Provides access to low-level underlying util::Thread via raw_worker_thread();
 *     see async::optimize_pinning_in_thread_pool(). Note util::Thread in turn allows direct native access to
 *     the OS thread object (pthread handle, or whatever).
 *
 * ### Design rationale ###
 * Task_qing_thread() constructor doc header contains discussion worth reading.
 */
class Task_qing_thread :
  public log::Log_context,
  private boost::noncopyable
{
public:
  // Constants.

  /**
   * `exit()` code returned to OS in the event Task_qing_thread chooses to `exit()` the entire program (as of this
   * writing, this occurs on uncaught exception by a user task).
   */
  static const int S_BAD_EXIT;

  // Constructors/destructor.

  /**
   * Constructs object, immediately spawning new (worker) thread, memorizing a ref-counted reference to
   * the provided util::Task_engine which may or may not be a fresh one and meant to be shared with other
   * `Task_qing_thread`s or exclusively by this one. The post-condition of this constructor is:
   *   - If `done_promise_else_block` is null, it's:
   *     - The new thread has started.
   *     - If `init_func_or_empty` is not `.empty()`, then: `init_func_or_empty()` ran *first-thing* in that thread and
   *       returned.
   *       - Note: This is a useful guarantee, if, say, you need to perform some privileged actions at thread setup;
   *         once this post-condition holds, it is safe to drop privileges.
   *     - The thread is ready to participate in task-posting via `post(*this->task_engine(), F)` and
   *       similar.
   *   - If `done_promise_else_block` is *not* null, it's:
   *     - None. However, if one executes `done_promise_else_block->get_future().wait()`, upon return from that
   *       statement, the above if-`done_promise_else_block`-is-null post-conditions will hold.
   *
   * @note `!done_promise_else_block` mode is typical. Just wait for construction, then `post()` away. The
   *       other mode is helpful when one has an N-thread pool and wants each Task_qing_thread to initialize
   *       concurrently with the others instead of serially. Then one can pass in N `promise`s to N Task_qing_thread
   *       ctor calls, then wait on the N respective `unique_future`s.
   *
   * @note When creating the util::Task_engine `task_engine`, it is usually a good idea for perf to pass the
   *       concurrency-hint arg value 1 (one) if either `own_task_engine == true`, or the overall thread pool simply
   *       will have but 1 thread. This will allow boost.asio to optimize internal locking and such.
   *
   * The worker thread started by each Task_qing_thread constructor will exit upon any uncaught exception by one
   * of the user-supplied `Task`s `post()`ed onto it subsequently. If this occurs, the handler will `exit()` the entire
   * program with a non-zero code after logging (to `*logger_ptr`) the exception message. (It is informally
   * recommended that all other threads in the application do the same.)
   *
   * Assuming no such uncaught exception is thrown, the thread will run until stop() or the destructor is called and
   * returns.
   *
   * ### Basic concept discussion: To share or not to share (a `Task_engine`)? ###
   * The choice of `own_task_engine` flag, as of this writing, does not actually affect much of `*this` behavior.
   * If `true`, then you're saying this is the only thread to run tasks on the `Task_engine` (call it E).
   * If `false`, it may be shared with other threads. In practice, though, as of this writing, this only controls
   * whether stop() will perform `E->stop()` (which causes all `E->run()`s to return and hence threads to soon exit)
   * for you (`true`), or you must do it for the shared E yourself (which has other objects like `*this` associated
   * with it). Either way, E can be used before or after `*this` thread runs in whatever way one prefers, including:
   * one can pre-queue tasks (via `post(*E, F)` and such) for it to join in executing in new thread; one can inherit
   * any not-yet-executed tasks after stop(), to execute them in some other thread/run().
   *
   * That said, despite the small practical impact in *this* class, the decision of whether to assign one
   * Task_qing_thread (and hence util::Thread and hence native thread) to a `Task_engine` in one-to-one fashion,
   * versus sharing the latter with more `Task_qing_thread`s, is a central one to your design.
   * It is central to specifying the pattern of how `post()`ed `Task`s are
   * spread across actual threads in a pool. In particular, if it's `true` (not shared), then one *must*
   * select a specific Task_qing_thread (and, therefore, its corresponding worker thread) before
   * actually `post()`ing; otherwise it will be selected intelligently by boost.asio. On the other hand, if it's
   * `false` (shared), then to guarantee two tasks F and G will not execute
   * concurrently (which is desirable if they're assigned to one async::Op) one must probably
   * use a util::Strand. Meanwhile if it IS `true` (not shared), then one can simply guarantee it by posting
   * onto the same Task_qing_thread (i.e., worker thread)... which is straightforward but theoretically worse at
   * using available time slices across threads. It's worse that way, but on the other hand thread-to-core-pinning
   * is arguably more predictable in terms of ultimate effect on performance when `Strand`s aren't used. Plus it
   * might cause thread-caching perf increases.
   *
   * Very informally, and perhaps arguably, the `true` (do-not-share-engine) mode is the legacy way and is how
   * certain entrenched legacy daemons do it; the `false` (share-engine-among-threads) is the common-sense
   * boost.asio-leveraging way which might be the default for new applications; but it depends also on perf analysis
   * of thread caching benefits of the `true` way as well.
   *
   * ### Rationale/history ###
   * Also informally: The hairiness of forcing the user to have to make this decision, and then write potentially
   * `if`-laden code that subsequently actually posts tasks, is a chief motivation for abstracting such details
   * behind the interfaces Concurrent_task_loop and async::Op. Then the user gets to just post `Task`s, optionally
   * tagged with `Op`s to prevent unwanted concurrency, while the aforementioned interfaces will deal with
   * the different ways of using Task_qing_thread. Therefore, Task_qing_thread is a detail/ class not to be used
   * by or exposed to the user.
   *
   * Historically, a certain proof of concept (PoC) started out by having "user" code deal with `Task_engine`s directly,
   * quickly morphing to wrap them with Task_qing_thread for ease of use. Then once this PoC desired to have knobs
   * controlling how tasks are scheduled across threads, without having the "user" code worry about it after
   * initial thread-pool setup, Task_qing_thread was moved from the public area into detail/, and Concurrent_task_loop
   * was born (along with helper type async::Op).
   *
   * ### Logging subtlety ###
   * For convenience, as promised by at least Cross_thread_task_loop::start() doc header: If user has specified
   * a Config::this_thread_verbosity_override() setting (to reduce or increase log volume temporarily), then
   * we take it upon ourselves to apply this setting to the spawned thread during exactly the following times:
   *   - Any startup logging in the spawned thread, by `*this`.
   *   - Any shutdown logging in the spawned thread, by `*this`, after stop() triggers thread exit.
   *
   * @param logger_ptr
   *        Logger to use for subsequently logging.
   * @param nickname
   *        Brief, human-readable nickname of the new thread pool, as of this writing for logging only.
   * @param task_engine
   *        The util::Task_engine E such that the body of the new thread will be essentially `E->run()`.
   * @param own_task_engine
   *        Essentially, `true` if you do not wish to share `*task_engine` with other `Task_qing_thread`s;
   *        `false` if you do wish to share it with other such threads. See more detailed notes above.
   *        Also see stop().
   * @param init_func_or_empty
   *        If not `.empty()`, `init_func_or_empty()` shall execute first-thing in the new thread, before internal
   *        code begins the thread's participation in the `*task_engine` event loop (i.e., before task_engine->run()).
   * @param done_promise_else_block
   *        If null, ctor will block until the thread has started and is ready to participate in
   *        `task_engine()->post()`ing (etc.; see above text). If not null, then it will kick things off asynchronously
   *        and satisfy the `promise *done_promise_else_block` once the thread has started and is ready to p... you
   *        get the idea.
   */
  explicit Task_qing_thread(flow::log::Logger* logger_ptr, util::String_view nickname,
                            const Task_engine_ptr& task_engine, bool own_task_engine,
                            boost::promise<void>* done_promise_else_block = 0,
                            Task&& init_func_or_empty = Task());

  /**
   * stop(), followed by forgetting the `Task_engine` returned by task_engine(); the latter action may
   * destroy that `Task_engine` synchronously.
   *
   * In particular, the `Task_engine` returned by task_engine() shall be destroyed by this destructor, unless
   * you've saved a copy of that `shared_ptr` elsewhere (particularly in `own_task_engine == false` mode in ctor,
   * it is likely to be saved in another Task_qing_thread).
   *
   * Since stop() has the post-condition that the thread has been joined,
   * the same post-condition holds for this destructor. It is, of course, safe to call this destructor after
   * already having called stop().
   *
   * @see stop() which is useful when you want the thread to exit/be joined, but the underlying `Task_engine` must
   *      continue to exist for a bit; in particular `post()` on it would execute but do nothing.
   *      Then once you've ensured no more such `post()`s are forthcoming, and hence it's safe,
   *      "finish the job" by destroying `*this`.
   *
   * @see task_engine() through which one can obtain a ref-counted util::Task_engine, for example with the idea
   *      to have another thread `task_engine()->run()`, thus inheriting any queued work/tasks and able to enqueue
   *      and execute future ones.
   */
  ~Task_qing_thread();

  // Methods.

  /**
   * Returns pointer to util::Task_engine such that `post()`ing to it will cause the subsequent asynchronous execution
   * of that task in a way explained in the Task_qing_thread() constructor doc header. This is the same object
   * passed to ctor.
   *
   * Do note that the user's saving a copy of this pointer can extend the life of the returned `Task_engine`
   * (which is NOT at all the same as extending the life of raw_worker_thread(); also NOT at all the same as
   * making it possible to actually execute work which requires threads).
   *
   * Could technically be `const`, but it is OK to be conservative about `const` usage. In spirit, at least,
   * it's not `const`.
   *
   * @return See above.
   */
  Task_engine_ptr task_engine();

  /**
   * Returns the util::Thread -- a thin wrapper around the native OS thread handle -- corresponding to the worker
   * thread started in constructor.
   *
   * The intended use of this is to set thread attributes (such as processor-core
   * affinity) in a way that won't affect/disturb the concurrently executing thread's ability to execute tasks;
   * meaning one might grab its native ID and then set some affinity attribute, but it wouldn't (say) suspend the
   * thread or join it. Slightly informally, then: any such steps ("such" being the informal part) lead to undefined
   * behavior.
   *
   * @return Pointer to `Thread`, not null. Guaranteed valid until destructor is invoked; guaranteed to be
   *         not-a-thread after stop() and not not-a-thread before it.
   */
  util::Thread* raw_worker_thread();

  /**
   * Blocks the calling thread until the constructor-started thread has finished; if the underlying `Task_engine` is not
   * shared then first signals it to stop executing any further `Task`s, thus causing the constructor-started thread
   * to in fact finish soon and hence this method to return soon.
   *
   * After stop() has returned once already, stop() will immediately return. Concurrently executing stop() from
   * 2+ different threads leads to undefined behavior.
   *
   * In effect: If we own the `Task_engine` (`own_task_engine == true` in constructor), this method
   * causes the `Task_engine` to stop executing tasks ASAP and then waits as long as necessary for the thread to exit;
   * then returns. This will be fast if `Task`s are well behaved (do not block).
   *
   * In effect: If we share an external `Task_engine` (`own_task_engine == false` in constructor), this
   * method simply waits for the thread to exit as long as needed. Hence the caller must trigger the shared
   * `Task_engine` to exit this thread's `Task_engine::run()`. (In particular, `Task_engine::stop()` will do this.)
   * Otherwise this method will block until then.
   *
   * The key fact is that, after this returns, the `Task_engine` returned by task_engine() shall not have been
   * destroyed by this method. In particular, `post()` on that `Task_engine` object will still work without
   * undefined behavior/crashing. The `post()`ed function just won't actually run. (It may run on another thread but
   * not this one, since by definition this thread has been joined.)
   */
  void stop();

private:
  // Data.

  /// See task_engine().
  Task_engine_ptr m_task_engine;

  /// See constructor.
  bool m_own_task_engine;

  /// Thread created in constructor. Not-a-thread after stop(); not not-a-thread before stop().
  boost::movelib::unique_ptr<util::Thread> m_worker_thread;
}; // class Task_qing_thread

} // namespace flow::async
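
The constructor post-conditions and the stop() semantics documented in the header above can be illustrated with a small standalone sketch. It deliberately uses only the C++ standard library rather than Flow or boost.asio: `Mini_engine`, `Mini_qing_thread`, and `run_shared_pool_demo()` are hypothetical names invented for this illustration, standing in loosely for util::Task_engine and Task_qing_thread. This is not Flow's implementation (it omits logging, the exit-on-uncaught-exception behavior, and raw thread access); it only shows, under those assumptions, how "block in the constructor or satisfy a caller-supplied promise" and "stop the engine only if we own it, then join" might fit together.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for util::Task_engine: a FIFO of tasks with post()/run()/stop().
class Mini_engine
{
public:
  void post(std::function<void()> task)
  {
    { std::lock_guard<std::mutex> lock(m_mutex); m_tasks.push_back(std::move(task)); }
    m_cond.notify_one();
  }
  void stop() // Makes every run() return; still-queued tasks are simply not executed.
  {
    { std::lock_guard<std::mutex> lock(m_mutex); m_stopped = true; }
    m_cond.notify_all();
  }
  void run() // Executes tasks in the calling thread until stop().
  {
    for (;;)
    {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this] { return m_stopped || !m_tasks.empty(); });
        if (m_stopped) { return; }
        task = std::move(m_tasks.front());
        m_tasks.pop_front();
      }
      task();
    }
  }
private:
  std::mutex m_mutex;
  std::condition_variable m_cond;
  std::deque<std::function<void()>> m_tasks;
  bool m_stopped = false;
};

// Hypothetical stand-in for Task_qing_thread (no logging, no exit-on-exception handling).
class Mini_qing_thread
{
public:
  Mini_qing_thread(std::shared_ptr<Mini_engine> engine, bool own_engine,
                   std::promise<void>* done_promise_else_block = nullptr,
                   std::function<void()> init_func_or_empty = {})
    : m_engine(std::move(engine)), m_own_engine(own_engine)
  {
    assert(m_engine);
    // In blocking mode, use a promise of our own; the thread body keeps it alive.
    auto local = std::make_shared<std::promise<void>>();
    std::future<void> local_done = local->get_future();
    std::promise<void>* done = done_promise_else_block ? done_promise_else_block : local.get();
    m_thread = std::thread([eng = m_engine, local, done, init = std::move(init_func_or_empty)]
    {
      if (init) { init(); } // Runs first-thing in the new thread.
      done->set_value();    // The thread has started and is ready.
      eng->run();
    });
    if (!done_promise_else_block) { local_done.wait(); }
  }
  ~Mini_qing_thread() { stop(); }
  void stop() // Joins the thread; stops the engine first only if we own it.
  {
    if (!m_thread.joinable()) { return; }
    if (m_own_engine) { m_engine->stop(); }
    m_thread.join();
  }
private:
  std::shared_ptr<Mini_engine> m_engine;
  bool m_own_engine;
  std::thread m_thread;
};

// Starts n_threads on one shared engine, initializing them concurrently; returns how many tasks ran.
int run_shared_pool_demo(int n_threads, int n_tasks)
{
  auto engine = std::make_shared<Mini_engine>();
  std::vector<std::promise<void>> promises(n_threads);
  std::vector<std::unique_ptr<Mini_qing_thread>> threads;
  for (auto& p : promises) { threads.push_back(std::make_unique<Mini_qing_thread>(engine, false, &p)); }
  for (auto& p : promises) { p.get_future().wait(); } // Now every thread is ready.
  std::atomic<int> n_ran{0};
  std::promise<void> all_ran;
  for (int i = 0; i < n_tasks; ++i) { engine->post([&] { if (++n_ran == n_tasks) { all_ran.set_value(); } }); }
  if (n_tasks > 0) { all_ran.get_future().wait(); }
  engine->stop(); // Shared engine: the caller must stop it...
  for (auto& t : threads) { t->stop(); } // ...after which each stop() merely joins.
  return n_ran.load();
}
```

In this sketch, `run_shared_pool_demo(3, 10)` starts three threads on one shared engine and returns only after all ten posted tasks have run. Had the threads been constructed with `own_engine == true` (one engine per thread), calling `stop()` on each object would suffice, since it would stop the engine before joining.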