Flow 1.0.1
Flow project: Full implementation reference.
segregated_thread_task_loop.hpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include "flow/log/log.hpp"
#include <boost/move/unique_ptr.hpp>
#include <vector>
#include <string>

namespace flow::async
{

/**
 * Concrete Concurrent_task_loop that uses the legacy pin-`Task`s-within-1-`Op`-to-1-thread method of achieving
 * required non-concurrency of `Task`s.
 *
 * @see Concurrent_task_loop doc header for an overview of this class in the context of sibling classes (alternatives).
 *
 * By definition all you need to know to use it is in the super-interface Concurrent_task_loop doc header
 * (except constructor semantics of course). However, in choosing this vs. a sibling type, one must at least
 * loosely understand its underlying behavior. To summarize the behavior of this class beyond the interface guarantees:
 * - A no-`Op` stand-alone `Task` executes as soon as thread T becomes available, and T is randomly chosen.
 * - An `Op`-sharing `Task` will *always* execute in the same thread T as the first `Task` of that `Op`; and that T
 *   is randomly chosen (as of this writing in create_op(), i.e., no later than that first `Task`). This ensures
 *   they won't execute concurrently by definition.
 * - per_thread_ops() is organized as follows: If N worker threads are created, then, internally,
 *   `per_thread_ops()[i]` is really the i-th thread. Very simple.
 * - See discussion in Cross_thread_task_loop doc header about an "interesting experiment" related to us.
 *
 * @internal
 *
 * @todo In Segregated_thread_task_loop, when dealing with the
 * 2-arg `post()` (the one taking an async::Op and an async::Task) and similar
 * `schedule_*()` methods -- consider postponing the random thread selection until the last possible moment, as
 * opposed to in create_op(). Given that Segregated_thread_task_loop is legacy-ish in nature, the resulting
 * significant increase in internal complexity may or may not be worth accepting. This to-do also makes little
 * practical difference without the nearby to-do that would bias in favor of less-loaded threads.
 *
 * @todo In Segregated_thread_task_loop, when randomly selecting a thread, consider trying to bias in favor of
 * threads with less load (perhaps measured as # of tasks enqueued in a given thread's `Task_engine`); and/or
 * round-robin thread assignment; and/or other heuristics.
 */
class Segregated_thread_task_loop :
  public Concurrent_task_loop,
  public log::Log_context,
  private boost::noncopyable
{
public:
  // Constructors/destructor.

  /**
   * Constructs object, making it available for post() and similar work, but without starting any threads and hence
   * without the ability to perform any work; call start() to spawn threads and perform work.
   *
   * The arguments `n_threads_or_zero` and ones following it should be viewed as configuration for the next
   * start() call.
   *
   * @param logger_ptr
   *        Logger to use for subsequent logging.
   * @param nickname
   *        Brief, human-readable nickname of the new thread pool, as of this writing for logging only.
   * @param n_threads_or_zero
   *        If non-zero, start() will start exactly this many worker threads; and the system will be entirely entrusted
   *        with the question of which thread is assigned to which processor core and any subsequent migration.
   *        If zero, start() attempts to optimize the thread count and thread-to-core relationship itself, per
   *        optimal_worker_thread_count_per_pool() and optimize_pinning_in_thread_pool(). The thread pool will choose
   *        the worker thread count and any thread-to-core relationships as per those methods, having made the
   *        assumptions documented for those functions.
   * @param est_hw_core_sharing_helps_algo
   *        See optimal_worker_thread_count_per_pool().
   * @param est_hw_core_pinning_helps_algo
   *        See optimize_pinning_in_thread_pool().
   * @param hw_threads_is_grouping_collated
   *        See optimize_pinning_in_thread_pool().
   */
  explicit Segregated_thread_task_loop(log::Logger* logger_ptr, util::String_view nickname,
                                       size_t n_threads_or_zero,
                                       bool est_hw_core_sharing_helps_algo = false,
                                       bool est_hw_core_pinning_helps_algo = false,
                                       bool hw_threads_is_grouping_collated = false);

  /// See superclass destructor.
  ~Segregated_thread_task_loop() override;

  // Methods.

  /**
   * Implements superclass API. In this implementation this essentially involves repeating N times:
   * `Task_engine::restart()`; spawn thread; in it execute a long-running `Task_engine::run()`.
   *
   * ### Logging subtlety for large thread pools ###
   * See Cross_thread_task_loop::start() doc header similar section: The exact same note applies here.
   *
   * @param init_task_or_empty
   *        See superclass API.
   * @param thread_init_func_or_empty
   *        See superclass API.
   */
  void start(Task&& init_task_or_empty = Task(),
             const Thread_init_func& thread_init_func_or_empty = Thread_init_func()) override;

  /**
   * Implements superclass API. In this implementation this essentially boils down to N `Task_engine::stop()`s,
   * followed by joining each thread started by start().
   *
   * ### Logging subtlety for large thread pools ###
   * See Cross_thread_task_loop::stop() doc header similar section: The exact same note applies here.
   */
  void stop() override;

  /**
   * Implements superclass API.
   * @return See superclass API.
   */
  size_t n_threads() const override;

  /**
   * Implements superclass API. In this implementation: async::Op has the "weight" of a raw pointer and internally
   * corresponds to a thread.
   *
   * @return See superclass API.
   */
  Op create_op() override;

  /**
   * Implements superclass API. In this implementation: Each pre-created `Op` internally corresponds (1-to-1 overall)
   * to one of the N worker threads that start() spawns.
   *
   * @return See superclass API.
   */
  const Op_list& per_thread_ops() override;

  /**
   * Implements superclass API. In this implementation: `task` will execute as soon as thread T is available, because
   * no other applicable work is forthcoming; and T is randomly chosen by the implementation.
   *
   * @param task
   *        See superclass API.
   * @param synchronicity
   *        See superclass API.
   */
  void post(Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC) override;

  /**
   * Implements superclass API. In this implementation: equivalent to 1-arg post(), except that thread T is not
   * chosen anew per `Task`: the first async::Task associated with `op` and every subsequent such `Task` execute in
   * the same randomly chosen T (as of this writing selected in create_op(), per the to-do in the class doc header).
   *
   * @param op
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @param synchronicity
   *        See superclass API.
   */
  void post(const Op& op, Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC) override;

  /**
   * Implements superclass API. See also 1-arg post().
   * @param from_now
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_from_now(const Fine_duration& from_now, Scheduled_task&& task) override;

  /**
   * Implements superclass API. See also 1-arg post().
   * @param at
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_at(const Fine_time_pt& at, Scheduled_task&& task) override;

  /**
   * Implements superclass API. See also 2-arg post().
   * @param op
   *        See superclass API.
   * @param from_now
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_from_now(const Op& op,
                                                const Fine_duration& from_now, Scheduled_task&& task) override;

  /**
   * Implements superclass API. See also 2-arg post().
   * @param op
   *        See superclass API.
   * @param at
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_at(const Op& op,
                                          const Fine_time_pt& at, Scheduled_task&& task) override;

  /**
   * See superclass API. In this implementation: As with create_op(), this selects a thread from the pool randomly,
   * and each thread has a 1-1 dedicated internal `Task_engine`. The load-balancing is hence performed, by us,
   * at the time of this call -- not later when one does the `async_*()` action. (Reminder: general
   * `Concurrent_task_loop`-using code would not rely on this implementation information.) (Reminder: be very sure
   * you understand what will happen if/when you pass this into a boost.asio-compliant I/O object's constructor or
   * similar. See the warning on this topic in Concurrent_task_loop doc header.)
   *
   * @return See superclass API.
   */
  Task_engine_ptr task_engine() override;

private:
  // Types.

  /// Short-hand for smart pointer to Task_qing_thread.
  using Task_qing_thread_ptr = boost::movelib::unique_ptr<Task_qing_thread>;

  // Methods.

  /**
   * Helper performing the core `Task_engine::post()` (or similar) call on behalf of the various `post()` overloads.
   *
   * @param chosen_task_engine
   *        The value `m_qing_threads[idx].task_engine()` for some valid `idx`: the engine for the thread selected
   *        to execute `task`.
   * @param synchronicity
   *        See any post().
   * @param task
   *        See any post().
   */
  void post_impl(const Task_engine_ptr& chosen_task_engine, Synchronicity synchronicity, Task&& task);

  /**
   * Helper performing the core util::schedule_task_from_now() call on behalf of the various
   * `schedule_from_now()` overloads.
   *
   * @param chosen_task_engine
   *        See post_impl().
   * @param from_now
   *        See any schedule_from_now().
   * @param task
   *        See any schedule_from_now().
   * @return See any schedule_from_now().
   */
  util::Scheduled_task_handle schedule_from_now_impl(const Task_engine_ptr& chosen_task_engine,
                                                     const Fine_duration& from_now,
                                                     Scheduled_task&& task);

  // Data.

  /// See constructor.
  const std::string m_nickname;
  /// See constructor. @warning Not safe to use this in lieu of n_threads() when this is in fact zero.
  /// See constructor.
  /// See constructor.
  /// See constructor.

  /**
   * N task-execution-capable worker threads whose lifetimes equal those of `*this`, each with its own
   * util::Task_engine. Due to the latter fact, util::Strand would not be useful to implement async::Op
   * semantics; as every `Task_engine` involved `run()`s within one thread only.
   *
   * @warning Outside of ctor, this datum must not be accessed except in start() and stop(), as that would break
   *          the thread-safety guarantee wherein post() (and similar) are safe to call concurrently with
   *          start() or stop(). All such work shall be done through #m_task_engines exclusively.
   */
  std::vector<Task_qing_thread_ptr> m_qing_threads;

  /**
   * boost.asio `Task_engine`s (a/k/a `io_service`s) used by each respective element in #m_qing_threads.
   * It is critical that this not be modified after constructor returns, for the same thread safety reason mentioned
   * in the warning in #m_qing_threads doc header.
   */
  std::vector<Task_engine_ptr> m_task_engines;

  /**
   * See per_thread_ops(). The `boost::any` payload is always of the type `Task_engine_ptr`.
   * The latter not being something else, namely Task_qing_thread* or similar, is due to the same thread safety
   * reason mentioned in the warning in #m_qing_threads doc header.
   */
  boost::movelib::unique_ptr<const Op_list> m_per_thread_ops;
}; // class Segregated_thread_task_loop

// Free functions: in *_fwd.hpp.

} // namespace flow::async
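
The pinning behavior summarized in the class doc header can be illustrated with a small stand-alone model. The sketch below is not Flow code and does not use boost.asio: `Toy_task`, `Toy_worker`, `Toy_segregated_loop`, and `count_threads_used_by_one_op()` are all hypothetical names invented for illustration, and a mutex-guarded `std::deque` stands in for each thread's `Task_engine`. It assumes only what the doc header states: each worker thread has its own private queue, a stand-alone task goes to a randomly chosen thread, and every task sharing an op goes to the one thread chosen for that op.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <random>
#include <set>
#include <thread>
#include <utility>
#include <vector>

using Toy_task = std::function<void()>; // Hypothetical stand-in for async::Task.

// Hypothetical stand-in for one worker thread owning one private task queue.
class Toy_worker
{
public:
  Toy_worker() : m_thread([this]() { run(); }) {}
  ~Toy_worker()
  {
    { std::lock_guard<std::mutex> lock(m_mutex); m_stopping = true; }
    m_cond.notify_one();
    m_thread.join(); // run() drains whatever is still queued before returning.
  }
  void post(Toy_task task)
  {
    { std::lock_guard<std::mutex> lock(m_mutex); m_tasks.push_back(std::move(task)); }
    m_cond.notify_one();
  }

private:
  void run()
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    for (;;)
    {
      m_cond.wait(lock, [this]() { return m_stopping || !m_tasks.empty(); });
      if (m_tasks.empty()) { return; } // Woken with nothing queued: we are stopping.
      Toy_task task = std::move(m_tasks.front());
      m_tasks.pop_front();
      lock.unlock();
      task(); // Run outside the lock so post() never blocks on a running task.
      lock.lock();
    }
  }

  std::mutex m_mutex;
  std::condition_variable m_cond;
  std::deque<Toy_task> m_tasks;
  bool m_stopping = false;
  std::thread m_thread; // Declared last: everything above exists before run() starts.
};

// Toy loop: N workers; an op is merely the index of the worker it is pinned to.
class Toy_segregated_loop
{
public:
  using Op = std::size_t;

  explicit Toy_segregated_loop(std::size_t n_threads) : m_rng(std::random_device{}())
  {
    assert(n_threads != 0);
    for (std::size_t i = 0; i != n_threads; ++i) { m_workers.push_back(std::make_unique<Toy_worker>()); }
  }
  Op create_op() { return random_idx(); } // Thread chosen once, here.
  void post(Toy_task task) { m_workers[random_idx()]->post(std::move(task)); } // Chosen per task.
  void post(Op op, Toy_task task) { m_workers[op]->post(std::move(task)); } // Always the op's worker.

private:
  std::size_t random_idx()
  {
    std::lock_guard<std::mutex> lock(m_rng_mutex);
    return std::uniform_int_distribution<std::size_t>(0, m_workers.size() - 1)(m_rng);
  }

  std::mutex m_rng_mutex;
  std::mt19937 m_rng;
  std::vector<std::unique_ptr<Toy_worker>> m_workers;
};

// Posts n_tasks tasks under one op; returns how many distinct threads ran them.
std::size_t count_threads_used_by_one_op(std::size_t n_threads, std::size_t n_tasks)
{
  std::set<std::thread::id> thread_ids; // Touched without a mutex: safe only due to pinning.
  std::size_t n_ran = 0;
  {
    Toy_segregated_loop loop(n_threads);
    const Toy_segregated_loop::Op op = loop.create_op();
    for (std::size_t i = 0; i != n_tasks; ++i)
    {
      loop.post(op, [&]() { thread_ids.insert(std::this_thread::get_id()); ++n_ran; });
    }
  } // Destructor joins every worker, so all tasks have finished past this point.
  assert(n_ran == n_tasks);
  return thread_ids.size();
}
```

In this model `count_threads_used_by_one_op(4, 1000)` returns 1: every task of the op runs on the single worker picked in `create_op()`, which is why the tasks can share `thread_ids` and `n_ran` without locking. The same structure also hints at why a strand would add nothing here, since each queue is only ever drained by one thread.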