20#include "flow/error/error.hpp"
22# include <mach/thread_policy.h>
23# include <mach/thread_act.h>
// Fragment of optimal_worker_thread_count_per_pool(): the last parameter plus the visible part of the body.
// The visible code compares the physical and logical core counts, logs the findings, and logs which of the
// two counts is the suggested thread pool size.
// est_hw_core_sharing_helps_algo: caller's estimate of whether the pool's work benefits from 2+ hardware
// threads sharing one physical core; it selects between the last two log messages below.
// NOTE(review): the return statements are not visible in this listing -- presumably each branch returns
// the count named in its log message; confirm against the complete file.
36 bool est_hw_core_sharing_helps_algo)
// Query both core counts; core sharing is deemed supported exactly when the two counts differ.
42 const unsigned int n_phys_cores = Thread::physical_concurrency();
43 const unsigned int n_logic_cores = Thread::hardware_concurrency();
44 const bool core_sharing_supported = n_phys_cores != n_logic_cores;
// Report what the system advertises before deciding anything.
46 FLOW_LOG_INFO(
"System reports processor with [" << n_phys_cores <<
"] physical cores; and "
47 "[" << n_logic_cores <<
"] hardware threads a/k/a logical cores; core sharing (a/k/a "
48 "hyper-threading) is "
49 "thus [" << (core_sharing_supported ?
"supported" :
"unsupported") <<
"].");
// No core sharing: the counts are equal, and the message names the logical core count as the suggestion.
51 if (!core_sharing_supported)
54 "Therefore suggested thread pool thread count is "
55 "simply the logical core count = [" << n_logic_cores <<
"].");
// Core sharing exists and the application says it helps: the message suggests the logical core count.
// NOTE(review): the message text below says "act as if there is 1 hardware thread ... per physical core",
// which reads as the opposite of using every logical core -- confirm the intended wording.
59 if (est_hw_core_sharing_helps_algo)
61 FLOW_LOG_INFO(
"Application estimates this thread pool DOES benefit from 2+ hardware threads sharing physical "
62 "processor core (a/k/a hyper-threading); therefore we shall act as if there is 1 hardware thread "
63 "a/k/a logical core per physical core, even though in reality above shows it is [supported]. "
64 "Therefore suggested thread pool thread count is "
65 "simply the logical core count = [" << n_logic_cores <<
"].");
// Core sharing exists but the application says it does not help: the message suggests the physical core count.
70 FLOW_LOG_INFO(
"Application estimates this thread pool does NOT benefit from 2+ hardware threads sharing physical "
71 "processor core (a/k/a hyper-threading); "
72 "therefore suggested thread pool thread count is "
73 "simply the physical core count = [" << n_phys_cores <<
"].");
// Fragment of optimize_pinning_in_thread_pool(): the tail of the parameter list and the body up to the
// point where one pool thread's CPU set has been filled in.
// threads_in_pool: the pool's threads; each one's native handle is used for the affinity calls.
// est_hw_core_sharing_helps_algo: marked [[maybe_unused]]; not read anywhere in the visible body.
// est_hw_core_pinning_helps_algo: when false, only an informational message is logged and thread placement
// is left to the system.
// hw_threads_is_grouping_collated: selects which of two formulas maps (pool thread, slot) to a logical core index.
79 const std::vector<util::Thread*>& threads_in_pool,
80 [[maybe_unused]]
bool est_hw_core_sharing_helps_algo,
81 bool est_hw_core_pinning_helps_algo,
82 bool hw_threads_is_grouping_collated)
// Pull in the platform-specific affinity API chosen at build time; any other platform is rejected at
// compile time by the static_assert below.
99#if FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX
101 using ::pthread_setaffinity_np;
102#elif FLOW_ASYNC_HW_THREAD_AFFINITY_MACH_VIA_POLICY_TAG
103 using ::pthread_mach_thread_np;
104 using ::thread_affinity_policy_data_t;
105 using ::thread_policy_set;
108 static_assert(
false,
"We only know how to deal with thread-core affinities in Darwin/Mac and Linux.");
110 using boost::system::system_category;
111 using std::runtime_error;
// Pinning not wanted: log the decision and leave scheduling to the system.
117 if (!est_hw_core_pinning_helps_algo)
119 FLOW_LOG_INFO(
"Application estimates the logic in this thread pool would not benefit from pinning threads to "
120 "processor cores; therefore letting system control assignment of threads to processor cores.");
// This assert restates the negation of the condition above, so the branch above presumably leaves the
// function (its remaining lines are not visible here).
124 assert(est_hw_core_pinning_helps_algo);
127 const auto n_pool_threads = threads_in_pool.size();
// Integer division: every pool thread gets the same whole number of logical cores.
// NOTE(review): this divides by zero for an empty pool and yields 0 when the pool has more threads than
// hardware_concurrency() reports -- confirm that callers guarantee neither happens.
130 const auto n_logic_cores_per_pool_thread = Thread::hardware_concurrency() / n_pool_threads;
132 FLOW_LOG_INFO(
"Application estimates thread pool would benefit from pinning threads to processor cores; "
133 "will set affinities as follows below. "
134 "Thread count in pool is [" << n_pool_threads <<
"]; "
135 "at [" << n_logic_cores_per_pool_thread <<
"] logical processor cores each.");
// Handle each pool thread in turn.
// NOTE(review): thread_idx is unsigned int while n_pool_threads comes from vector::size(); the comparison
// mixes integer widths.
137 for (
unsigned int thread_idx = 0; thread_idx != n_pool_threads; ++thread_idx)
139 Thread* thread = threads_in_pool[thread_idx];
140 const auto native_pthread_thread_id = thread->native_handle();
// pthread variant: build an explicit CPU set naming the logical cores this thread may run on.
142#if FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX
143 cpu_set_t cpu_set_for_thread;
144 CPU_ZERO(&cpu_set_for_thread);
// One iteration per logical core allotted to this thread.
146 for (
unsigned int logical_core_idx_given_thread_idx = 0;
147 logical_core_idx_given_thread_idx != n_logic_cores_per_pool_thread;
148 ++logical_core_idx_given_thread_idx)
// Collated grouping: this thread takes a contiguous run of core indices starting at
// thread_idx * n_logic_cores_per_pool_thread.  Otherwise the index is strided by n_pool_threads; the rest
// of that expression is not visible in this listing.
154 const unsigned int native_logical_core_id
155 = hw_threads_is_grouping_collated ? ((thread_idx * n_logic_cores_per_pool_thread)
156 + logical_core_idx_given_thread_idx)
157 : ((logical_core_idx_given_thread_idx * n_pool_threads)
159 FLOW_LOG_INFO(
"Thread [" << thread_idx <<
"] in pool: adding affinity for "
160 "logical core/hardware thread [" << native_logical_core_id <<
"].");
// Add the computed core to this thread's CPU set.
162 CPU_SET(native_logical_core_id, &cpu_set_for_thread);
165 const auto code = pthread_setaffinity_np(native_pthread_thread_id,
sizeof(cpu_set_for_thread), &cpu_set_for_thread);
168 const Error_code sys_err_code(errno, system_category());
171 "pthread_setaffinity_np() call in optimize_pinning_in_thread_pool()");
174#elif FLOW_ASYNC_HW_THREAD_AFFINITY_MACH_VIA_POLICY_TAG
175 const unsigned int native_affinity_tag = 1 + thread_idx;
176 FLOW_LOG_INFO(
"Thread [" << thread_idx <<
"] in pool: setting Mach affinity tag [" << native_affinity_tag <<
"].");
179 const auto native_mach_thread_id = pthread_mach_thread_np(native_pthread_thread_id);
180 if (native_pthread_thread_id == 0)
182 const Error_code sys_err_code(errno, system_category());
184 throw error::Runtime_error(sys_err_code,
"pthread_mach_thread_np() call in optimize_pinning_in_thread_pool()");
188 "<=> Mach thread ID [" << native_mach_thread_id <<
"].");
// Wrap the affinity tag in the policy structure and apply it to the Mach thread.
// NOTE(review): the trailing argument 1 is presumably the policy-data element count for
// THREAD_AFFINITY_POLICY -- confirm against the Mach headers.
206 thread_affinity_policy_data_t native_thread_policy_data{ int(native_affinity_tag) };
207 const auto code = thread_policy_set(native_mach_thread_id, THREAD_AFFINITY_POLICY,
209 reinterpret_cast<thread_policy_t
>(&native_thread_policy_data), 1);
// Tail of the failure message for thread_policy_set(); the check of `code` and the start of this statement
// are not visible in this listing.
222 "] [thread_policy_set(THREAD_AFFINITY_POLICY) failed]"));
// Compile-time guard for builds where neither affinity mechanism is selected; presumably sits in the
// fallback arm of the #if/#elif chain above (the directive line is not visible here).
226 static_assert(
false,
"Compiler should not have reached this point; serious bug?");
~Concurrent_task_loop() override
Any implementing subclass's destructor shall execute stop() – see its doc header please – and then cl...
An std::runtime_error (which is an std::exception) that stores an Error_code.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_SET_CONTEXT(ARG_logger_ptr, ARG_component_payload)
For the rest of the block within which this macro is instantiated, causes all FLOW_LOG_....
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Flow module containing tools enabling multi-threaded event loops operating under the asynchronous-tas...
void optimize_pinning_in_thread_pool(flow::log::Logger *logger_ptr, const std::vector< util::Thread * > &threads_in_pool, bool est_hw_core_sharing_helps_algo, bool est_hw_core_pinning_helps_algo, bool hw_threads_is_grouping_collated)
Assuming the same situation as documented for optimal_worker_thread_count_per_pool(),...
unsigned int optimal_worker_thread_count_per_pool(flow::log::Logger *logger_ptr, bool est_hw_core_sharing_helps_algo)
Assuming a planned thread pool will be receiving ~symmetrical load, and its UX-affecting (in particul...
std::string ostream_op_string(T const &... ostream_args)
Equivalent to ostream_op_to_string() but returns a new string by value instead of writing to the call...
boost::thread Thread
Short-hand for standard thread class.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...