30 size_t n_threads_or_zero,
31 bool est_hw_core_sharing_helps_algo,
32 bool est_hw_core_pinning_helps_algo,
33 bool hw_threads_is_grouping_collated) :
37 m_n_threads_or_zero(n_threads_or_zero),
38 m_est_hw_core_sharing_helps_algo(est_hw_core_sharing_helps_algo),
39 m_est_hw_core_pinning_helps_algo(est_hw_core_pinning_helps_algo),
40 m_hw_threads_is_grouping_collated(hw_threads_is_grouping_collated),
46 m_qing_threads((m_n_threads_or_zero == 0)
48 : m_n_threads_or_zero),
50 m_task_engines(n_threads())
61 task_engine_ptr_in_container.reset(
new Task_engine(1));
65 task_engine_ptr_in_container->stop();
72 [
this](
size_t idx) ->
Op
78 FLOW_LOG_INFO(
"Segregated_thread_task_loop [" <<
static_cast<const void*
>(
this) <<
"] "
80 "with segregated-thread scheduling via individual Task_engines "
81 "across [" << n <<
"] threads-to-be: "
82 "Created; can accept work. Task_qing_thread(s) not started yet until start().");
90 FLOW_LOG_INFO(
"Segregated_thread_task_loop [" <<
this <<
"]: Destroying object; will stop threads/tasks unless "
91 "already stopped earlier.");
103 using boost::promise;
104 using std::transform;
111 FLOW_LOG_INFO(
"Starting Segregated_thread_task_loop [" <<
this <<
"]: Already started earlier. Ignoring.");
118 "1-to-1 Task_engines across [" << n <<
"] Task_qing_threads: Starting.");
127 vector<promise<void>> thread_init_done_promises(n);
128 for (
size_t idx = 0; idx != n; ++idx)
130 Task task_qing_thread_init_func;
131 if (!thread_init_func_or_empty.empty())
133 task_qing_thread_init_func = [idx, &thread_init_func_or_empty]()
135 thread_init_func_or_empty(idx);
140 assert(task_qing_thread_init_func.empty());
153 &(thread_init_done_promises[idx]),
154 std::move(task_qing_thread_init_func)));
156 FLOW_LOG_INFO(
"All threads are asynchronously starting. Awaiting their readiness barrier-style, in sequence.");
157 for (
size_t idx = 0; idx != n; ++idx)
159 thread_init_done_promises[idx].get_future().wait();
160 FLOW_LOG_INFO(
"Thread [" << idx <<
"] (0-based) of [" << n <<
"] (1-based) is ready.");
168 FLOW_LOG_INFO(
"Thread count was auto-determined. Further attempting thread-to-core scheduling optimization.");
170 vector<Thread*> worker_threads(n);
173 { return qing_thread_ptr->raw_worker_thread(); });
181 if (!init_task_or_empty.empty())
197 FLOW_LOG_INFO(
"Stopping Segregated_thread_task_loop [" <<
this <<
"]: Already stopped earlier. Ignoring.");
202 FLOW_LOG_INFO(
"Stopping Segregated_thread_task_loop [" <<
this <<
"]: All ongoing tasks will complete normally; "
203 "all pending thread tasks will be belayed [sic]; "
204 "each thread will be asked to gracefully terminate and be joined synchronously. "
205 "Any subsequently-queued tasks will not run until start().");
232 assert(thread_ptr_in_container->task_engine() ==
m_task_engines[idx]);
233 thread_ptr_in_container.reset();
285 auto const chosen_task_engine =
m_task_engines[chosen_thread_idx];
288 FLOW_LOG_TRACE(
"Segregated_thread_task_loop [" <<
this <<
"]: About to post single-task sequence "
289 "on randomly selected thread [" << chosen_thread_idx <<
"]: "
290 "Task_engine [" << chosen_task_engine <<
"].");
292 post_impl(chosen_task_engine, synchronicity, std::move(task));
305 FLOW_LOG_TRACE(
"Segregated_thread_task_loop [" <<
this <<
"]: About to post a task in multi-task sequence "
306 "on previously chosen thread: "
307 "Task_engine [" << chosen_task_engine <<
"].");
309 post_impl(chosen_task_engine, synchronicity, std::move(task));
318 auto const chosen_task_engine =
m_task_engines[chosen_thread_idx];
321 "About to boost.asio-timer-schedule single-task sequence "
322 "on randomly selected thread [" << chosen_thread_idx <<
"]: "
323 "Task_engine [" << chosen_task_engine <<
"].");
339 "About to boost.asio-timer-schedule a task in multi-task sequence on previously chosen thread: "
340 "Task_engine [" << chosen_task_engine <<
"].");
369 asio_exec_ctx_post<Task_engine>
370 (
get_logger(), chosen_task_engine.get(), synchronicity, std::move(task));
389 "current processor logical core index [" <<
cpu_idx() <<
"].");
392 "current processor logical core index [" <<
cpu_idx() <<
"].");
396 chosen_task_engine.get(), std::move(task_plus_logs));
401 chosen_task_engine.get(), std::move(task));
409 auto const chosen_task_engine =
m_task_engines[chosen_thread_idx];
411 FLOW_LOG_TRACE(
"Segregated_thread_task_loop [" <<
this <<
"]: About to return boost.asio Task_engine "
412 "assigned to randomly selected thread [" << chosen_thread_idx <<
"]: "
413 "Task_engine [" << chosen_task_engine <<
"].");
415 return chosen_task_engine;
421 using boost::any_cast;
426 return any_cast<Task_engine_ptr>(op);
The core flow::async interface, providing an optionally multi-threaded thread pool onto which runnabl...
Simple, immutable vector-like sequence of N opaque async::Op objects, usually corresponding to N work...
const Op & random_op(size_t *chosen_idx=0) const
Returns (*this)[R], where we randomly select R as if by random_idx() and communicate it to the caller...
size_t random_idx() const
Returns a randomly selected index from range [O, size()).
const std::string m_nickname
See constructor.
std::vector< Task_qing_thread_ptr > m_qing_threads
N task-execution-capable worker threads whose lifetimes equal those of *this, each with its own util:...
Segregated_thread_task_loop(log::Logger *logger_ptr, util::String_view nickname, size_t n_threads_or_zero, bool est_hw_core_sharing_helps_algo=false, bool est_hw_core_pinning_helps_algo=false, bool hw_threads_is_grouping_collated=false)
Constructs object, making it available for post() and similar work, but without starting any threads ...
size_t n_threads() const override
Implements superclass API.
boost::movelib::unique_ptr< const Op_list > m_per_thread_ops
See per_thread_ops().
const Op_list & per_thread_ops() override
Implements superclass API.
util::Scheduled_task_handle schedule_from_now(const Fine_duration &from_now, Scheduled_task &&task) override
Implements superclass API.
void start(Task &&init_task_or_empty=Task(), const Thread_init_func &thread_init_func_or_empty=Thread_init_func()) override
Implements superclass API.
Task_engine_ptr task_engine() override
See superclass API.
const size_t m_n_threads_or_zero
See constructor.
void post_impl(const Task_engine_ptr &chosen_task_engine, Synchronicity synchronicity, Task &&task)
Helper performing the core Task_engine::post() (or similar) call on behalf of the various post() over...
const bool m_hw_threads_is_grouping_collated
See constructor.
Op create_op() override
Implements superclass API.
void stop() override
Implements superclass API.
util::Scheduled_task_handle schedule_from_now_impl(const Task_engine_ptr &chosen_task_engine, const Fine_duration &from_now, Scheduled_task &&task)
Helper performing the core util::schedule_task_from_now() call on behalf of the various schedule_from...
const bool m_est_hw_core_sharing_helps_algo
See constructor.
boost::movelib::unique_ptr< Task_qing_thread > Task_qing_thread_ptr
Short-hand for smart pointer to Task_qing_thread.
void post(Task &&task, Synchronicity synchronicity=Synchronicity::S_ASYNC) override
Implements superclass API.
~Segregated_thread_task_loop() override
See superclass destructor.
std::vector< Task_engine_ptr > m_task_engines
boost.asio Task_engines (a/k/a io_services) used by each respective element in m_qing_threads.
const bool m_est_hw_core_pinning_helps_algo
See constructor.
util::Scheduled_task_handle schedule_at(const Fine_time_pt &at, Scheduled_task &&task) override
Implements superclass API.
Internally used building block of various concrete Concurrent_task_loop subclasses that encapsulates ...
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Flow module containing tools enabling multi-threaded event loops operating under the asynchronous-tas...
boost::any Op
An object of this opaque type represents a collection of 1 or more async::Task, past or future,...
void optimize_pinning_in_thread_pool(flow::log::Logger *logger_ptr, const std::vector< util::Thread * > &threads_in_pool, bool est_hw_core_sharing_helps_algo, bool est_hw_core_pinning_helps_algo, bool hw_threads_is_grouping_collated)
Assuming the same situation as documented for optimal_worker_thread_count_per_pool(),...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
@ S_ASYNC
Simply post the given task to execute asynchronously in some execution context – as soon as the conte...
@ S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION
Same as Synchronicity::S_ASYNC but the posting routine then waits as long as necessary for the given ...
uint16_t cpu_idx()
Returns the 0-based processor logical (not hardware) core index of the core executing the calling thr...
boost::shared_ptr< util::Task_engine > Task_engine_ptr
Short-hand for reference-counting pointer to a mutable util::Task_engine (a/k/a boost::asio::io_servi...
unsigned int optimal_worker_thread_count_per_pool(flow::log::Logger *logger_ptr, bool est_hw_core_sharing_helps_algo)
Assuming a planned thread pool will be receiving ~symmetrical load, and its UX-affecting (in particul...
Task_engine_ptr op_to_exec_ctx< Task_engine_ptr >(Concurrent_task_loop *loop, const Op &op)
Template specialization for operation that obtains the underlying execution context,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
std::string ostream_op_string(T const &... ostream_args)
Equivalent to ostream_op_to_string() but returns a new string by value instead of writing to the call...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Basic_string_view< char > String_view
Commonly used char-based Basic_string_view. See its doc header.
boost::thread Thread
Short-hand for standard thread class.
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.