30 size_t n_threads_or_zero,
31 bool est_hw_core_sharing_helps_algo,
32 bool est_hw_core_pinning_helps_algo,
33 bool hw_threads_is_grouping_collated) :
37 m_n_threads_or_zero(n_threads_or_zero),
38 m_est_hw_core_sharing_helps_algo(est_hw_core_sharing_helps_algo),
39 m_est_hw_core_pinning_helps_algo(est_hw_core_pinning_helps_algo),
40 m_hw_threads_is_grouping_collated(hw_threads_is_grouping_collated),
46 m_qing_threads((m_n_threads_or_zero == 0)
48 : m_n_threads_or_zero),
51 m_shared_task_engine(new util::
Task_engine(n_threads())),
53 m_per_thread_strands(logger_ptr, n_threads(),
59 m_shared_task_engine->stop();
62 FLOW_LOG_INFO(
"Cross_thread_task_loop [" <<
static_cast<const void*
>(
this) <<
"] "
63 "with nickname [" << m_nickname <<
"] "
64 "with cross-thread scheduling via single Task_engine across [" << n_threads() <<
"] threads-to-be: "
65 "Created; can accept work. Task_qing_thread(s) not started yet until start().");
73 FLOW_LOG_INFO(
"Cross_thread_task_loop [" <<
this <<
"]: Destroying object; will stop threads/tasks unless already "
86 using boost::unique_future;
87 using boost::movelib::unique_ptr;
97 FLOW_LOG_WARNING(
"Starting Cross_thread_task_loop [" <<
this <<
"]: Already started earlier. Ignoring.");
104 "Single Task_engine across [" << n <<
"] Task_qing_threads: Starting.");
119 vector<promise<void>> thread_init_done_promises(n);
120 for (
size_t idx = 0; idx != n; ++idx)
122 Task task_qing_thread_init_func;
123 if (!thread_init_func_or_empty.empty())
125 task_qing_thread_init_func = [idx, &thread_init_func_or_empty]()
127 thread_init_func_or_empty(idx);
132 assert(task_qing_thread_init_func.empty());
138 &(thread_init_done_promises[idx]),
139 std::move(task_qing_thread_init_func)));
143 FLOW_LOG_INFO(
"All threads are asynchronously starting. Awaiting their readiness barrier-style, in sequence.");
144 for (
size_t idx = 0; idx != n; ++idx)
146 thread_init_done_promises[idx].get_future().wait();
147 FLOW_LOG_INFO(
"Thread [" << idx <<
"] (0-based) of [" << n <<
"] (1-based) is ready.");
165 FLOW_LOG_INFO(
"Thread count was auto-determined. Further attempting thread-to-core scheduling optimization.");
167 vector<Thread*> worker_threads(n);
170 { return qing_thread_ptr->raw_worker_thread(); });
181 if (!init_task_or_empty.empty())
204 FLOW_LOG_INFO(
"Stopping Cross_thread_task_loop [" <<
this <<
"]: Already stopped earlier. Ignoring.");
210 "All ongoing thread tasks will complete normally; all pending thread tasks will be belayed [sic]; "
211 "each thread will be asked to gracefully terminate and be joined synchronously. "
212 "Any subsequently-queued tasks will not run until start().");
232 thread_ptr_in_container->stop();
243 thread_ptr_in_container.reset();
282 "About to post single-task sequence via boost.asio scheduler; details TRACE-logged below.");
288 asio_exec_ctx_post<Task_engine>
299 post(std::move(task));
306 "About to post a task in multi-task sequence "
307 "via previously created strand; details TRACE-logged below.");
315 asio_exec_ctx_post<Strand>
316 (
get_logger(), chosen_strand_ptr.get(), synchronicity, std::move(task));
330 "About to schedule single-task sequence via boost.asio timer+scheduler.");
336 "current processor logical core index [" <<
cpu_idx() <<
"].");
339 "current processor logical core index [" <<
cpu_idx() <<
"].");
357 using boost::asio::bind_executor;
374 "About to schedule single-task sequence via boost.asio timer+scheduler "
375 "via previously created strand [" << chosen_strand_ptr <<
"].");
381 "current processor logical core index [" <<
cpu_idx() <<
"].");
384 "current processor logical core index [" <<
cpu_idx() <<
"].");
388 bind_executor(*chosen_strand_ptr, task_plus_logs));
393 bind_executor(*chosen_strand_ptr, task));
422 using boost::any_cast;
425 auto const strand_ptr = any_cast<Strand_ptr>(op);
426 assert(&(strand_ptr->context()) == loop->task_engine().get());
The core flow::async interface, providing an optionally multi-threaded thread pool onto which runnabl...
const bool m_est_hw_core_pinning_helps_algo
See constructor.
util::Scheduled_task_handle schedule_from_now(const Fine_duration &from_now, Scheduled_task &&task) override
Implements superclass API.
~Cross_thread_task_loop() override
See superclass destructor.
Op create_op() override
Implements superclass API.
std::vector< Task_qing_thread_ptr > m_qing_threads
N task-execution-capable worker threads whose lifetimes equal those of *this, all sharing m_shared_ta...
Task_engine_ptr m_shared_task_engine
boost.asio Task_engine (a/k/a io_service) co-used by all m_qing_threads.
void start(Task &&init_task_or_empty=Task(), const Thread_init_func &thread_init_func_or_empty=Thread_init_func()) override
Implements superclass API.
size_t n_threads() const override
Implements superclass API.
boost::movelib::unique_ptr< Task_qing_thread > Task_qing_thread_ptr
Short-hand for smart pointer to Task_qing_thread.
Cross_thread_task_loop(log::Logger *logger_ptr, util::String_view nickname, size_t n_threads_or_zero, bool est_hw_core_sharing_helps_algo=false, bool est_hw_core_pinning_helps_algo=false, bool hw_threads_is_grouping_collated=false)
Constructs object, making it available for post() and similar work, but without starting any threads ...
const bool m_hw_threads_is_grouping_collated
See constructor.
const std::string m_nickname
See constructor.
const bool m_est_hw_core_sharing_helps_algo
See constructor.
void stop() override
Implements superclass API.
util::Scheduled_task_handle schedule_at(const Fine_time_pt &at, Scheduled_task &&task) override
Implements superclass API.
const size_t m_n_threads_or_zero
See constructor.
void post(Task &&task, Synchronicity synchronicity=Synchronicity::S_ASYNC) override
Implements superclass API.
Task_engine_ptr task_engine() override
See superclass API.
const Op_list m_per_thread_strands
See per_thread_ops(). The boost::any payload is always of the type async::Strand_ptr.
const Op_list & per_thread_ops() override
Implements superclass API.
Simple, immutable vector-like sequence of N opaque async::Op objects, usually corresponding to N work...
Internally used building block of various concrete Concurrent_task_loop subclasses that encapsulates ...
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Flow module containing tools enabling multi-threaded event loops operating under the asynchronous-tas...
boost::any Op
An object of this opaque type represents a collection of 1 or more async::Task, past or future,...
void optimize_pinning_in_thread_pool(flow::log::Logger *logger_ptr, const std::vector< util::Thread * > &threads_in_pool, bool est_hw_core_sharing_helps_algo, bool est_hw_core_pinning_helps_algo, bool hw_threads_is_grouping_collated)
Assuming the same situation as documented for optimal_worker_thread_count_per_pool(),...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
@ S_ASYNC
Simply post the given task to execute asynchronously in some execution context – as soon as the conte...
@ S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION
Same as Synchronicity::S_ASYNC but the posting routine then waits as long as necessary for the given ...
boost::shared_ptr< util::Strand > Strand_ptr
Short-hand for ref-counted pointer to util::Strand.
uint16_t cpu_idx()
Returns the 0-based processor logical (not hardware) core index of the core executing the calling thr...
Strand_ptr op_to_exec_ctx< Strand_ptr >(Concurrent_task_loop *loop, const Op &op)
Template specialization for operation that obtains the underlying execution context,...
boost::shared_ptr< util::Task_engine > Task_engine_ptr
Short-hand for reference-counting pointer to a mutable util::Task_engine (a/k/a boost::asio::io_servi...
unsigned int optimal_worker_thread_count_per_pool(flow::log::Logger *logger_ptr, bool est_hw_core_sharing_helps_algo)
Assuming a planned thread pool will be receiving ~symmetrical load, and its UX-affecting (in particul...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
std::string ostream_op_string(T const &... ostream_args)
Equivalent to ostream_op_to_string() but returns a new string by value instead of writing to the call...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
Task_engine::strand Strand
Short-hand for boost.asio strand, an ancillary class that works with Task_engine for advanced task sc...
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Basic_string_view< char > String_view
Commonly used char-based Basic_string_view. See its doc header.
boost::thread Thread
Short-hand for standard thread class.
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.