30                                               size_t n_threads_or_zero,
 
   31                                               bool est_hw_core_sharing_helps_algo,
 
   32                                               bool est_hw_core_pinning_helps_algo,
 
   33                                               bool hw_threads_is_grouping_collated) :
 
   37  m_n_threads_or_zero(n_threads_or_zero),
 
   38  m_est_hw_core_sharing_helps_algo(est_hw_core_sharing_helps_algo),
 
   39  m_est_hw_core_pinning_helps_algo(est_hw_core_pinning_helps_algo),
 
   40  m_hw_threads_is_grouping_collated(hw_threads_is_grouping_collated),
 
   46  m_qing_threads((m_n_threads_or_zero == 0) 
 
   48                   : m_n_threads_or_zero),
 
   51  m_shared_task_engine(new util::
Task_engine(n_threads())),
 
   53  m_per_thread_strands(logger_ptr, n_threads(),
 
   59  m_shared_task_engine->stop();
 
   62  FLOW_LOG_INFO(
"Cross_thread_task_loop [" << 
static_cast<const void*
>(
this) << 
"] " 
   63                "with nickname [" << m_nickname << 
"] " 
   64                "with cross-thread scheduling via single Task_engine across [" << n_threads() << 
"] threads-to-be: " 
   65                "Created; can accept work.  Task_qing_thread(s) not started yet until start().");
 
   73  FLOW_LOG_INFO(
"Cross_thread_task_loop [" << 
this << 
"]: Destroying object; will stop threads/tasks unless already " 
   86  using boost::unique_future;
 
   87  using boost::movelib::unique_ptr;
 
   97    FLOW_LOG_WARNING(
"Starting Cross_thread_task_loop [" << 
this << 
"]: Already started earlier.  Ignoring.");
 
  104                "Single Task_engine across [" << n << 
"] Task_qing_threads: Starting.");
 
  119  vector<promise<void>> thread_init_done_promises(n);
 
  120  for (
size_t idx = 0; idx != n; ++idx)
 
  122    Task task_qing_thread_init_func;
 
  123    if (!thread_init_func_or_empty.empty())
 
  125      task_qing_thread_init_func = [idx, &thread_init_func_or_empty]()
 
  127        thread_init_func_or_empty(idx);
 
  132      assert(task_qing_thread_init_func.empty()); 
 
  138                                                   &(thread_init_done_promises[idx]),
 
  139                                                   std::move(task_qing_thread_init_func)));
 
  143  FLOW_LOG_INFO(
"All threads are asynchronously starting.  Awaiting their readiness barrier-style, in sequence.");
 
  144  for (
size_t idx = 0; idx != n; ++idx)
 
  146    thread_init_done_promises[idx].get_future().wait();
 
  147    FLOW_LOG_INFO(
"Thread [" << idx << 
"] (0-based) of [" << n << 
"] (1-based) is ready.");
 
  165    FLOW_LOG_INFO(
"Thread count was auto-determined.  Further attempting thread-to-core scheduling optimization.");
 
  167    vector<Thread*> worker_threads(n); 
 
  170                { return qing_thread_ptr->raw_worker_thread(); });
 
  181  if (!init_task_or_empty.empty())
 
  204    FLOW_LOG_INFO(
"Stopping Cross_thread_task_loop [" << 
this << 
"]: Already stopped earlier.  Ignoring.");
 
  210                "All ongoing thread tasks will complete normally; all pending thread tasks will be belayed [sic]; " 
  211                "each thread will be asked to gracefully terminate and be joined synchronously.  " 
  212                "Any subsequently-queued tasks will not run until start().");
 
  232    thread_ptr_in_container->stop();
 
  243    thread_ptr_in_container.reset();
 
  282                 "About to post single-task sequence via boost.asio scheduler; details TRACE-logged below.");
 
  288  asio_exec_ctx_post<Task_engine>
 
  299    post(std::move(task));
 
  306                 "About to post a task in multi-task sequence " 
  307                 "via previously created strand; details TRACE-logged below.");
 
  315  asio_exec_ctx_post<Strand>
 
  316    (
get_logger(), chosen_strand_ptr.get(), synchronicity, std::move(task));
 
  330                                    "About to schedule single-task sequence via boost.asio timer+scheduler.");
 
  336                                      "current processor logical core index [" << 
cpu_idx() << 
"].");
 
  339                                      "current processor logical core index [" << 
cpu_idx() << 
"].");
 
  357  using boost::asio::bind_executor;
 
  374                                    "About to schedule single-task sequence via boost.asio timer+scheduler " 
  375                                    "via previously created strand [" << chosen_strand_ptr << 
"].");
 
  381                                      "current processor logical core index [" << 
cpu_idx() << 
"].");
 
  384                                      "current processor logical core index [" << 
cpu_idx() << 
"].");
 
  388                                  bind_executor(*chosen_strand_ptr, task_plus_logs)); 
 
  393                                bind_executor(*chosen_strand_ptr, task)); 
 
  422  using boost::any_cast;
 
  425  auto const strand_ptr = any_cast<Strand_ptr>(op);
 
  426  assert(&(strand_ptr->context()) == loop->task_engine().get());
 
The core flow::async interface, providing an optionally multi-threaded thread pool onto which runnable...
const bool m_est_hw_core_pinning_helps_algo
See constructor.
util::Scheduled_task_handle schedule_from_now(const Fine_duration &from_now, Scheduled_task &&task) override
Implements superclass API.
~Cross_thread_task_loop() override
See superclass destructor.
Op create_op() override
Implements superclass API.
std::vector< Task_qing_thread_ptr > m_qing_threads
N task-execution-capable worker threads whose lifetimes equal those of *this, all sharing m_shared_task_engine...
Task_engine_ptr m_shared_task_engine
boost.asio Task_engine (a/k/a io_context) co-used by all m_qing_threads.
void start(Task &&init_task_or_empty=Task(), const Thread_init_func &thread_init_func_or_empty=Thread_init_func()) override
Implements superclass API.
size_t n_threads() const override
Implements superclass API.
boost::movelib::unique_ptr< Task_qing_thread > Task_qing_thread_ptr
Short-hand for smart pointer to Task_qing_thread.
Cross_thread_task_loop(log::Logger *logger_ptr, util::String_view nickname, size_t n_threads_or_zero, bool est_hw_core_sharing_helps_algo=false, bool est_hw_core_pinning_helps_algo=false, bool hw_threads_is_grouping_collated=false)
Constructs object, making it available for post() and similar work, but without starting any threads ...
const bool m_hw_threads_is_grouping_collated
See constructor.
const std::string m_nickname
See constructor.
const bool m_est_hw_core_sharing_helps_algo
See constructor.
void stop() override
Implements superclass API.
util::Scheduled_task_handle schedule_at(const Fine_time_pt &at, Scheduled_task &&task) override
Implements superclass API.
const size_t m_n_threads_or_zero
See constructor.
void post(Task &&task, Synchronicity synchronicity=Synchronicity::S_ASYNC) override
Implements superclass API.
Task_engine_ptr task_engine() override
See superclass API.
const Op_list m_per_thread_strands
See per_thread_ops(). The boost::any payload is always of the type async::Strand_ptr.
const Op_list & per_thread_ops() override
Implements superclass API.
Simple, immutable vector-like sequence of N opaque async::Op objects, usually corresponding to N work...
Internally used building block of various concrete Concurrent_task_loop subclasses that encapsulates ...
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Flow module containing tools enabling multi-threaded event loops operating under the asynchronous-tas...
boost::any Op
An object of this opaque type represents a collection of 1 or more async::Task, past or future,...
void optimize_pinning_in_thread_pool(flow::log::Logger *logger_ptr, const std::vector< util::Thread * > &threads_in_pool, bool est_hw_core_sharing_helps_algo, bool est_hw_core_pinning_helps_algo, bool hw_threads_is_grouping_collated)
Assuming the same situation as documented for optimal_worker_thread_count_per_pool(),...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
@ S_ASYNC
Simply post the given task to execute asynchronously in some execution context – as soon as the conte...
@ S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION
Same as Synchronicity::S_ASYNC but the posting routine then waits as long as necessary for the given ...
boost::shared_ptr< util::Strand > Strand_ptr
Short-hand for ref-counted pointer to util::Strand.
uint16_t cpu_idx()
Returns the 0-based processor logical (not hardware) core index of the core executing the calling thr...
Strand_ptr op_to_exec_ctx< Strand_ptr >(Concurrent_task_loop *loop, const Op &op)
Template specialization for operation that obtains the underlying execution context,...
boost::shared_ptr< util::Task_engine > Task_engine_ptr
Short-hand for reference-counting pointer to a mutable util::Task_engine (a/k/a boost::asio::io_context...
unsigned int optimal_worker_thread_count_per_pool(flow::log::Logger *logger_ptr, bool est_hw_core_sharing_helps_algo)
Assuming a planned thread pool will be receiving ~symmetrical load, and its UX-affecting (in particul...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
std::string ostream_op_string(T const &... ostream_args)
Equivalent to ostream_op_to_string() but returns a new string by value instead of writing to the call...
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
Task_engine::strand Strand
Short-hand for boost.asio strand, an ancillary class that works with Task_engine for advanced task sc...
boost::asio::io_context Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Basic_string_view< char > String_view
Commonly used char-based Basic_string_view. See its doc header.
boost::thread Thread
Short-hand for standard thread class.
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.