Flow 1.0.1
Flow project: Full implementation reference.
segregated_thread_task_loop.cpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
21#include "flow/async/util.hpp"
23
24namespace flow::async
25{
26
27// Implementations.
28
30 size_t n_threads_or_zero,
31 bool est_hw_core_sharing_helps_algo,
32 bool est_hw_core_pinning_helps_algo,
33 bool hw_threads_is_grouping_collated) :
34 log::Log_context(logger_ptr, Flow_log_component::S_ASYNC),
35
36 m_nickname(nickname),
37 m_n_threads_or_zero(n_threads_or_zero),
38 m_est_hw_core_sharing_helps_algo(est_hw_core_sharing_helps_algo),
39 m_est_hw_core_pinning_helps_algo(est_hw_core_pinning_helps_algo),
40 m_hw_threads_is_grouping_collated(hw_threads_is_grouping_collated),
41
42 /* Note to reader: If you grok the doc headers of superclass and Segregated_thread_task_loop, the following
43 * is easy to understand. If you don't, then to make it clear I'd have to repeat all of that here, basically.
44 * So please grok it first. */
45
46 m_qing_threads((m_n_threads_or_zero == 0) // n_threads() nulls: still need to set each one after this.
47 ? optimal_worker_thread_count_per_pool(logger_ptr, m_est_hw_core_sharing_helps_algo)
48 : m_n_threads_or_zero),
49 // n_threads() is now accurate.
50 m_task_engines(n_threads()) // n_threads() nulls: still need to set each one after this.
51{
53
54 /* m_per_thread_task_engines will store the `n` Task_engines. These will be cted now and destroyed in dtor;
55 * they will survive any stop()s or start()s (which stop and start all the threads in thread pool respectively).
56 * For each Task_engine E, post(E, F) (etc.) is safe while E.stopped() is false, or true, or even across that value
57 * changing, by boost.asio docs guaranteeing this tread safety. */
58 for (Task_engine_ptr& task_engine_ptr_in_container : m_task_engines)
59 {
60 // Attn: The concurrency-hint=1 may avoid or all most locking in boost.asio. Exactly 1 thread in the Task_engine.
61 task_engine_ptr_in_container.reset(new Task_engine(1));
62
63 /* Task_engine starts in !stopped() mode ready to run(). start() pre-condition is stopped() so for simplicity
64 * start in the same state that our stop() would put the Task_engine into: */
65 task_engine_ptr_in_container->stop();
66 // Now our start() can always do the sequence: restart() (to make it !stopped()), then run().
67 }
68
69 // Initialize our Ops_list of pre-created Ops which in our case simply store all `n` `Task_engine_ptr`s.
70 const size_t n = n_threads();
71 m_per_thread_ops.reset(new Op_list(get_logger(), n,
72 [this](size_t idx) -> Op
73 { return Op(static_cast<Task_engine_ptr>(m_task_engines[idx])); }));
74 /* (The static_cast<> is probably unnecessary but makes the compiler check our type logic for us. That's quite
75 * helpful in this rare situation where we're essentially using a dynamically typed variable in C++ [boost::any].
76 * There is 0 perf cost to it by the way.) */
77
78 FLOW_LOG_INFO("Segregated_thread_task_loop [" << static_cast<const void*>(this) << "] "
79 "with nickname [" << m_nickname << "] "
80 "with segregated-thread scheduling via individual Task_engines "
81 "across [" << n << "] threads-to-be: "
82 "Created; can accept work. Task_qing_thread(s) not started yet until start().");
83
84 /* Each Task_engine can now be post()ed onto and otherwise used with boost.asio; won't do anything until we
85 * start threads and run() it. start() does that. We're done. */
86} // Segregated_thread_task_loop::Segregated_thread_task_loop()
87
89{
90 FLOW_LOG_INFO("Segregated_thread_task_loop [" << this << "]: Destroying object; will stop threads/tasks unless "
91 "already stopped earlier.");
92 stop();
93
94 /* m_qing_threads.clear(), m_task_engines.clear() equivalents will now happen automatically. That is mere cleanup of
95 * stuff in memory that may include the destruction of (already stop()ped!) Task_engines. */
96}
97
98void Segregated_thread_task_loop::start(Task&& init_task_or_empty,
99 const Thread_init_func& thread_init_func_or_empty) // Virtual.
100{
101 using util::Thread;
102 using util::Task_engine;
103 using boost::promise;
104 using std::transform;
105 using std::vector;
106
107 /* Is the check thread-safe? Yes, since Concurrent_task_loop::stop() must not be called concurrently with itself or
108 * start() per our contract in doc header. */
109 if (!m_task_engines.front()->stopped()) // They're all started/stopped together, so check random guy #1.
110 {
111 FLOW_LOG_INFO("Starting Segregated_thread_task_loop [" << this << "]: Already started earlier. Ignoring.");
112 return;
113 }
114 // else
115
116 const size_t n = n_threads();
117 FLOW_LOG_INFO("Segregated_thread_task_loop [" << this << "] with nickname [" << m_nickname << "]: "
118 "1-to-1 Task_engines across [" << n << "] Task_qing_threads: Starting.");
119
120 /* Create/start the threads.
121 * See same logic in Cross_thread_task_loop, including comments. It's too brief to worry about code reuse here.
122 * Note, though, that we could just this->post(op, thread_init_func..., S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION)
123 * for each `op` in per_thread_ops(). We promised to run thread_init_func...() *first-thing* in its thread,
124 * though, so let's keep to the letter of our contract. Also, this way we can do it in parallel instead of
125 * serially. */
126
127 vector<promise<void>> thread_init_done_promises(n);
128 for (size_t idx = 0; idx != n; ++idx)
129 {
130 Task task_qing_thread_init_func;
131 if (!thread_init_func_or_empty.empty())
132 {
133 task_qing_thread_init_func = [idx, &thread_init_func_or_empty]()
134 {
135 thread_init_func_or_empty(idx);
136 };
137 }
138 else
139 {
140 assert(task_qing_thread_init_func.empty()); // Just leave it.
141 }
142
144
145 // boost.asio subtlety: While stopped(), run() will instantly return, unless one does this first.
146 task_engine->restart();
147 // Now its Task_qing_thread can do ->run() as most of its thread body (and it won't just return).
148
149 // Create/start the thread.
152 task_engine, true, // Its *own* 1-1 Task_engine.
153 &(thread_init_done_promises[idx]),
154 std::move(task_qing_thread_init_func)));
155 } // for (idx in [0, n))
156 FLOW_LOG_INFO("All threads are asynchronously starting. Awaiting their readiness barrier-style, in sequence.");
157 for (size_t idx = 0; idx != n; ++idx)
158 {
159 thread_init_done_promises[idx].get_future().wait();
160 FLOW_LOG_INFO("Thread [" << idx << "] (0-based) of [" << n << "] (1-based) is ready.");
161 }
162
163 // Threads are running and ready for work.
164
165 // See same logic in Cross_thread_task_loop, including comments. It's too brief to worry about code reuse here.
166 if (m_n_threads_or_zero == 0)
167 {
168 FLOW_LOG_INFO("Thread count was auto-determined. Further attempting thread-to-core scheduling optimization.");
169
170 vector<Thread*> worker_threads(n); // Initialized to nulls. Now set them to the raw `Thread*`s.
171 transform(m_qing_threads.begin(), m_qing_threads.end(), worker_threads.begin(),
172 [](const Task_qing_thread_ptr& qing_thread_ptr) -> Thread*
173 { return qing_thread_ptr->raw_worker_thread(); });
177 }
178
179 /* That's it; ready for work... but help out by running this optional init task: the key is this will wait
180 * until that task completes in a spawned thread. So we return only once that has returned. */
181 if (!init_task_or_empty.empty())
182 {
183 post(std::move(init_task_or_empty), Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION);
184 }
185
186 /* Sanity check: start() isn't thread-safe against stop() and start(), as we warned in our contract.
187 * But we've also promised that other API calls *are* thread-safe, including against
188 * stop(). To mentally test that <see similar comment in stop()>. TL;DR: post() and similar APIs work via
189 * m_task_engines safely, even as underlying thread(s) start or stop and m_task_engines[]->run(). */
190} // Segregated_thread_task_loop::start()
191
193{
194 // This is probably unnecessary, but... <see similar comment in Cross_thread_task_loop::stop()>.
195 if (m_task_engines.front()->stopped()) // They're all started/stopped together, so check random guy #1.
196 {
197 FLOW_LOG_INFO("Stopping Segregated_thread_task_loop [" << this << "]: Already stopped earlier. Ignoring.");
198 return;
199 }
200 // else
201
202 FLOW_LOG_INFO("Stopping Segregated_thread_task_loop [" << this << "]: All ongoing tasks will complete normally; "
203 "all pending thread tasks will be belayed [sic]; "
204 "each thread will be asked to gracefully terminate and be joined synchronously. "
205 "Any subsequently-queued tasks will not run until start().");
206
207#ifndef NDEBUG
208 size_t idx = 0;
209#endif
210 for (Task_qing_thread_ptr& thread_ptr_in_container : m_qing_threads)
211 {
212 /* Consider E = thread_ptr_in_container->task_engine(). The following statement has the following effects,
213 * in order:
214 * - Any currently executing post()ed task on E completes normally (presumably quickly, if well-behaved).
215 * (There may well be no such task running at this time. Thread might be idle.)
216 * - Any subsequent queued tasks on E are prevented from running, even if they otherwise would have run
217 * after the aforementioned final task (if any). (They stay queued on )
218 * - E->run() therefore returns, so `*thread` exits.
219 * - `*thread_ptr_in_container`'s thread is joined, meaning the following blocks until that thread exits
220 * gracefully (i.e., all of the above completes, particularly E->run() returns).
221 * - `*thread_ptr_in_container` is destroyed (dtor runs). This decrements ref-count of the E shared_ptr<>,
222 * but we have E in m_task_engines[] always, so it continues to live.
223 *
224 * *E continues to exist for now. In particular, if a not-yet-stopped thread's
225 * currently executing task (if any) tries to post(*E, F), that call will safely execute but do
226 * nothing (F() will not run, because `E->stopped() == true` now).
227 *
228 * @warning Without m_task_engines storing the shared_ptr<> E also, the above would not be safe, because:
229 * A race could occur wherein Task_engine E1 is destroyed, while not-yet-stopped Task_engine E2 tries to
230 * post(*E2, F) (crash/undefined behavior). In that case we'd need to first do
231 * a round of ->stop()s; and only then auto-destruct m_qing_threads[] and their underlying E's. */
232 assert(thread_ptr_in_container->task_engine() == m_task_engines[idx]);
233 thread_ptr_in_container.reset();
234 assert(m_task_engines[idx]->stopped());
235 // Any pending tasks on that Task_engine now await another thread to ->run(). Then the queue will resume.
236
237#ifndef NDEBUG
238 ++idx;
239#endif
240 } // for (thread_ptr_in_container : m_qing_threads)
241
242 /* Now every thread in the thread pool has exited. Therefore by definition no post()ed tasks on `*this` will
243 * execute until start(), and none is executing now. In particular, they won't try to, themselves, post() more
244 * work onto `*this`, because "they" cannot actually run until start().
245 *
246 * As promised in our contract, however, user can still post() (and other async work) from outside the tread pool;
247 * it will work, but (again) no task/completion handler will actually execute. There are no threads left in which it
248 * would.
249 *
250 * Don't nullify each m_task_engines[]; post() (and others) will thus (haplessly but safely) post tasks onto
251 * m_task_engines[], as we promised in our contract. */
252
253 /* Sanity check: stop() isn't thread-safe against stop(), as we warned in our contract. But we've also promised
254 * that other API calls *are* thread-safe, including against stop(). For brevity I leave that proof as an exercise
255 * to reader. That said see Cross_thread_task_loop::stop() similarly placed comment which justifies its
256 * somewhat-more-complex situation. Having proven that, the same logic applies here, except we don't even have the
257 * complication of a shared Task_engine among threads. Check the logic inside Task_qing_thread::stop(). */
258} // Segregated_thread_task_loop::stop()
259
261{
262 return m_qing_threads.size();
263}
264
266{
267 return per_thread_ops().random_op(); // It makes a copy (which is just a Task_engine_ptr copy ultimately).
268}
269
271{
272 return *m_per_thread_ops; // It's fine until destructor runs.
273}
274
275void Segregated_thread_task_loop::post(Task&& task, Synchronicity synchronicity) // Virtual.
276{
277 /* Each thread has its own single-run()-executing Task_engine, so it's left to us to choose a random one.
278 * Theoretically this is less efficient than in cross-thread sibling which might choose a thread that is more
279 * free to do work rather than just being entirely random about the decision. @todo It's conceivable some
280 * querying as to each thread's state or load could be used to make a more informed choice. It might also be
281 * wortwhile to make that choice here in post() instead of at create_op() time. Class doc header as of this writing
282 * includes these formal to-dos. See also task_engine() where this to-do applies. */
283
284 auto const chosen_thread_idx = per_thread_ops().random_idx();
285 auto const chosen_task_engine = m_task_engines[chosen_thread_idx];
286 // Could use random_op() for slightly shorter code, but the above is potentially a bit faster, bypassing boost.any.
287
288 FLOW_LOG_TRACE("Segregated_thread_task_loop [" << this << "]: About to post single-task sequence "
289 "on randomly selected thread [" << chosen_thread_idx << "]: "
290 "Task_engine [" << chosen_task_engine << "].");
291
292 post_impl(chosen_task_engine, synchronicity, std::move(task));
293}
294
295void Segregated_thread_task_loop::post(const Op& op, Task&& task, Synchronicity synchronicity) // Virtual.
296{
297 using util::Task_engine;
298
299 /* Since both create_op() and per_thread_ops()[i] just return an Op containing a pointer
300 * to one of n_threads() threads, we simply execute on that thread, thus guaranteeing all other post()ed
301 * tasks by definition of "thread" cannot run concurrently with this `task`. */
302
303 auto const chosen_task_engine = op_to_exec_ctx<Task_engine_ptr>(this, op);
304
305 FLOW_LOG_TRACE("Segregated_thread_task_loop [" << this << "]: About to post a task in multi-task sequence "
306 "on previously chosen thread: "
307 "Task_engine [" << chosen_task_engine << "].");
308
309 post_impl(chosen_task_engine, synchronicity, std::move(task));
310}
311
313 Scheduled_task&& task) // Virtual.
314{
315 // See 1-arg post(). Keeping comments light.
316
317 auto const chosen_thread_idx = per_thread_ops().random_idx();
318 auto const chosen_task_engine = m_task_engines[chosen_thread_idx];
319
320 FLOW_LOG_TRACE("Segregated_thread_task_loop [" << this << "]: "
321 "About to boost.asio-timer-schedule single-task sequence "
322 "on randomly selected thread [" << chosen_thread_idx << "]: "
323 "Task_engine [" << chosen_task_engine << "].");
324
325 return schedule_from_now_impl(chosen_task_engine, from_now, std::move(task));
326}
327
329 const Fine_duration& from_now,
330 Scheduled_task&& task) // Virtual.
331{
332 using util::Task_engine;
333
334 // See 2-arg post(). Keeping comments light.
335
336 auto const chosen_task_engine = op_to_exec_ctx<Task_engine_ptr>(this, op);
337
338 FLOW_LOG_TRACE("Segregated_thread_task_loop [" << this << "]: "
339 "About to boost.asio-timer-schedule a task in multi-task sequence on previously chosen thread: "
340 "Task_engine [" << chosen_task_engine << "].");
341
342 return schedule_from_now_impl(chosen_task_engine, from_now, std::move(task));
343}
344
346 Scheduled_task&& task) // Virtual.
347{
348 // Similar comment to Cross_thread_task_loop::schedule_at().
349 return schedule_from_now(at - Fine_clock::now(), std::move(task));
350}
351
353 const Fine_time_pt& at,
354 Scheduled_task&& task) // Virtual.
355{
356 // Same comment as in other schedule_at().
357 return schedule_from_now(op, at - Fine_clock::now(), std::move(task));
358}
359
361 Synchronicity synchronicity, Task&& task)
362{
363 using util::Task_engine;
364
365 /* The "hard" part was choosing chosen_qing_thread; now we can post to that thread's segregated Task_engine.
366 * Any details as to when to run it and whether/how long to wait for completion are forwarded to
367 * the following per contract. */
368
369 asio_exec_ctx_post<Task_engine>
370 (get_logger(), chosen_task_engine.get(), synchronicity, std::move(task));
371}
372
375 const Fine_duration& from_now,
376 Scheduled_task&& task)
377{
379
380 // See post_impl(). Keeping comments light.
381
382 auto const logger_ptr = get_logger();
383 if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
384 {
385 Scheduled_task task_plus_logs = [this, task = std::move(task)]
386 (bool short_fired)
387 {
388 FLOW_LOG_TRACE_WITHOUT_CHECKING("Scheduled task starting: "
389 "current processor logical core index [" << cpu_idx() << "].");
390 task(short_fired);
391 FLOW_LOG_TRACE_WITHOUT_CHECKING("Scheduled task ended: "
392 "current processor logical core index [" << cpu_idx() << "].");
393 };
394
395 return schedule_task_from_now(get_logger(), from_now, n_threads() == 1,
396 chosen_task_engine.get(), std::move(task_plus_logs));
397 }
398 // else
399
400 return schedule_task_from_now(get_logger(), from_now, n_threads() == 1,
401 chosen_task_engine.get(), std::move(task));
402}
403
405{
406 // See 1-arg post(). Keeping comments light. Note the @todo about load-balancing applies here as well.
407
408 auto const chosen_thread_idx = per_thread_ops().random_idx();
409 auto const chosen_task_engine = m_task_engines[chosen_thread_idx];
410
411 FLOW_LOG_TRACE("Segregated_thread_task_loop [" << this << "]: About to return boost.asio Task_engine "
412 "assigned to randomly selected thread [" << chosen_thread_idx << "]: "
413 "Task_engine [" << chosen_task_engine << "].");
414
415 return chosen_task_engine;
416}
417
418template<>
420{
421 using boost::any_cast;
422
423 assert(op.type() == typeid(Task_engine_ptr));
424 // @todo It'd be also nice to assert(<this Task_engine_ptr is one of m_task_engines[]>).
425
426 return any_cast<Task_engine_ptr>(op);
427}
428
429} // namespace flow::async
The core flow::async interface, providing an optionally multi-threaded thread pool onto which runnabl...
Simple, immutable vector-like sequence of N opaque async::Op objects, usually corresponding to N work...
Definition: op.hpp:58
const Op & random_op(size_t *chosen_idx=0) const
Returns (*this)[R], where we randomly select R as if by random_idx() and communicate it to the caller...
Definition: op.cpp:51
size_t random_idx() const
Returns a randomly selected index from range [O, size()).
Definition: op.cpp:65
std::vector< Task_qing_thread_ptr > m_qing_threads
N task-execution-capable worker threads whose lifetimes equal those of *this, each with its own util:...
Segregated_thread_task_loop(log::Logger *logger_ptr, util::String_view nickname, size_t n_threads_or_zero, bool est_hw_core_sharing_helps_algo=false, bool est_hw_core_pinning_helps_algo=false, bool hw_threads_is_grouping_collated=false)
Constructs object, making it available for post() and similar work, but without starting any threads ...
size_t n_threads() const override
Implements superclass API.
boost::movelib::unique_ptr< const Op_list > m_per_thread_ops
See per_thread_ops().
const Op_list & per_thread_ops() override
Implements superclass API.
util::Scheduled_task_handle schedule_from_now(const Fine_duration &from_now, Scheduled_task &&task) override
Implements superclass API.
void start(Task &&init_task_or_empty=Task(), const Thread_init_func &thread_init_func_or_empty=Thread_init_func()) override
Implements superclass API.
Task_engine_ptr task_engine() override
See superclass API.
void post_impl(const Task_engine_ptr &chosen_task_engine, Synchronicity synchronicity, Task &&task)
Helper performing the core Task_engine::post() (or similar) call on behalf of the various post() over...
Op create_op() override
Implements superclass API.
void stop() override
Implements superclass API.
util::Scheduled_task_handle schedule_from_now_impl(const Task_engine_ptr &chosen_task_engine, const Fine_duration &from_now, Scheduled_task &&task)
Helper performing the core util::schedule_task_from_now() call on behalf of the various schedule_from...
boost::movelib::unique_ptr< Task_qing_thread > Task_qing_thread_ptr
Short-hand for smart pointer to Task_qing_thread.
void post(Task &&task, Synchronicity synchronicity=Synchronicity::S_ASYNC) override
Implements superclass API.
~Segregated_thread_task_loop() override
See superclass destructor.
std::vector< Task_engine_ptr > m_task_engines
boost.asio Task_engines (a/k/a io_services) used by each respective element in m_qing_threads.
util::Scheduled_task_handle schedule_at(const Fine_time_pt &at, Scheduled_task &&task) override
Implements superclass API.
Internally used building block of various concrete Concurrent_task_loop subclasses that encapsulates ...
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:229
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:224
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1291
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:197
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:354
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
Flow module containing tools enabling multi-threaded event loops operating under the asynchronous-tas...
Definition: async_fwd.hpp:75
boost::any Op
An object of this opaque type represents a collection of 1 or more async::Task, past or future,...
Definition: async_fwd.hpp:153
void optimize_pinning_in_thread_pool(flow::log::Logger *logger_ptr, const std::vector< util::Thread * > &threads_in_pool, bool est_hw_core_sharing_helps_algo, bool est_hw_core_pinning_helps_algo, bool hw_threads_is_grouping_collated)
Assuming the same situation as documented for optimal_worker_thread_count_per_pool(),...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
Definition: async_fwd.hpp:223
@ S_ASYNC
Simply post the given task to execute asynchronously in some execution context – as soon as the conte...
@ S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION
Same as Synchronicity::S_ASYNC but the posting routine then waits as long as necessary for the given ...
uint16_t cpu_idx()
Returns the 0-based processor logical (not hardware) core index of the core executing the calling thr...
Definition: async.cpp:30
boost::shared_ptr< util::Task_engine > Task_engine_ptr
Short-hand for reference-counting pointer to a mutable util::Task_engine (a/k/a boost::asio::io_servi...
Definition: async_fwd.hpp:198
unsigned int optimal_worker_thread_count_per_pool(flow::log::Logger *logger_ptr, bool est_hw_core_sharing_helps_algo)
Assuming a planned thread pool will be receiving ~symmetrical load, and its UX-affecting (in particul...
Task_engine_ptr op_to_exec_ctx< Task_engine_ptr >(Concurrent_task_loop *loop, const Op &op)
Template specialization for operation that obtains the underlying execution context,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
std::string ostream_op_string(T const &... ostream_args)
Equivalent to ostream_op_to_string() but returns a new string by value instead of writing to the call...
Definition: util.hpp:356
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
Definition: sched_task.hpp:34
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Definition: util_fwd.hpp:135
Basic_string_view< char > String_view
Commonly used char-based Basic_string_view. See its doc header.
boost::thread Thread
Short-hand for standard thread class.
Definition: util_fwd.hpp:78
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Definition: common.hpp:632
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Definition: common.hpp:410
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Definition: common.hpp:407