Flow 1.0.2
Flow project: Full implementation reference.
x_thread_task_loop.hpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include "flow/log/log.hpp"
#include <boost/move/unique_ptr.hpp>
#include <vector>
#include <string>

namespace flow::async
{

/**
 * Concrete Concurrent_task_loop that is able to efficiently schedule `Task`s within a given `Op` to execute in
 * different threads while still properly avoiding concurrency.
 *
 * @see Concurrent_task_loop doc header for an overview of this class in the context of sibling classes (alternatives).
 *
 * By definition all you need to know in order to use it is in the super-interface Concurrent_task_loop doc header
 * (except constructor semantics, of course). However, in choosing this vs. a sibling type, one must at least
 * loosely understand its underlying behavior. To summarize the behavior of this class beyond the interface guarantees:
 * - A stand-alone (no-`Op`) `Task` executes as soon as any one thread becomes available.
 * - An `Op`-sharing `Task` may execute in a different thread than another `Task` sharing the same `Op`.
 *   (They just won't execute *concurrently*.) If you're familiar with boost.asio: this internally uses
 *   util::Strand to achieve that behavior.
 * - per_thread_ops() is organized as follows: if N worker threads are created, then N strands are
 *   created internally as well. `per_thread_ops()[i]` is really the i-th strand, not any particular thread; hence,
 *   even though it is "per-thread," actual execution may occur in thread(s) other than the i-th. However, user
 *   logic can remain exactly as if it were truly per-thread.
 *   Why? A thread is a thing in which work can only execute serially -- but so is a strand! And there are N of
 *   those, just as there are N threads; so it works by symmetry. Subdividing work among the N strands therefore
 *   yields the same non-concurrency guarantees as doing so among the N threads directly. In fact, it might be
 *   somewhat more efficient at using idle thread time across the pool.
 * - This allows for an interesting experiment if a legacy application is written in an explicitly per-thread way,
 *   for example maintaining per-thread data structures: Suppose one tweaks it to work with a Concurrent_task_loop,
 *   converting its tasks (conceptually speaking) and data structures to refer to the N `Op`s returned by
 *   per_thread_ops() instead of the N threads directly. Now, if that loop is a Segregated_thread_task_loop,
 *   then the resulting program works identically (functionally speaking) to how it worked before these changes:
 *   one has perhaps made it more conceptual/readable/maintainable, but the thread arch is unchanged. The
 *   interesting part is next: To see whether one can gain efficiency from a strand-based approach, without making
 *   any real design changes in this legacy application, all one has to do now is replace the
 *   Segregated_thread_task_loop construction with a Cross_thread_task_loop one. All application logic can remain
 *   exactly identical (after initialization), yet efficiency gains might be achieved for "free" -- simply by
 *   changing the concrete type of the central Concurrent_task_loop. One can even use one or the other based on
 *   config, changing between the two in the field; see the sketch just below.
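 *
 * For illustration, a minimal sketch of that last point (here `logger`, `n_threads_from_config`,
 * `use_strands_from_config`, and `do_stuff()` are hypothetical application-side names, and the
 * Segregated_thread_task_loop constructor is assumed to take arguments analogous to ours):
 *
 *   ~~~
 *   std::unique_ptr<Concurrent_task_loop> loop;
 *   if (use_strands_from_config)
 *   {
 *     // Strand-based pool: tasks may hop threads, but Op-sharing tasks never overlap.
 *     loop = std::make_unique<Cross_thread_task_loop>(logger, "workers", n_threads_from_config);
 *   }
 *   else
 *   {
 *     // Thread-segregated pool: same Op semantics, classic thread-per-Op arch.
 *     loop = std::make_unique<Segregated_thread_task_loop>(logger, "workers", n_threads_from_config);
 *   }
 *   loop->start();
 *   loop->post([]() { do_stuff(); }); // Stand-alone Task: runs as soon as any one thread is free.
 *   // ...The rest of the application is identical regardless of which concrete type was chosen...
 *   loop->stop();
 *   ~~~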
 *
 * @todo Add dynamic configurability of the low-level thread/core behavior of Cross_thread_task_loop,
 * Segregated_thread_task_loop, and the Concurrent_task_loop interface generally, as these parameters
 * (including `n_threads_or_zero`, `est_hw_core_sharing_helps_algo`, `est_hw_core_pinning_helps_algo`)
 * can only be set at construction time even though start() and stop() can be invoked anytime.
 * For instance, a non-`virtual` `configure()` method could be added to each Concurrent_task_loop subclass,
 * potentially with different signatures. Note, also, that the decision to have stop() exit the actual threads,
 * in addition to making some things easier to code/reason about internally, is also more amenable to these dynamic
 * changes -- messing with threading behavior dynamically is easier if one can exit and re-spawn the thread pool.
 *
 * @internal
 *
 * Regarding the dynamic-configurability to-do above: By my (ygoldfel) off-the-top-of-the-head estimation,
 * it is easier to add in Cross_thread_task_loop than in Segregated_thread_task_loop, as there is exactly 1
 * util::Task_engine per `*this` in the former, while Segregated_thread_task_loop has N of them, 1 per thread.
 * So changing the desired thread count, in particular, would be more difficult in the latter case, as one must not
 * lose any enqueued tasks through stop() and start(). It would not be unreasonable to make one more
 * dynamically-configurable than the other.
 *
 * One must also be careful with the legacy-ish mechanism per_thread_ops() in this context.
 */
class Cross_thread_task_loop :
  public Concurrent_task_loop,
  public log::Log_context,
  private boost::noncopyable
{
public:
  // Constructors/destructor.

  /**
   * Constructs object, making it available for post() and similar work, but without starting any threads and hence
   * without the ability to perform any work; call start() to spawn threads and begin performing work.
   *
   * The arguments `n_threads_or_zero` and the ones following it should be viewed as configuration for the next
   * start() call.
   *
   * @param logger_ptr
   *        Logger to use for subsequent logging.
   * @param nickname
   *        Brief, human-readable nickname of the new thread pool, as of this writing used for logging only.
   * @param n_threads_or_zero
   *        If non-zero, start() will start exactly this many worker threads; and the system will be entirely
   *        entrusted with the question of which thread is assigned to which processor core and any subsequent
   *        migration. If zero, start() attempts to optimize the thread count and thread-to-core relationship
   *        itself, per optimal_worker_thread_count_per_pool() and optimize_pinning_in_thread_pool(): the thread
   *        pool will choose the worker thread count and any thread-to-core relationships as per those methods,
   *        having made the assumptions documented for those functions.
   * @param est_hw_core_sharing_helps_algo
   *        See optimal_worker_thread_count_per_pool().
   * @param est_hw_core_pinning_helps_algo
   *        See optimize_pinning_in_thread_pool().
   * @param hw_threads_is_grouping_collated
   *        See optimize_pinning_in_thread_pool().
   */
  explicit Cross_thread_task_loop(log::Logger* logger_ptr, util::String_view nickname,
                                  size_t n_threads_or_zero,
                                  bool est_hw_core_sharing_helps_algo = false,
                                  bool est_hw_core_pinning_helps_algo = false,
                                  bool hw_threads_is_grouping_collated = false);

  /// See superclass destructor.
  ~Cross_thread_task_loop() override;

  // Methods.

  /**
   * Implements superclass API. In this implementation this essentially involves a single `Task_engine::restart()`,
   * followed by starting N threads, each of which executes a long-running `Task_engine::run()`.
   *
   * ### Logging subtlety for large thread pools ###
   * If n_threads() is large -- e.g., hundreds -- this method may produce a large number of log::Sev::S_INFO-level
   * log messages to the log::Logger at `get_logger()`. There may be a few lines, split between this thread and the
   * spawned thread, for each of the n_threads() threads. In addition stop() and/or the destructor will produce a
   * similar volume of INFO messages.
   *
   * You may, however, use `log::Config::this_thread_verbosity_override_auto(X)` just before start():
   * X = log::Sev::S_NONE will disable all logging, while X = log::Sev::S_WARNING will skip everything except
   * warnings and errors.
   *
   * This is not special and can be done at any time in any context: it affects any logging through log::Config
   * in this thread. In addition: if indeed such an X is set at entry to start(), we will intentionally apply it to
   * each spawned thread as well, albeit only around startup and shutdown. This is for your convenience.
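   *
   * For example -- a sketch only; this assumes (per the `_auto` suffix) that the override stays in effect while
   * the returned object lives, and `loop` is a hypothetical Cross_thread_task_loop:
   *
   *   ~~~
   *   {
   *     // Skip the per-thread INFO chatter; warnings and errors still get logged.
   *     const auto sev_override = log::Config::this_thread_verbosity_override_auto(log::Sev::S_WARNING);
   *     loop.start();
   *   } // Override ends here; subsequent logging in this thread is back to normal.
   *   ~~~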
   *
   * @param init_task_or_empty
   *        See superclass API.
   * @param thread_init_func_or_empty
   *        See superclass API.
   */
  void start(Task&& init_task_or_empty = Task(),
             const Thread_init_func& thread_init_func_or_empty = Thread_init_func()) override;

  /**
   * Implements superclass API. In this implementation this essentially boils down to a single `Task_engine::stop()`,
   * followed by joining each thread started by start().
   *
   * ### Logging subtlety for large thread pools ###
   * A similar note to the one in the start() doc header applies here. Again: you may set an override verbosity
   * just ahead of stop() to reduce or disable logging if desired.
   *
   * In addition, for your convenience: any shutdown logging within each of the spawned (now shutting-down) threads
   * will follow the override (if any) you had specified ahead of start().
   */
  void stop() override;

  /**
   * Implements superclass API.
   * @return See superclass API.
   */
  size_t n_threads() const override;

  /**
   * Implements superclass API. In this implementation: async::Op has the "weight" of a `shared_ptr<>` and internally
   * corresponds to a util::Strand.
   *
   * @return See superclass API.
   */
  Op create_op() override;

  /**
   * Implements superclass API. In this implementation: each pre-created `Op` internally corresponds to a
   * pre-created, long-lived util::Strand.
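   *
   * For example -- a sketch only; `loop`, `key`, `compute_shard_idx()`, and `handle()` are hypothetical
   * application-side names:
   *
   *   ~~~
   *   const Op_list& ops = loop.per_thread_ops();
   *   // Pin all work for a given shard to one strand: such tasks never run concurrently with
   *   // each other, exactly as if they were confined to one thread -- though the executing
   *   // thread may actually vary.
   *   const size_t shard_idx = compute_shard_idx(key) % loop.n_threads();
   *   loop.post(ops[shard_idx], [key]() { handle(key); });
   *   ~~~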
   *
   * @return See superclass API.
   */
  const Op_list& per_thread_ops() override;

  /**
   * Implements superclass API. In this implementation: `task` will execute as soon as a thread is available,
   * assuming no other applicable work is queued ahead of it.
   *
   * @param task
   *        See superclass API.
   * @param synchronicity
   *        See superclass API.
   */
  void post(Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC) override;

  /**
   * Implements superclass API.
   *
   * In this implementation: `task` will execute as soon as a thread is available, assuming no other applicable
   * work is queued ahead of it, subject to one more constraint: `task` will not be allowed to execute concurrently
   * with any other `Task` also associated with `op`. The latter is accomplished by `Op` internally being
   * associated with a util::Strand.
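   *
   * For example -- a sketch only; `loop`, `update_table()`, and `query_table()` are hypothetical, the latter two
   * touching the same data:
   *
   *   ~~~
   *   const auto table_op = loop.create_op();
   *   // These two tasks may run in different threads but never at the same time;
   *   // hence no mutex is needed around the data they share.
   *   loop.post(table_op, []() { update_table(); });
   *   loop.post(table_op, []() { query_table(); });
   *   ~~~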
   *
   * @param op
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @param synchronicity
   *        See superclass API.
   */
  void post(const Op& op, Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC) override;

  /**
   * Implements superclass API. See also 1-arg post().
   * @param from_now
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_from_now(const Fine_duration& from_now, Scheduled_task&& task) override;

  /**
   * Implements superclass API. See also 1-arg post().
   * @param at
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_at(const Fine_time_pt& at, Scheduled_task&& task) override;

  /**
   * Implements superclass API. See also 2-arg post().
   * @param op
   *        See superclass API.
   * @param from_now
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_from_now(const Op& op,
                                                const Fine_duration& from_now, Scheduled_task&& task) override;

  /**
   * Implements superclass API. See also 2-arg post().
   * @param op
   *        See superclass API.
   * @param at
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_at(const Op& op,
                                          const Fine_time_pt& at, Scheduled_task&& task) override;

  /**
   * Implements superclass API. In this implementation: internally there is just the one `Task_engine`, shared by
   * all the worker threads, and that is what this returns. The load-balancing is performed directly by boost.asio
   * itself. (Reminder: general `Concurrent_task_loop`-using code would not rely on this implementation detail.)
   * (Reminder: be very sure you understand what will happen if/when you pass this into a boost.asio-compliant
   * I/O object's constructor or similar. See the warning on this topic in the Concurrent_task_loop doc header.)
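   *
   * For instance -- a sketch only, and subject to the warning above; `loop` is hypothetical, and we assume a
   * boost.asio I/O object whose constructor accepts the underlying execution context:
   *
   *   ~~~
   *   auto engine = loop.task_engine();
   *   boost::asio::ip::tcp::socket sock(*engine); // Completion handlers will run in the pool's threads.
   *   ~~~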
   *
   * @return See superclass API.
   */
  Task_engine_ptr task_engine() override;

private:
  // Types.

  /// Short-hand for smart pointer to Task_qing_thread.
  using Task_qing_thread_ptr = boost::movelib::unique_ptr<Task_qing_thread>;

  // Data.

  /// See constructor.
  const std::string m_nickname;
  /// See constructor. @warning Not safe to use this in lieu of n_threads() when this is in fact zero.
  const size_t m_n_threads_or_zero;
  /// See constructor.
  const bool m_est_hw_core_sharing_helps_algo;
  /// See constructor.
  const bool m_est_hw_core_pinning_helps_algo;
  /// See constructor.
  const bool m_hw_threads_is_grouping_collated;

  /// N task-execution-capable worker threads whose lifetimes equal that of `*this`, all sharing #m_shared_task_engine.
  std::vector<Task_qing_thread_ptr> m_qing_threads;

  /// boost.asio `Task_engine` (a/k/a `io_service`) co-used by all #m_qing_threads.
  Task_engine_ptr m_shared_task_engine;

  /// See per_thread_ops(). The `boost::any` payload is always of the type async::Strand_ptr.
  const Op_list m_per_thread_strands;
}; // class Cross_thread_task_loop

// Free functions: in *_fwd.hpp.

} // namespace flow::async