20#include "flow/error/error.hpp"
23# include <mach/thread_policy.h>
24# include <mach/thread_act.h>
38 bool est_hw_core_sharing_helps_algo)
44 const unsigned int n_phys_cores = Thread::physical_concurrency();
45 const unsigned int n_logic_cores = Thread::hardware_concurrency();
46 const bool core_sharing_supported = n_phys_cores != n_logic_cores;
48 FLOW_LOG_INFO(
"System reports processor with [" << n_phys_cores <<
"] physical cores; and "
49 "[" << n_logic_cores <<
"] hardware threads a/k/a logical cores; core sharing (a/k/a "
50 "hyper-threading) is "
51 "thus [" << (core_sharing_supported ?
"supported" :
"unsupported") <<
"].");
53 if (!core_sharing_supported)
56 "Therefore suggested thread pool thread count is "
57 "simply the logical core count = [" << n_logic_cores <<
"].");
61 if (est_hw_core_sharing_helps_algo)
63 FLOW_LOG_INFO(
"Application estimates this thread pool DOES benefit from 2+ hardware threads sharing physical "
64 "processor core (a/k/a hyper-threading); therefore we shall act as if there is 1 hardware thread "
65 "a/k/a logical core per physical core, even though in reality above shows it is [supported]. "
66 "Therefore suggested thread pool thread count is "
67 "simply the logical core count = [" << n_logic_cores <<
"].");
72 FLOW_LOG_INFO(
"Application estimates this thread pool does NOT benefit from 2+ hardware threads sharing physical "
73 "processor core (a/k/a hyper-threading); "
74 "therefore suggested thread pool thread count is "
75 "simply the physical core count = [" << n_phys_cores <<
"].");
81 const std::vector<util::Thread*>& threads_in_pool,
82 bool est_hw_core_sharing_helps_algo,
83 bool est_hw_core_pinning_helps_algo,
84 bool hw_threads_is_grouping_collated,
90 est_hw_core_pinning_helps_algo, hw_threads_is_grouping_collated,
92 err_code,
"flow::async::optimize_pinning_in_thread_pool()"))
115 "We only know how to deal with thread-core affinities in Darwin/Mac and Linux.");
117 using boost::system::system_category;
118 using std::runtime_error;
124 if (!est_hw_core_pinning_helps_algo)
126 FLOW_LOG_INFO(
"Application estimates the logic in this thread pool would not benefit from pinning threads to "
127 "processor cores; therefore letting system control assignment of threads to processor cores.");
131 assert(est_hw_core_pinning_helps_algo);
134 const auto n_pool_threads = threads_in_pool.size();
137 const auto n_logic_cores_per_pool_thread = Thread::hardware_concurrency() / n_pool_threads;
139 FLOW_LOG_INFO(
"Application estimates thread pool would benefit from pinning threads to processor cores; "
140 "will set affinities as follows below. "
141 "Thread count in pool is [" << n_pool_threads <<
"]; "
142 "at [" << n_logic_cores_per_pool_thread <<
"] logical processor cores each.");
144 for (
unsigned int thread_idx = 0; thread_idx != n_pool_threads; ++thread_idx)
146 Thread* thread = threads_in_pool[thread_idx];
147 const auto native_pthread_thread_id = thread->native_handle();
149#if FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX
151 using ::pthread_setaffinity_np;
153 cpu_set_t cpu_set_for_thread;
154 CPU_ZERO(&cpu_set_for_thread);
156 for (
unsigned int logical_core_idx_given_thread_idx = 0;
157 logical_core_idx_given_thread_idx != n_logic_cores_per_pool_thread;
158 ++logical_core_idx_given_thread_idx)
164 const unsigned int native_logical_core_id
165 = hw_threads_is_grouping_collated ? ((thread_idx * n_logic_cores_per_pool_thread)
166 + logical_core_idx_given_thread_idx)
167 : ((logical_core_idx_given_thread_idx * n_pool_threads)
169 FLOW_LOG_INFO(
"Thread [" << thread_idx <<
"] in pool: adding affinity for "
170 "logical core/hardware thread [" << native_logical_core_id <<
"].");
172 CPU_SET(native_logical_core_id, &cpu_set_for_thread);
175 const auto code = pthread_setaffinity_np(native_pthread_thread_id,
sizeof(cpu_set_for_thread), &cpu_set_for_thread);
178 const Error_code sys_err_code{errno, system_category()};
180 *err_code = sys_err_code;
186 static_assert(
false,
"This strongly platform-dependent function has not been properly tested and maintained "
187 "for Darwin/Mac in a long time, as Flow has been Linux-only for many years. "
188 "There is also a likely (documented, known) bug in this impl. Please revisit when "
189 "we re-add Mac/Darwin support.")
201 using ::pthread_mach_thread_np;
202 using ::thread_affinity_policy_data_t;
203 using ::thread_policy_set;
206 const unsigned int native_affinity_tag = 1 + thread_idx;
207 FLOW_LOG_INFO(
"Thread [" << thread_idx <<
"] in pool: setting Mach affinity tag [" << native_affinity_tag <<
"].");
210 const auto native_mach_thread_id = pthread_mach_thread_np(native_pthread_thread_id);
211 if (native_pthread_thread_id == 0)
213 const Error_code sys_err_code(errno, system_category());
215 throw error::Runtime_error(sys_err_code,
"pthread_mach_thread_np() call in optimize_pinning_in_thread_pool()");
219 "<=> Mach thread ID [" << native_mach_thread_id <<
"].");
237 thread_affinity_policy_data_t native_thread_policy_data{ int(native_affinity_tag) };
238 const auto code = thread_policy_set(native_mach_thread_id, THREAD_AFFINITY_POLICY,
240 reinterpret_cast<thread_policy_t
>(&native_thread_policy_data), 1);
253 "] [thread_policy_set(THREAD_AFFINITY_POLICY) failed]"));
265 err_code,
"flow::async::reset_thread_pinning()"))
273 using boost::system::system_category;
276 "For this function we only know how to deal with thread-core affinities in Linux.");
279 using ::pthread_setaffinity_np;
280 using ::pthread_self;
284 const auto native_pthread_thread_id = thread_else_ours ? thread_else_ours->native_handle()
287 cpu_set_t cpu_set_for_thread;
288 CPU_ZERO(&cpu_set_for_thread);
290 const unsigned int n_logic_cores = Thread::hardware_concurrency();
291 for (
unsigned int logic_core_idx = 0; logic_core_idx != n_logic_cores; ++logic_core_idx)
293 CPU_SET(logic_core_idx, &cpu_set_for_thread);
296 FLOW_LOG_INFO(
"Thread with native ID [" << native_pthread_thread_id <<
"]: resetting processor-affinity, "
297 "so that no particular core is preferred for this thread. (This may have already been the case.)");
299 if (pthread_setaffinity_np(native_pthread_thread_id,
sizeof(cpu_set_for_thread), &cpu_set_for_thread) == -1)
301 const Error_code sys_err_code{errno, system_category()};
303 *err_code = sys_err_code;
~Concurrent_task_loop() override
Any implementing subclass's destructor shall execute stop() – see its doc header please – and then cl...
An std::runtime_error (which is an std::exception) that stores an Error_code.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
#define FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX
Macro set to 1 (else 0) if and only if natively the pthread API allows one to set thread-to-core affi...
#define FLOW_ASYNC_HW_THREAD_AFFINITY_MACH_VIA_POLICY_TAG
Macro set to 1 (else 0) if and only if natively there is a Mach kernel API that allows one to set thread-to...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_SET_CONTEXT(ARG_logger_ptr, ARG_component_payload)
For the rest of the block within which this macro is instantiated, causes all FLOW_LOG_....
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Flow module containing tools enabling multi-threaded event loops operating under the asynchronous-tas...
void reset_this_thread_pinning()
Resets processor-affinity of the calling thread; does not log; and throws on extremely unlikely syste...
void optimize_pinning_in_thread_pool(log::Logger *logger_ptr, const std::vector< util::Thread * > &threads_in_pool, bool est_hw_core_sharing_helps_algo, bool est_hw_core_pinning_helps_algo, bool hw_threads_is_grouping_collated, Error_code *err_code=nullptr)
Assuming the same situation as documented for optimal_worker_thread_count_per_pool(),...
void reset_thread_pinning(log::Logger *logger_ptr=nullptr, util::Thread *thread_else_ours=nullptr, Error_code *err_code=nullptr)
Resets the processor-affinity of the given thread – or calling thread – to be managed as the OS deems...
unsigned int optimal_worker_thread_count_per_pool(log::Logger *logger_ptr, bool est_hw_core_sharing_helps_algo)
Assuming a planned thread pool will be receiving ~symmetrical load, and its UX-affecting (in particul...
bool exec_void_and_throw_on_error(const Func &func, Error_code *err_code, util::String_view context)
Equivalent of exec_and_throw_on_error() for operations with void return type.
std::string ostream_op_string(T const &... ostream_args)
Equivalent to ostream_op_to_string() but returns a new string by value instead of writing to the call...
boost::thread Thread
Short-hand for standard thread class.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...