30                                                         size_t n_threads_or_zero,
 
   31                                                         bool est_hw_core_sharing_helps_algo,
 
   32                                                         bool est_hw_core_pinning_helps_algo,
 
   33                                                         bool hw_threads_is_grouping_collated) :
 
   37  m_n_threads_or_zero(n_threads_or_zero),
 
   38  m_est_hw_core_sharing_helps_algo(est_hw_core_sharing_helps_algo),
 
   39  m_est_hw_core_pinning_helps_algo(est_hw_core_pinning_helps_algo),
 
   40  m_hw_threads_is_grouping_collated(hw_threads_is_grouping_collated),
 
   46  m_qing_threads((m_n_threads_or_zero == 0) 
 
   48                   : m_n_threads_or_zero),
 
   50  m_task_engines(n_threads()) 
 
   61    task_engine_ptr_in_container.reset(
new Task_engine(1));
 
   65    task_engine_ptr_in_container->stop();
 
   72                                     [
this](
size_t idx) -> 
Op 
   78  FLOW_LOG_INFO(
"Segregated_thread_task_loop [" << 
static_cast<const void*
>(
this) << 
"] " 
   80                "with segregated-thread scheduling via individual Task_engines " 
   81                "across [" << n << 
"] threads-to-be: " 
   82                "Created; can accept work.  Task_qing_thread(s) not started yet until start().");
 
   90  FLOW_LOG_INFO(
"Segregated_thread_task_loop [" << 
this << 
"]: Destroying object; will stop threads/tasks unless " 
   91                "already stopped earlier.");
 
  103  using boost::promise;
 
  104  using std::transform;
 
  111    FLOW_LOG_INFO(
"Starting Segregated_thread_task_loop [" << 
this << 
"]: Already started earlier.  Ignoring.");
 
  118                "1-to-1 Task_engines across [" << n << 
"] Task_qing_threads: Starting.");
 
  127  vector<promise<void>> thread_init_done_promises(n);
 
  128  for (
size_t idx = 0; idx != n; ++idx)
 
  130    Task task_qing_thread_init_func;
 
  131    if (!thread_init_func_or_empty.empty())
 
  133      task_qing_thread_init_func = [idx, &thread_init_func_or_empty]()
 
  135        thread_init_func_or_empty(idx);
 
  140      assert(task_qing_thread_init_func.empty()); 
 
  153                                                   &(thread_init_done_promises[idx]),
 
  154                                                   std::move(task_qing_thread_init_func)));
 
  156  FLOW_LOG_INFO(
"All threads are asynchronously starting.  Awaiting their readiness barrier-style, in sequence.");
 
  157  for (
size_t idx = 0; idx != n; ++idx)
 
  159    thread_init_done_promises[idx].get_future().wait();
 
  160    FLOW_LOG_INFO(
"Thread [" << idx << 
"] (0-based) of [" << n << 
"] (1-based) is ready.");
 
  168    FLOW_LOG_INFO(
"Thread count was auto-determined.  Further attempting thread-to-core scheduling optimization.");
 
  170    vector<Thread*> worker_threads(n); 
 
  173                { return qing_thread_ptr->raw_worker_thread(); });
 
  181  if (!init_task_or_empty.empty())
 
  197    FLOW_LOG_INFO(
"Stopping Segregated_thread_task_loop [" << 
this << 
"]: Already stopped earlier.  Ignoring.");
 
  202  FLOW_LOG_INFO(
"Stopping Segregated_thread_task_loop [" << 
this << 
"]: All ongoing tasks will complete normally; " 
  203                "all pending thread tasks will be belayed [sic]; " 
  204                "each thread will be asked to gracefully terminate and be joined synchronously.  " 
  205                "Any subsequently-queued tasks will not run until start().");
 
  232    assert(thread_ptr_in_container->task_engine() == 
m_task_engines[idx]);
 
  233    thread_ptr_in_container.reset();
 
  285  auto const chosen_task_engine = 
m_task_engines[chosen_thread_idx];
 
  288  FLOW_LOG_TRACE(
"Segregated_thread_task_loop [" << 
this << 
"]: About to post single-task sequence " 
  289                 "on randomly selected thread [" << chosen_thread_idx << 
"]: " 
  290                 "Task_engine [" << chosen_task_engine << 
"].");
 
  292  post_impl(chosen_task_engine, synchronicity, std::move(task));
 
  305  FLOW_LOG_TRACE(
"Segregated_thread_task_loop [" << 
this << 
"]: About to post a task in multi-task sequence " 
  306                 "on previously chosen thread: " 
  307                 "Task_engine [" << chosen_task_engine << 
"].");
 
  309  post_impl(chosen_task_engine, synchronicity, std::move(task));
 
  318  auto const chosen_task_engine = 
m_task_engines[chosen_thread_idx];
 
  321                 "About to boost.asio-timer-schedule single-task sequence " 
  322                 "on randomly selected thread [" << chosen_thread_idx << 
"]: " 
  323                 "Task_engine [" << chosen_task_engine << 
"].");
 
  339                 "About to boost.asio-timer-schedule a task in multi-task sequence on previously chosen thread: " 
  340                 "Task_engine [" << chosen_task_engine << 
"].");
 
  369  asio_exec_ctx_post<Task_engine>
 
  370    (
get_logger(), chosen_task_engine.get(), synchronicity, std::move(task));
 
  389                                      "current processor logical core index [" << 
cpu_idx() << 
"].");
 
  392                                      "current processor logical core index [" << 
cpu_idx() << 
"].");
 
  396                                  chosen_task_engine.get(), std::move(task_plus_logs));
 
  401                                chosen_task_engine.get(), std::move(task));
 
  409  auto const chosen_task_engine = 
m_task_engines[chosen_thread_idx];
 
  411  FLOW_LOG_TRACE(
"Segregated_thread_task_loop [" << 
this << 
"]: About to return boost.asio Task_engine " 
  412                 "assigned to randomly selected thread [" << chosen_thread_idx << 
"]: " 
  413                 "Task_engine [" << chosen_task_engine << 
"].");
 
  415  return chosen_task_engine;
 
  421  using boost::any_cast;
 
  426  return any_cast<Task_engine_ptr>(op);
 
The core flow::async interface, providing an optionally multi-threaded thread pool onto which runnabl...
Simple, immutable vector-like sequence of N opaque async::Op objects, usually corresponding to N work...
const Op & random_op(size_t *chosen_idx=0) const
Returns (*this)[R], where we randomly select R as if by random_idx() and communicate it to the caller...
size_t random_idx() const
Returns a randomly selected index from range [0, size()).
const std::string m_nickname
See constructor.
std::vector< Task_qing_thread_ptr > m_qing_threads
N task-execution-capable worker threads whose lifetimes equal those of *this, each with its own util:...
Segregated_thread_task_loop(log::Logger *logger_ptr, util::String_view nickname, size_t n_threads_or_zero, bool est_hw_core_sharing_helps_algo=false, bool est_hw_core_pinning_helps_algo=false, bool hw_threads_is_grouping_collated=false)
Constructs object, making it available for post() and similar work, but without starting any threads ...
size_t n_threads() const override
Implements superclass API.
boost::movelib::unique_ptr< const Op_list > m_per_thread_ops
See per_thread_ops().
const Op_list & per_thread_ops() override
Implements superclass API.
util::Scheduled_task_handle schedule_from_now(const Fine_duration &from_now, Scheduled_task &&task) override
Implements superclass API.
void start(Task &&init_task_or_empty=Task(), const Thread_init_func &thread_init_func_or_empty=Thread_init_func()) override
Implements superclass API.
Task_engine_ptr task_engine() override
See superclass API.
const size_t m_n_threads_or_zero
See constructor.
void post_impl(const Task_engine_ptr &chosen_task_engine, Synchronicity synchronicity, Task &&task)
Helper performing the core post(Task_engine&) (or similar) call on behalf of the various post() overl...
const bool m_hw_threads_is_grouping_collated
See constructor.
Op create_op() override
Implements superclass API.
void stop() override
Implements superclass API.
util::Scheduled_task_handle schedule_from_now_impl(const Task_engine_ptr &chosen_task_engine, const Fine_duration &from_now, Scheduled_task &&task)
Helper performing the core util::schedule_task_from_now() call on behalf of the various schedule_from...
const bool m_est_hw_core_sharing_helps_algo
See constructor.
boost::movelib::unique_ptr< Task_qing_thread > Task_qing_thread_ptr
Short-hand for smart pointer to Task_qing_thread.
void post(Task &&task, Synchronicity synchronicity=Synchronicity::S_ASYNC) override
Implements superclass API.
~Segregated_thread_task_loop() override
See superclass destructor.
std::vector< Task_engine_ptr > m_task_engines
boost.asio Task_engines (a/k/a io_contexts) used by each respective element in m_qing_threads.
const bool m_est_hw_core_pinning_helps_algo
See constructor.
util::Scheduled_task_handle schedule_at(const Fine_time_pt &at, Scheduled_task &&task) override
Implements superclass API.
Internally used building block of various concrete Concurrent_task_loop subclasses that encapsulates ...
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Flow module containing tools enabling multi-threaded event loops operating under the asynchronous-tas...
boost::any Op
An object of this opaque type represents a collection of 1 or more async::Task, past or future,...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
@ S_ASYNC
Simply post the given task to execute asynchronously in some execution context – as soon as the conte...
@ S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION
Same as Synchronicity::S_ASYNC but the posting routine then waits as long as necessary for the given ...
void optimize_pinning_in_thread_pool(log::Logger *logger_ptr, const std::vector< util::Thread * > &threads_in_pool, bool est_hw_core_sharing_helps_algo, bool est_hw_core_pinning_helps_algo, bool hw_threads_is_grouping_collated, Error_code *err_code=nullptr)
Assuming the same situation as documented for optimal_worker_thread_count_per_pool(),...
uint16_t cpu_idx()
Returns the 0-based processor logical (not hardware) core index of the core executing the calling thr...
boost::shared_ptr< util::Task_engine > Task_engine_ptr
Short-hand for reference-counting pointer to a mutable util::Task_engine (a/k/a boost::asio::io_conte...
Task_engine_ptr op_to_exec_ctx< Task_engine_ptr >(Concurrent_task_loop *loop, const Op &op)
Template specialization for operation that obtains the underlying execution context,...
unsigned int optimal_worker_thread_count_per_pool(log::Logger *logger_ptr, bool est_hw_core_sharing_helps_algo)
Assuming a planned thread pool will be receiving ~symmetrical load, and its UX-affecting (in particul...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
std::string ostream_op_string(T const &... ostream_args)
Equivalent to ostream_op_to_string() but returns a new string by value instead of writing to the call...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
boost::asio::io_context Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Basic_string_view< char > String_view
Commonly used char-based Basic_string_view. See its doc header.
boost::thread Thread
Short-hand for standard thread class.
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.