Flow 1.0.2
Flow project: Full implementation reference.
x_thread_task_loop.cpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
21#include "flow/async/util.hpp"
23
24namespace flow::async
25{
26
27// Implementations.
28
30 size_t n_threads_or_zero,
31 bool est_hw_core_sharing_helps_algo,
32 bool est_hw_core_pinning_helps_algo,
33 bool hw_threads_is_grouping_collated) :
34 log::Log_context(logger_ptr, Flow_log_component::S_ASYNC),
35
36 m_nickname(nickname),
37 m_n_threads_or_zero(n_threads_or_zero),
38 m_est_hw_core_sharing_helps_algo(est_hw_core_sharing_helps_algo),
39 m_est_hw_core_pinning_helps_algo(est_hw_core_pinning_helps_algo),
40 m_hw_threads_is_grouping_collated(hw_threads_is_grouping_collated),
41
42 /* Note to reader: If you grok the doc headers of superclass and Cross_thread_task_loop, the following
43 * is easy to understand. If you don't, then to make it clear I'd have to repeat all of that here, basically.
44 * So please grok it first. */
45
46 m_qing_threads((m_n_threads_or_zero == 0) // n_threads() nulls: still need to set each one after this.
47 ? optimal_worker_thread_count_per_pool(logger_ptr, m_est_hw_core_sharing_helps_algo)
48 : m_n_threads_or_zero),
49 /* n_threads() is now accurate. Create the shared Task_engine capable of smartly scheduling across N threads.
50 * Attn: Give concurrency hint; 1 in particular may help avoid or eliminate locking inside boost.asio. */
51 m_shared_task_engine(new util::Task_engine(n_threads())),
52 // Forever initialize our Ops_list of pre-created Ops which in our case simply store long-lived Strands.
53 m_per_thread_strands(logger_ptr, n_threads(),
54 [this](size_t) -> Op
56{
57 /* Task_engine starts in !stopped() mode ready to run(). start() pre-condition is stopped() so for simplicity
58 * start in the same state that our stop() would put the Task_engine into: */
59 m_shared_task_engine->stop();
60 // Now our start() can always do the sequence: restart() (to make it !stopped()), then run().
61
62 FLOW_LOG_INFO("Cross_thread_task_loop [" << static_cast<const void*>(this) << "] "
63 "with nickname [" << m_nickname << "] "
64 "with cross-thread scheduling via single Task_engine across [" << n_threads() << "] threads-to-be: "
65 "Created; can accept work. Task_qing_thread(s) not started yet until start().");
66
67 /* *m_shared_task_engine can now be post()ed onto and otherwise used with boost.asio; won't do anything until we
68 * start threads and run() it. start() does that. We're done. */
69}
70
72{
73 FLOW_LOG_INFO("Cross_thread_task_loop [" << this << "]: Destroying object; will stop threads/tasks unless already "
74 "stopped earlier.");
75 stop();
76
77 /* m_qing_threads.clear() equivalent and m_shared_task_engine->~Task_engine() will now happen automatically.
78 * Both are mere cleanup of stuff in memory. */
79}
80
81void Cross_thread_task_loop::start(Task&& init_task_or_empty,
82 const Thread_init_func& thread_init_func_or_empty) // Virtual.
83{
84 using util::Thread;
85 using boost::promise;
86 using boost::unique_future;
87 using boost::movelib::unique_ptr;
88 using std::transform;
89 using std::vector;
90
91 /* Is the check thread-safe? Yes, since Concurrent_task_loop::stop() must not be called concurrently with itself or
92 * start() per our contract in doc header. */
93 if (!m_shared_task_engine->stopped())
94 {
95 /* Unlike stop() after stop(), start() after start() doesn't seem all that benign (whereas double stop() will often
96 * happen from dtor, say). So use WARNING, not INFO. */
97 FLOW_LOG_WARNING("Starting Cross_thread_task_loop [" << this << "]: Already started earlier. Ignoring.");
98 return;
99 }
100 // else
101
102 const size_t n = n_threads();
103 FLOW_LOG_INFO("Cross_thread_task_loop [" << this << "] with nickname [" << m_nickname << "]: "
104 "Single Task_engine across [" << n << "] Task_qing_threads: Starting.");
105
106 // boost.asio subtlety: While stopped(), run() will instantly return, unless one does this first.
107 m_shared_task_engine->restart();
108 // Now each Task_qing_thread can do ->run() as most of its thread body (and it won't just return).
109
110 /* Create/start the threads.
111 * Subtlety: If thread_init_func_or_empty is null, then we know Task_qing_thread ctor to put very quick-executing
112 * stuff onto the new thread, before it's officially ready to participate in post()ing. So it would be fine
113 * to just wait for each ctor to finish in series (in fact that's what we used to do). Otherwise, though,
114 * thread_init_func_or_empty() could be something lengthy... it's best to let the ctors launch the thread-init
115 * steps to happen asynchronously, each ctor returning potentially before its thread's init steps are finished.
116 * So that's why we use the Task_qing_thread ctor mode wherein we pass in our own `promise`s and then wait afterwards
117 * for them all to be satisfied, barrier-style. */
118
119 vector<promise<void>> thread_init_done_promises(n);
120 for (size_t idx = 0; idx != n; ++idx)
121 {
122 Task task_qing_thread_init_func;
123 if (!thread_init_func_or_empty.empty())
124 {
125 task_qing_thread_init_func = [idx, &thread_init_func_or_empty]()
126 {
127 thread_init_func_or_empty(idx);
128 };
129 }
130 else
131 {
132 assert(task_qing_thread_init_func.empty()); // Just leave it.
133 }
134
137 m_shared_task_engine, false, // A *shared* Task_engine.
138 &(thread_init_done_promises[idx]),
139 std::move(task_qing_thread_init_func)));
140 } // for (idx in [0, n))
141
142 // By barrier-style I mean that they all the waits must be done, before the loop exits.
143 FLOW_LOG_INFO("All threads are asynchronously starting. Awaiting their readiness barrier-style, in sequence.");
144 for (size_t idx = 0; idx != n; ++idx)
145 {
146 thread_init_done_promises[idx].get_future().wait();
147 FLOW_LOG_INFO("Thread [" << idx << "] (0-based) of [" << n << "] (1-based) is ready.");
148 }
149
150 /* Threads are running and ready for work. Note that if `m_n_threads_or_zero == 0`, then that enabled the
151 * auto-config of the thread arch based on the assumption of a processor/RAM-heavy thread pool that doesn't
152 * benefit from hyper-threading. This has already then had an effect by auto-determining `n`.
153 * Now we complete the functionality in this auto-config mode by performing whatever thread-core pinning is
154 * implied by the 3 constructor bool arguments, the use of which was enabled by `m_n_threads_or_zero == 0`.
155 *
156 * There is a sliiightly annoying corner case, which is that: not only are the threads *ready* for work, but
157 * they might already be executing work right now, namely if stuff has been post()ed (or other types of work
158 * posting) onto *m_shared_task_engine already. Yet we haven't yet done the pinning stuff! That's fine
159 * though. They've only been running for, like, microseconds; and it is certainly allowed to mess with their
160 * core affinities, even when they aren't idle. @todo It's technically possible to get rid of this minor
161 * sort-of-dirtiness... have Task_qing_thread wait for some signal (probably a promise/future) to actually
162 * kick off Task_engine::run()... that complexity does not seem in any way worth it at this time. Revisit maybe. */
163 if (m_n_threads_or_zero == 0)
164 {
165 FLOW_LOG_INFO("Thread count was auto-determined. Further attempting thread-to-core scheduling optimization.");
166
167 vector<Thread*> worker_threads(n); // Initialized to nulls. Now set them to the raw `Thread*`s.
168 transform(m_qing_threads.begin(), m_qing_threads.end(), worker_threads.begin(),
169 [](const Task_qing_thread_ptr& qing_thread_ptr) -> Thread*
170 { return qing_thread_ptr->raw_worker_thread(); });
171
175 // That logged details.
176 }
177
178 /* That's it; ready for work/working... but help out by running this optional init task: the key is this will wait
179 * until that task completes in a spawned thread. So we return only once that has returned.
180 * Do note that anything else queued while `*this` was stopped may well run before this guy (if any). */
181 if (!init_task_or_empty.empty())
182 {
183 post(std::move(init_task_or_empty), Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION);
184 }
185
186 /* Sanity check: start() isn't thread-safe against stop() and start(), as we warned in our contract.
187 * But we've also promised that other API calls *are* thread-safe, including against
188 * stop(). To mentally test that <see similar comment in stop()>. TL;DR: post() and similar APIs work via
189 * *m_shared_task_engine safely, even as underlying thread(s) start or stop and m_shared_task_engine->run(). */
190} // Cross_thread_task_loop::start()
191
193{
194 /* This is probably unnecessary, since as of this writing all below statements are no-ops if stop() was called before.
195 * We still do it because:
196 * - It's cleaner: one less code path to have to reason about in terms of safety.
197 * - Less spurious/verbose logging (note Task_qing_thread would log in both its .stop() and dtor).
198 * - It's a tiny bit faster.
199 *
200 * Is it thread-safe? Yes, since Concurrent_task_loop::stop() must not be called concurrently with itself per our
201 * contract in doc header. */
202 if (m_shared_task_engine->stopped())
203 {
204 FLOW_LOG_INFO("Stopping Cross_thread_task_loop [" << this << "]: Already stopped earlier. Ignoring.");
205 return;
206 }
207 // else
208
209 FLOW_LOG_INFO("Cross_thread_task_loop [" << this << "] with nickname [" << m_nickname << "]: Stopping: "
210 "All ongoing thread tasks will complete normally; all pending thread tasks will be belayed [sic]; "
211 "each thread will be asked to gracefully terminate and be joined synchronously. "
212 "Any subsequently-queued tasks will not run until start().");
213
214 /* Kick off all the thread exits in parallel. To see how the following statement accomplishes this,
215 * consider what happens when Task_engine::stop() does its thing. boost.asio docs show
216 * that it lets any ongoing task(s) to complete normally/return. (That is even the case if it, and we, are called
217 * from within a task in the thread pool itself.) They further show any *more* tasks are prevented from running
218 * even if already queued up. Lastly, Task_engine::run() will exit, in any thread that invoked it.
219 *
220 * In our case, each thread's body is Task_engine::run(), meaning once that returns, the thread ends gracefully.
221 * So that's how the following kicks off all the thread exits. */
222 m_shared_task_engine->stop();
223 assert(m_shared_task_engine->stopped());
224 // Any pending tasks on *m_shared_task_engine now await another thread to ->run(). Then the queue will resume.
225
226 /* That has almost satisfied the contract in our doc header. All that's left is to return only once the thread-exit
227 * we've kicked off fully completes, a/k/a we must join those threads. Do so: */
228 for (Task_qing_thread_ptr& thread_ptr_in_container : m_qing_threads)
229 {
230 /* By its contract, this will simply wait for the threads to exit (and it's up to us to kick that off, which we
231 * did above). */
232 thread_ptr_in_container->stop();
233
234 /* Delete the Task_qing_thread. Sanity check: Is it safe? Yes, because in our case, nothing ever touches the
235 * thread objects m_qing_threads[] themselves after start() -- all posting/etc. is done indirectly through
236 * m_shared_task_engine, shared among all the threads; and that will keep working (but no-op'ing until the next
237 * start() if any).
238 *
239 * Lastly, why not m_qing_threads.clear() instead, which is equivalent to .reset() + clearing the container of
240 * pointers too, thus freeing a bit more RAM? Answer: n_threads() *does* access m_qing_threads.size() after stop();
241 * plus start() would assume the vector has the elements, albeit nulls; clear() would break that.
242 * The RAM savings via clear() are minor, so we haven't bothered making it more complicated. */
243 thread_ptr_in_container.reset();
244 }
245 // All threads joined as promised.
246
247 /* Sanity check: stop() isn't thread-safe against stop() and start(), as we warned in our contract.
248 * But we've also promised that other API calls *are* thread-safe, including against
249 * stop(). Let's mentally test that by analyzing the
250 * above statements against a hypothetical concurrent boost::asio::post(m_shared_task_engine, F) (and all our
251 * task-posting APIs reduce to boost::asio::post() ultimately).
252 * - Task_engine::stop() is thread-safe against boost::asio::post(<same Task_engine>, F) by boost.asio docs.
253 * - Task_qing_thread::stop() merely joins the thread (waits for Task_engine::run() to return as kicked off
254 * in previous bullet point). No prob: It's equally fine to boost::asio::post(E, F) before and after
255 * E.run() returns, as well as from within tasks invoked by E.run(), by boost.asio docs (and common sense).
256 * - ~Task_qing_thread() is safe, as m_qing_threads[] is never accessed at all after start() returns (outside of
257 * stop()), as discussed above.
258 *
259 * Mentally testing our thread-safety against other, non-posting, APIs is left as an exercise to the reader. */
260} // Cross_thread_task_loop::stop()
261
262size_t Cross_thread_task_loop::n_threads() const // Virtual.
263{
264 return m_qing_threads.size();
265}
266
268{
270}
271
273{
275}
276
277void Cross_thread_task_loop::post(Task&& task, Synchronicity synchronicity) // Virtual.
278{
279 using util::Task_engine;
280
281 FLOW_LOG_TRACE("Cross_thread_task_loop [" << this << "]: "
282 "About to post single-task sequence via boost.asio scheduler; details TRACE-logged below.");
283
284 /* No constraint vs. any other task: simply post to some random-ish thread in our pool-spanning shared Task_engine.
285 * Any details as to when to run it and whether/how long to wait for completion are forwarded to
286 * the following per contract. */
287
288 asio_exec_ctx_post<Task_engine>
289 (get_logger(), m_shared_task_engine.get(), synchronicity, std::move(task));
290} // Cross_thread_task_loop::post()
291
292void Cross_thread_task_loop::post(const Op& op, Task&& task, Synchronicity synchronicity) // Virtual.
293{
294 using util::Strand;
295
296 if (n_threads() == 1)
297 {
298 // Non-concurrency is always guaranteed with 1 thread. An optimization (though boost.asio likely does similar).
299 post(std::move(task));
300 return;
301 }
302 // else
303
304 auto const chosen_strand_ptr = op_to_exec_ctx<Strand_ptr>(this, op);
305 FLOW_LOG_TRACE("Cross_thread_task_loop [" << this << "]: "
306 "About to post a task in multi-task sequence "
307 "via previously created strand; details TRACE-logged below.");
308
309 /* This is much like the other post() but with the extra constraint: must not run concurrently with any other Task
310 * post()ed to the Strand pointed to by `op`'s payload. That is simply the core functionality of a Strand.
311 *
312 * Check out asio_exec_ctx_post() doc header, but TL;DR spoiler alert: It works equally well, because Strand has
313 * the same needed post() and dispatch() abilities as the Task_engine from which it is born. */
314
315 asio_exec_ctx_post<Strand>
316 (get_logger(), chosen_strand_ptr.get(), synchronicity, std::move(task));
317} // Cross_thread_task_loop::post()
318
320 Scheduled_task&& task) // Virtual.
321{
323
324 // See 1-arg post(). Keeping comments light.
325
326 auto const logger_ptr = get_logger();
327 if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
328 {
329 FLOW_LOG_TRACE_WITHOUT_CHECKING("Cross_thread_task_loop [" << this << "]: "
330 "About to schedule single-task sequence via boost.asio timer+scheduler.");
331
332 Scheduled_task task_plus_logs = [this, task = std::move(task)]
333 (bool short_fired)
334 {
335 FLOW_LOG_TRACE_WITHOUT_CHECKING("Scheduled task starting: "
336 "current processor logical core index [" << cpu_idx() << "].");
337 task(short_fired);
338 FLOW_LOG_TRACE_WITHOUT_CHECKING("Scheduled task ended: "
339 "current processor logical core index [" << cpu_idx() << "].");
340 };
341
342 return schedule_task_from_now(get_logger(), from_now, n_threads() == 1,
343 m_shared_task_engine.get(), std::move(task_plus_logs));
344 }
345 // else
346
347 return schedule_task_from_now(get_logger(), from_now, n_threads() == 1,
348 m_shared_task_engine.get(), std::move(task));
349}
350
352 const Fine_duration& from_now,
353 Scheduled_task&& task) // Virtual.
354{
355 using util::Strand;
357 using boost::asio::bind_executor;
358
359 // See 2-arg post(). Keeping comments light.
360
361 if (n_threads() == 1)
362 {
363 return schedule_from_now(from_now, std::move(task));
364 }
365 // else
366
367 assert(op.type() == typeid(Strand_ptr));
368 auto const chosen_strand_ptr = op_to_exec_ctx<Strand_ptr>(this, op);
369
370 auto const logger_ptr = get_logger();
371 if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
372 {
373 FLOW_LOG_TRACE_WITHOUT_CHECKING("Cross_thread_task_loop [" << this << "]: "
374 "About to schedule single-task sequence via boost.asio timer+scheduler "
375 "via previously created strand [" << chosen_strand_ptr << "].");
376
377 Scheduled_task task_plus_logs = [this, task = std::move(task)]
378 (bool short_fired)
379 {
380 FLOW_LOG_TRACE_WITHOUT_CHECKING("Scheduled task starting: "
381 "current processor logical core index [" << cpu_idx() << "].");
382 task(short_fired);
383 FLOW_LOG_TRACE_WITHOUT_CHECKING("Scheduled task ended: "
384 "current processor logical core index [" << cpu_idx() << "].");
385 };
386
387 return schedule_task_from_now(get_logger(), from_now, n_threads() == 1, m_shared_task_engine.get(),
388 bind_executor(*chosen_strand_ptr, task_plus_logs)); // <-- ATTN! Go through Strand.
389 }
390 // else
391
392 return schedule_task_from_now(get_logger(), from_now, n_threads() == 1, m_shared_task_engine.get(),
393 bind_executor(*chosen_strand_ptr, task)); // <-- ATTN! Go through Strand.
394}
395
397 Scheduled_task&& task) // Virtual.
398{
399 /* This is straightforward, clearly, but note that -- moreover -- there is no perf downside to doing this
400 * instead of copy-pasting schedule_from_now()'s body while using util::schedule_task_at() directly. That's
401 * according to the `### Performance ###` note in schedule_task_at()'s doc header. So there's no perf penalty for
402 * keeping this code trivial. */
403 return schedule_from_now(at - Fine_clock::now(), std::move(task));
404}
405
407 const Fine_time_pt& at,
408 Scheduled_task&& task) // Virtual.
409{
410 // Same comment as in other schedule_at().
411 return schedule_from_now(op, at - Fine_clock::now(), std::move(task));
412}
413
415{
417}
418
419template<>
421{
422 using boost::any_cast;
423
424 assert(op.type() == typeid(Strand_ptr));
425 auto const strand_ptr = any_cast<Strand_ptr>(op);
426 assert(&(strand_ptr->context()) == loop->task_engine().get());
427
428 return strand_ptr;
429}
430
431} // namespace flow::async
The core flow::async interface, providing an optionally multi-threaded thread pool onto which runnabl...
const bool m_est_hw_core_pinning_helps_algo
See constructor.
util::Scheduled_task_handle schedule_from_now(const Fine_duration &from_now, Scheduled_task &&task) override
Implements superclass API.
~Cross_thread_task_loop() override
See superclass destructor.
Op create_op() override
Implements superclass API.
std::vector< Task_qing_thread_ptr > m_qing_threads
N task-execution-capable worker threads whose lifetimes equal those of *this, all sharing m_shared_ta...
Task_engine_ptr m_shared_task_engine
boost.asio Task_engine (a/k/a io_service) co-used by all m_qing_threads.
void start(Task &&init_task_or_empty=Task(), const Thread_init_func &thread_init_func_or_empty=Thread_init_func()) override
Implements superclass API.
size_t n_threads() const override
Implements superclass API.
boost::movelib::unique_ptr< Task_qing_thread > Task_qing_thread_ptr
Short-hand for smart pointer to Task_qing_thread.
Cross_thread_task_loop(log::Logger *logger_ptr, util::String_view nickname, size_t n_threads_or_zero, bool est_hw_core_sharing_helps_algo=false, bool est_hw_core_pinning_helps_algo=false, bool hw_threads_is_grouping_collated=false)
Constructs object, making it available for post() and similar work, but without starting any threads ...
const bool m_hw_threads_is_grouping_collated
See constructor.
const std::string m_nickname
See constructor.
const bool m_est_hw_core_sharing_helps_algo
See constructor.
void stop() override
Implements superclass API.
util::Scheduled_task_handle schedule_at(const Fine_time_pt &at, Scheduled_task &&task) override
Implements superclass API.
const size_t m_n_threads_or_zero
See constructor.
void post(Task &&task, Synchronicity synchronicity=Synchronicity::S_ASYNC) override
Implements superclass API.
Task_engine_ptr task_engine() override
See superclass API.
const Op_list m_per_thread_strands
See per_thread_ops(). The boost::any payload is always of the type async::Strand_ptr.
const Op_list & per_thread_ops() override
Implements superclass API.
Simple, immutable vector-like sequence of N opaque async::Op objects, usually corresponding to N work...
Definition: op.hpp:58
Internally used building block of various concrete Concurrent_task_loop subclasses that encapsulates ...
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:230
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:225
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1291
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:197
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
Definition: log.hpp:152
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:354
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
Flow module containing tools enabling multi-threaded event loops operating under the asynchronous-tas...
Definition: async_fwd.hpp:75
boost::any Op
An object of this opaque type represents a collection of 1 or more async::Task, past or future,...
Definition: async_fwd.hpp:153
void optimize_pinning_in_thread_pool(flow::log::Logger *logger_ptr, const std::vector< util::Thread * > &threads_in_pool, bool est_hw_core_sharing_helps_algo, bool est_hw_core_pinning_helps_algo, bool hw_threads_is_grouping_collated)
Assuming the same situation as documented for optimal_worker_thread_count_per_pool(),...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
Definition: async_fwd.hpp:223
@ S_ASYNC
Simply post the given task to execute asynchronously in some execution context – as soon as the conte...
@ S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION
Same as Synchronicity::S_ASYNC but the posting routine then waits as long as necessary for the given ...
boost::shared_ptr< util::Strand > Strand_ptr
Short-hand for ref-counted pointer to util::Strand.
Definition: async_fwd.hpp:212
uint16_t cpu_idx()
Returns the 0-based processor logical (not hardware) core index of the core executing the calling thr...
Definition: async.cpp:30
Strand_ptr op_to_exec_ctx< Strand_ptr >(Concurrent_task_loop *loop, const Op &op)
Template specialization for operation that obtains the underlying execution context,...
boost::shared_ptr< util::Task_engine > Task_engine_ptr
Short-hand for reference-counting pointer to a mutable util::Task_engine (a/k/a boost::asio::io_servi...
Definition: async_fwd.hpp:198
unsigned int optimal_worker_thread_count_per_pool(flow::log::Logger *logger_ptr, bool est_hw_core_sharing_helps_algo)
Assuming a planned thread pool will be receiving ~symmetrical load, and its UX-affecting (in particul...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
std::string ostream_op_string(T const &... ostream_args)
Equivalent to ostream_op_to_string() but returns a new string by value instead of writing to the call...
Definition: util.hpp:356
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
Definition: sched_task.hpp:34
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
Task_engine::strand Strand
Short-hand for boost.asio strand, an ancillary class that works with Task_engine for advanced task sc...
Definition: util_fwd.hpp:138
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Definition: util_fwd.hpp:135
Basic_string_view< char > String_view
Commonly used char-based Basic_string_view. See its doc header.
boost::thread Thread
Short-hand for standard thread class.
Definition: util_fwd.hpp:78
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Definition: common.hpp:633
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Definition: common.hpp:411
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Definition: common.hpp:408