Flow 1.0.0
Flow project: Full implementation reference.
timed_concurrent_task_loop.hpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include <atomic>

namespace flow::async
{

// Types.

/**
 * Decorator of a Concurrent_task_loop with same or greater lifetime that accumulates time elapsed in any tasks posted
 * or scheduled onto that loop. While this class template is publicly available, most likely the user shall choose
 * one of 2 related classes:
 *   - Timed_concurrent_task_loop: This is essentially an alias of `Timed_concurrent_task_loop_impl<T>`, where `T` is
 *     an `atomic` suitable for accumulating times from tasks executing across a multi-thread pool.
 *     Use this if you have a general Concurrent_task_loop (of whatever actual non-abstract type) which may
 *     at times use multiple threads. In other words use it if you want a Concurrent_task_loop... but timed.
 *   - Timed_single_thread_task_loop: This is a refinement (subclass) of Single_thread_task_loop.
 *     Internally it uses a `Timed_concurrent_task_loop_impl<perf::duration_rep_t>`, since no `atomic` is needed with
 *     only 1 thread of execution. In other words use it if you want a Single_thread_task_loop... but timed.
 *
 * @see Timed_concurrent_task_loop doc header for instructions/synopsis of timing functionality.
 *
 * @tparam Time_accumulator
 *         As of this writing this should be either perf::duration_rep_t; or `atomic<perf::duration_rep_t>`,
 *         depending on whether the `Timed_*_loop` that uses this template instance is guaranteed to be
 *         single-threaded or potentially a multi-thread pool.
 */
template<typename Time_accumulator>
class Timed_concurrent_task_loop_impl : public Concurrent_task_loop
{
public:
  // Types.

  /// Short-hand for the exchanger function taken by the ctor.
  using Exchanger_func = Function<perf::duration_rep_t (Time_accumulator*)>;

  // Constructors/destructor.

  /**
   * Constructs a time-accumulating version of existing loop `*loop`. `*loop` must exist at least until
   * `*this` is destroyed.
   *
   * @param clock_type
   *        See timed_function(). Use a perf::Clock_type that tracks thread time, not process time, typically.
   *        That said, in single-thread loops without competing concurrent work on the same machine,
   *        more precise non-per-thread clocks may be suitable (it is a trade-off).
   * @param loop
   *        The loop object being decorated with timing functionality.
   * @param exchanger_func_moved
   *        If `A` is a `Time_accumulator`, then `exchanger_func_moved(&A)` must return
   *        the perf::duration_rep_t currently stored in `A` and -- as atomically as possible --
   *        store `0` in `A` before returning that. More specifically, if `Time_accumulator` is an `atomic`,
   *        then use `.exchange(0)`; if it's simply perf::duration_rep_t, then it can save the value, set it to 0,
   *        then return the saved value (as concurrency is not a concern).
   */
  Timed_concurrent_task_loop_impl(Concurrent_task_loop* loop, perf::Clock_type clock_type,
                                  Exchanger_func&& exchanger_func_moved);

  // Methods.

  /**
   * Implements superclass API.
   * @param init_task_or_empty
   *        See superclass API.
   * @param thread_init_func_or_empty
   *        See superclass API.
   */
  void start(Task&& init_task_or_empty = Task(),
             const Thread_init_func& thread_init_func_or_empty = Thread_init_func()) override;

  /// Implements superclass API.
  void stop() override;

  /**
   * Implements superclass API.
   * @return See superclass API.
   */
  size_t n_threads() const override;

  /**
   * Implements superclass API.
   * @return See superclass API.
   */
  Op create_op() override;

  /**
   * Implements superclass API.
   * @return See superclass API.
   */
  const Op_list& per_thread_ops() override;

  /**
   * Implements superclass API. In this implementation: `task` will execute as soon as a thread is available, because
   * no other applicable work is forthcoming.
   *
   * @param task
   *        See superclass API.
   * @param synchronicity
   *        See superclass API.
   */
  void post(Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC) override;

  /**
   * Implements superclass API.
   *
   * In this implementation: `task` will execute as soon as a thread is available, because
   * no other applicable work is forthcoming, plus one more constraint: It will not allow `task` to execute concurrently
   * to any other `Task` also associated with `op`. The latter is accomplished on account of
   * `Op` internally being associated with a util::Strand.
   *
   * @param op
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @param synchronicity
   *        See superclass API.
   */
  void post(const Op& op, Task&& task, Synchronicity synchronicity = Synchronicity::S_ASYNC) override;

  /**
   * Implements superclass API.
   * @param from_now
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_from_now(const Fine_duration& from_now, Scheduled_task&& task) override;

  /**
   * Implements superclass API.
   * @param at
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_at(const Fine_time_pt& at, Scheduled_task&& task) override;

  /**
   * Implements superclass API.
   * @param op
   *        See superclass API.
   * @param from_now
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_from_now(const Op& op,
                                                const Fine_duration& from_now, Scheduled_task&& task) override;

  /**
   * See superclass API.
   * @param op
   *        See superclass API.
   * @param at
   *        See superclass API.
   * @param task
   *        See superclass API.
   * @return See superclass API.
   */
  util::Scheduled_task_handle schedule_at(const Op& op,
                                          const Fine_time_pt& at, Scheduled_task&& task) override;

  /**
   * See superclass API.
   * @return See superclass API.
   */
  Task_engine_ptr task_engine() override;

  /**
   * Returns the accumulated time spent on tasks this thread pool has handled since the last call to this method,
   * and resets the accumulated time to zero.
   *
   * @see Constructor which controls the behavior of the timing.
   *
   * @return See above.
   */
  perf::Duration accumulated_time();

  /**
   * Given a boost.asio *completion handler* `handler` for a boost.asio `async_*()` action on some boost.asio I/O
   * object to be initiated in the immediate near future, returns a wrapped handler with the same signature
   * to be passed as the handler arg to that `async_*()` action, so that `handler()` will be properly timed.
   *
   * The mechanics of using this are explained in the class doc header. Using this in any other
   * fashion leads to undefined behavior.
   *
   * @tparam Handler
   *         See asio_handler_via_op(): same here.
   * @param handler_moved
   *        Completion handler for the boost.asio `async_*()` operation to be initiated soon.
   *        It may be `move`d and saved.
   * @return A completion handler that will act as `handler()` but with the addition of timing as
   *         returned by accumulated_time().
   */
  template<typename Handler>
  auto asio_handler_timed(Handler&& handler_moved);

private:
  // Data.

  /// See constructor.
  const flow::perf::Clock_type m_clock_type;

  /// See constructor.
  Concurrent_task_loop* const m_loop;

  /// See constructor.
  const Exchanger_func m_exhanger_func;

  /// Accumulates time ticks, of clock type #m_clock_type, spent in tasks posted onto #m_loop.
  Time_accumulator m_time_accumulator;
}; // class Timed_concurrent_task_loop_impl

/**
 * Decorates a general -- potentially multi-threaded -- Concurrent_task_loop of any kind but with timing capabilities.
 *
 * Using one of these is identical to using the underlying Concurrent_task_loop; except that after constructing
 * the underlying loop, one must pass it to this class's ctor. From that point on, `*this` is identical to that
 * underlying loop object, except that all relevant handlers posted onto it will be timed. One can then access
 * the result (and reset the accumulator to zero) simply by calling accumulated_time().
 *
 * The following tasks shall be timed, with results accessible via accumulated_time():
 *   - Anything posted via post().
 *   - Anything scheduled via `schedule_task_*()`.
 *   - Anything invoked directly through a boost.asio `async_*()` API, as long as:
 *     - either it is wrapped via asio_handler_via_op() (to execute through an async::Op);
 *     - or it is wrapped via asio_handler_timed() (otherwise).
 *
 * @warning Unfortunately, any "naked" handler passed to a boost.asio async procedure will not be timed;
 *          so don't forget to use a wrapper (presumably asio_handler_timed()).
 *          The key point: 3 of the 4 posting techniques listed above are identical in use to general use of
 *          Concurrent_task_loop as explained in that class's doc header. The exception is the need to
 *          use asio_handler_timed() when dealing with "naked" boost.asio handlers (the last bullet point above).
 */
class Timed_concurrent_task_loop : public Timed_concurrent_task_loop_impl<std::atomic<perf::duration_rep_t>>
{
public:
  // Constructors/destructor.

  /**
   * Constructs a time-accumulating version of existing loop `*loop`. `*loop` must exist at least until
   * `*this` is destroyed.
   *
   * @param clock_type
   *        See Timed_concurrent_task_loop_impl ctor.
   * @param loop
   *        See Timed_concurrent_task_loop_impl ctor.
   */
  Timed_concurrent_task_loop(Concurrent_task_loop* loop,
                             perf::Clock_type clock_type = perf::Clock_type::S_CPU_THREAD_TOTAL_HI_RES);

private:
  // Types.

  /// Convenience alias to our thread-safe tick accumulator type.
  using Time_accumulator = std::atomic<perf::duration_rep_t>;

  /// Convenience alias to our superclass.
}; // class Timed_concurrent_task_loop

// Template implementations.

template<typename Time_accumulator>
Timed_concurrent_task_loop_impl<Time_accumulator>::Timed_concurrent_task_loop_impl
  (Concurrent_task_loop* loop, perf::Clock_type clock_type, Exchanger_func&& exchanger_func_moved) :
  m_clock_type(clock_type),
  m_loop(loop),
  m_exhanger_func(std::move(exchanger_func_moved)),
  m_time_accumulator(0)
{
  // That's it.
}

template<typename Time_accumulator>
void Timed_concurrent_task_loop_impl<Time_accumulator>::start
       (Task&& init_task_or_empty, const Thread_init_func& thread_init_func_or_empty)
{
  m_loop->start(std::move(init_task_or_empty), thread_init_func_or_empty);
}

template<typename Time_accumulator>
void Timed_concurrent_task_loop_impl<Time_accumulator>::stop()
{
  m_loop->stop();
}

template<typename Time_accumulator>
size_t Timed_concurrent_task_loop_impl<Time_accumulator>::n_threads() const
{
  return m_loop->n_threads();
}

template<typename Time_accumulator>
Op Timed_concurrent_task_loop_impl<Time_accumulator>::create_op()
{
  return m_loop->create_op();
}

template<typename Time_accumulator>
const Op_list& Timed_concurrent_task_loop_impl<Time_accumulator>::per_thread_ops()
{
  return m_loop->per_thread_ops();
}

template<typename Time_accumulator>
void Timed_concurrent_task_loop_impl<Time_accumulator>::post(Task&& task, Synchronicity synchronicity)
{
  // Wrap the task so its execution time is added to the accumulator; then forward to the decorated loop.
  m_loop->post(perf::timed_function(m_clock_type, &m_time_accumulator, std::move(task)),
               synchronicity);
}

template<typename Time_accumulator>
void Timed_concurrent_task_loop_impl<Time_accumulator>::post(const Op& op, Task&& task, Synchronicity synchronicity)
{
  m_loop->post(op,
               perf::timed_function(m_clock_type, &m_time_accumulator, std::move(task)),
               synchronicity);
}

template<typename Time_accumulator>
util::Scheduled_task_handle Timed_concurrent_task_loop_impl<Time_accumulator>::schedule_from_now
                              (const Fine_duration& from_now, Scheduled_task&& task)
{
  return m_loop->schedule_from_now(from_now,
                                   perf::timed_function(m_clock_type, &m_time_accumulator, std::move(task)));
}

template<typename Time_accumulator>
util::Scheduled_task_handle Timed_concurrent_task_loop_impl<Time_accumulator>::schedule_at
                              (const Fine_time_pt& at, Scheduled_task&& task)
{
  return m_loop->schedule_at(at,
                             perf::timed_function(m_clock_type, &m_time_accumulator, std::move(task)));
}

template<typename Time_accumulator>
util::Scheduled_task_handle Timed_concurrent_task_loop_impl<Time_accumulator>::schedule_from_now
                              (const Op& op, const Fine_duration& from_now, Scheduled_task&& task)
{
  return m_loop->schedule_from_now(op, from_now,
                                   perf::timed_function(m_clock_type, &m_time_accumulator, std::move(task)));
}

template<typename Time_accumulator>
util::Scheduled_task_handle Timed_concurrent_task_loop_impl<Time_accumulator>::schedule_at
                              (const Op& op, const Fine_time_pt& at, Scheduled_task&& task)
{
  return m_loop->schedule_at(op, at,
                             perf::timed_function(m_clock_type, &m_time_accumulator, std::move(task)));
}

template<typename Time_accumulator>
Task_engine_ptr Timed_concurrent_task_loop_impl<Time_accumulator>::task_engine()
{
  return m_loop->task_engine();
}

template<typename Time_accumulator>
perf::Duration Timed_concurrent_task_loop_impl<Time_accumulator>::accumulated_time()
{
  // The exchanger returns the accumulated ticks and zeroes the accumulator (see ctor doc header).
  return perf::Duration(m_exhanger_func(&m_time_accumulator));
}

template<typename Time_accumulator>
template<typename Handler>
auto Timed_concurrent_task_loop_impl<Time_accumulator>::asio_handler_timed(Handler&& handler_moved)
{
  return perf::timed_handler(m_clock_type, &m_time_accumulator, std::move(handler_moved));
  // Note that timed_handler() specifically promises to not "lose" any executor (e.g., strand) bound to `handler_moved`.
}

} // namespace flow::async
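
The constructor doc header describes two shapes of exchanger function: one for an `atomic` accumulator and one for a plain tick count. The following is a minimal standalone sketch of both, not Flow's own code. It assumes `perf::duration_rep_t` can be modeled as a signed 64-bit integer and uses `std::function` in place of Flow's `Function`; the names `exchange_atomic`, `exchange_plain`, and `exchangers_read_and_reset` are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>

// Stand-in for perf::duration_rep_t (assumption: a signed 64-bit tick count).
using duration_rep_t = std::int64_t;

// Exchanger for a potentially multi-threaded pool:
// reading the total and storing 0 happen as one atomic step.
inline duration_rep_t exchange_atomic(std::atomic<duration_rep_t>* accumulator)
{
  return accumulator->exchange(0);
}

// Exchanger for a single-threaded loop:
// save the value, zero the accumulator, return the saved value.
inline duration_rep_t exchange_plain(duration_rep_t* accumulator)
{
  const duration_rep_t saved = *accumulator;
  *accumulator = 0;
  return saved;
}

// Hypothetical helper: exercises both exchangers through std::function objects
// whose signatures mirror Exchanger_func for each accumulator type.
inline bool exchangers_read_and_reset()
{
  std::atomic<duration_rep_t> shared_ticks{0};
  shared_ticks += 150; // Pretend two tasks on different threads added their elapsed ticks.
  shared_ticks += 50;
  const std::function<duration_rep_t (std::atomic<duration_rep_t>*)> atomic_exchanger = exchange_atomic;
  const bool atomic_ok = (atomic_exchanger(&shared_ticks) == 200) && (shared_ticks.load() == 0);

  duration_rep_t local_ticks = 75;
  const std::function<duration_rep_t (duration_rep_t*)> plain_exchanger = exchange_plain;
  const bool plain_ok = (plain_exchanger(&local_ticks) == 75) && (local_ticks == 0);

  return atomic_ok && plain_ok;
}
```

Calling `exchangers_read_and_reset()` returns `true` when both exchangers report the accumulated total and leave the accumulator at zero. The atomic variant matters in a multi-thread pool because a separate load followed by a store could lose ticks added by another thread between the two steps.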
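
The decorator pattern used by `Timed_concurrent_task_loop_impl` (wrap each posted task in a timing closure, forward it to the underlying loop, then read and reset the total on demand) can be illustrated with a toy standalone sketch. This is not Flow's implementation: `Loop`, `Queue_loop`, `Timed_loop`, and `timed_loop_accumulates_and_resets` are hypothetical names, and `std::chrono::steady_clock` stands in for whichever `perf::Clock_type` a real loop would be configured with.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

using Task = std::function<void ()>;

// Hypothetical minimal loop interface (a stand-in for Concurrent_task_loop).
class Loop
{
public:
  virtual ~Loop() = default;
  virtual void post(Task&& task) = 0;
};

// Toy loop: post() queues a task; run_all() executes the queued tasks in the calling thread.
class Queue_loop : public Loop
{
public:
  void post(Task&& task) override { m_queue.push_back(std::move(task)); }

  void run_all()
  {
    std::vector<Task> tasks;
    tasks.swap(m_queue);
    for (auto& task : tasks) { task(); }
  }

private:
  std::vector<Task> m_queue;
};

// Decorator: same interface, but every posted task is wrapped so its elapsed time is accumulated.
class Timed_loop : public Loop
{
public:
  explicit Timed_loop(Loop* loop) : m_loop(loop), m_ticks(0) {}

  void post(Task&& task) override
  {
    m_loop->post([this, task = std::move(task)]()
    {
      const auto start = std::chrono::steady_clock::now();
      task();
      const auto elapsed = std::chrono::steady_clock::now() - start;
      m_ticks += std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
    });
  }

  // Returns the time accumulated since the previous call and resets the total to zero.
  std::chrono::nanoseconds accumulated_time()
  {
    return std::chrono::nanoseconds(m_ticks.exchange(0));
  }

private:
  Loop* const m_loop;                 // Must outlive *this, as with the real decorator.
  std::atomic<std::int64_t> m_ticks;  // Atomic, since tasks may run on several threads.
};

// Hypothetical helper: posts two tasks through the decorator and checks the read-and-reset behavior.
inline bool timed_loop_accumulates_and_resets()
{
  Queue_loop plain;
  Timed_loop timed(&plain);
  int runs = 0;
  timed.post([&runs]() { ++runs; });
  timed.post([&runs]() { ++runs; });
  plain.run_all();

  const auto first = timed.accumulated_time();  // Total for the two tasks.
  const auto second = timed.accumulated_time(); // Nothing has run since the reset.
  return (runs == 2) && (first.count() >= 0) && (second.count() == 0);
}
```

A task posted directly onto `plain` rather than through `timed` would run untimed, which mirrors the warning about "naked" boost.asio handlers in the `Timed_concurrent_task_loop` doc header.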