Flow 2.0.0
Flow project: Full implementation reference.
concurrent_task_loop.cpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
20#include "flow/error/error.hpp"
21#ifdef FLOW_OS_MAC
22# if 0 // That code is disabled at the moment (see below).
23# include <mach/thread_policy.h>
24# include <mach/thread_act.h>
25# endif
26#endif
27
28namespace flow::async
29{
30
31// Method implementations.
32
34
35// Free function implementations.
36
38 bool est_hw_core_sharing_helps_algo)
39{
40 using util::Thread;
41
42 FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_ASYNC);
43
44 const unsigned int n_phys_cores = Thread::physical_concurrency();
45 const unsigned int n_logic_cores = Thread::hardware_concurrency();
46 const bool core_sharing_supported = n_phys_cores != n_logic_cores;
47
48 FLOW_LOG_INFO("System reports processor with [" << n_phys_cores << "] physical cores; and "
49 "[" << n_logic_cores << "] hardware threads a/k/a logical cores; core sharing (a/k/a "
50 "hyper-threading) is "
51 "thus [" << (core_sharing_supported ? "supported" : "unsupported") << "].");
52
53 if (!core_sharing_supported)
54 {
55 FLOW_LOG_INFO("Core sharing is [unsupported]. "
56 "Therefore suggested thread pool thread count is "
57 "simply the logical core count = [" << n_logic_cores << "].");
58 return n_logic_cores;
59 }
60 // else if (n_phys_cores != n_logic_cores)
61 if (est_hw_core_sharing_helps_algo)
62 {
63 FLOW_LOG_INFO("Application estimates this thread pool DOES benefit from 2+ hardware threads sharing physical "
64 "processor core (a/k/a hyper-threading); therefore we shall act as if there is 1 hardware thread "
65 "a/k/a logical core per physical core, even though in reality above shows it is [supported]. "
66 "Therefore suggested thread pool thread count is "
67 "simply the logical core count = [" << n_logic_cores << "].");
68 return n_logic_cores;
69 }
70 // else
71
72 FLOW_LOG_INFO("Application estimates this thread pool does NOT benefit from 2+ hardware threads sharing physical "
73 "processor core (a/k/a hyper-threading); "
74 "therefore suggested thread pool thread count is "
75 "simply the physical core count = [" << n_phys_cores << "].");
76
77 return n_phys_cores;
78} // optimal_worker_thread_count_per_pool()
79
81 const std::vector<util::Thread*>& threads_in_pool,
82 bool est_hw_core_sharing_helps_algo,
83 bool est_hw_core_pinning_helps_algo,
84 bool hw_threads_is_grouping_collated,
85 Error_code* err_code)
86{
88 ([&](Error_code* actual_err_code)
89 { optimize_pinning_in_thread_pool(logger_ptr, threads_in_pool, est_hw_core_sharing_helps_algo,
90 est_hw_core_pinning_helps_algo, hw_threads_is_grouping_collated,
91 actual_err_code); },
92 err_code, "flow::async::optimize_pinning_in_thread_pool()"))
93 {
94 return;
95 }
96 // else if (err_code):
97 err_code->clear();
98
99 /* There are 2 ways (known to us) to set thread-core affinity. In reality they are mutually exclusive (one is Mac,
100 * other is Linux), but conceptually they could co-exist. With the latter in mind, note the subtlety that we choose
101 * the Linux way over the Mac way, had they both been available. The Mac way doesn't rely on specifying a hardware
102 * thread index, hence it needs to make no assumptions about the semantics of which threads share which cores, and
103 * for this and related reasons it's actually superior to the Linux way. The reason we choose the inferior Linux
104 * way in that case is "thin," but it's this: per
105 * https://developer.apple.com/library/archive/releasenotes/Performance/RN-AffinityAPI
106 * the affinity tags are not shared between separate processes (except via fork() after the first affinity API call,
107 * which we probably could do if it came down to it, but it almost certainly won't). So in the rare case where
108 * it'd help performance that a "producer thread" is pinned to the same hardware core as a "consumer thread," the
109 * Linux way lets one easily do this, whereas the Mac way doesn't (except via the fork() thing). In our case,
110 * the pinning is about avoiding the NEGATIVE implications of core sharing, but there could be POSITIVE
111 * implications in some cases. So in that case it's nice to pin those to the same
112 * core which will indeed occur in the Linux algorithm below. */
113
115 "We only know how to deal with thread-core affinities in Darwin/Mac and Linux.");
116
117 using boost::system::system_category;
118 using std::runtime_error;
120 using util::Thread;
121
122 FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_ASYNC);
123
124 if (!est_hw_core_pinning_helps_algo)
125 {
126 FLOW_LOG_INFO("Application estimates the logic in this thread pool would not benefit from pinning threads to "
127 "processor cores; therefore letting system control assignment of threads to processor cores.");
128 return;
129 }
130 // else
131 assert(est_hw_core_pinning_helps_algo);
132
133 // This mode only works if we started in this mode earlier when determing # of threads in pool. @todo assert()?
134 const auto n_pool_threads = threads_in_pool.size();
135 assert(n_pool_threads == optimal_worker_thread_count_per_pool(get_logger(), est_hw_core_sharing_helps_algo));
136
137 const auto n_logic_cores_per_pool_thread = Thread::hardware_concurrency() / n_pool_threads;
138
139 FLOW_LOG_INFO("Application estimates thread pool would benefit from pinning threads to processor cores; "
140 "will set affinities as follows below. "
141 "Thread count in pool is [" << n_pool_threads << "]; "
142 "at [" << n_logic_cores_per_pool_thread << "] logical processor cores each.");
143
144 for (unsigned int thread_idx = 0; thread_idx != n_pool_threads; ++thread_idx)
145 {
146 Thread* thread = threads_in_pool[thread_idx];
147 const auto native_pthread_thread_id = thread->native_handle();
148
149#if FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX
150 using ::cpu_set_t;
151 using ::pthread_setaffinity_np;
152
153 cpu_set_t cpu_set_for_thread;
154 CPU_ZERO(&cpu_set_for_thread);
155
156 for (unsigned int logical_core_idx_given_thread_idx = 0;
157 logical_core_idx_given_thread_idx != n_logic_cores_per_pool_thread;
158 ++logical_core_idx_given_thread_idx)
159 {
160 /* (If you're confused, suggest first looking at doc header's explanation of hw_threads_is_grouping_collated.
161 * Also consider classic example configuration with 8 hardware threads, 4 physical threads, and
162 * !hw_threads_is_grouping_collated, resulting in system hardware thread indexing 01230123.
163 * Or if hw_threads_is_grouping_collated, then it's 00112233.) */
164 const unsigned int native_logical_core_id
165 = hw_threads_is_grouping_collated ? ((thread_idx * n_logic_cores_per_pool_thread)
166 + logical_core_idx_given_thread_idx)
167 : ((logical_core_idx_given_thread_idx * n_pool_threads)
168 + thread_idx);
169 FLOW_LOG_INFO("Thread [" << thread_idx << "] in pool: adding affinity for "
170 "logical core/hardware thread [" << native_logical_core_id << "].");
171
172 CPU_SET(native_logical_core_id, &cpu_set_for_thread);
173 }
174
175 const auto code = pthread_setaffinity_np(native_pthread_thread_id, sizeof(cpu_set_for_thread), &cpu_set_for_thread);
176 if (code == -1)
177 {
178 const Error_code sys_err_code{errno, system_category()};
179 FLOW_ERROR_SYS_ERROR_LOG_WARNING(); // Log non-portable error.
180 *err_code = sys_err_code;
181 return;
182 }
183 // else OK!
184#else // if FLOW_ASYNC_HW_THREAD_AFFINITY_MACH_VIA_POLICY_TAG
185
186 static_assert(false, "This strongly platform-dependent function has not been properly tested and maintained "
187 "for Darwin/Mac in a long time, as Flow has been Linux-only for many years. "
188 "There is also a likely (documented, known) bug in this impl. Please revisit when "
189 "we re-add Mac/Darwin support.")
190
191# if 0
192 /* Maintenance note: When/if re-enabling this Darwin/Mac section:
193 * - Resolve the likely bug noted in below @todo.
194 * - Update the code to generate an Error_code even on the Mach error (will need to make a new Error_code
195 * category, in flow.async; which is no big deal; we do that all the time; or if there is one out there
196 * for these Mach errors specifically, then use that).
197 * - Update the code assign to *err_code, not throw (the rest of the function now acts this way, per standard
198 * Flow semantics).
199 * - Test, test, test... Mac, ARM64... all that. */
200
201 using ::pthread_mach_thread_np;
202 using ::thread_affinity_policy_data_t;
203 using ::thread_policy_set;
204 // using ::THREAD_AFFINITY_POLICY; // Nope; it's a #define.
205
206 const unsigned int native_affinity_tag = 1 + thread_idx;
207 FLOW_LOG_INFO("Thread [" << thread_idx << "] in pool: setting Mach affinity tag [" << native_affinity_tag << "].");
208
209 Error_code sys_err_code;
210 const auto native_mach_thread_id = pthread_mach_thread_np(native_pthread_thread_id);
211 if (native_pthread_thread_id == 0)
212 {
213 const Error_code sys_err_code(errno, system_category()); // As above....
215 throw error::Runtime_error(sys_err_code, "pthread_mach_thread_np() call in optimize_pinning_in_thread_pool()");
216 }
217 // else
218 FLOW_LOG_TRACE("pthread ID [" << native_pthread_thread_id << "] "
219 "<=> Mach thread ID [" << native_mach_thread_id << "].");
220
221 /* @todo CAUTION! There may be a bug here. The (Darwin-specific) documentation
222 * https://developer.apple.com/library/archive/releasenotes/Performance/RN-AffinityAPI/
223 * recommends one create the thread, then set its affinity, and only THEN run the thread.
224 * The Boost (and the derived STL equivalent) thread lib does not allow one to create a thread in suspended state;
225 * pthread_create_suspended_np() does but isn't accessible (and seemingly is a barely-documented Darwin thing
226 * not available in Linux, though the present code is Mac anyway) nor possible to somehow splice into the
227 * boost::thread ctor's execution. We can't/shouldn't abandon the thread API, so we are stuck.
228 * Now, when it *recommends* it, does it actually require it? The wording seems to imply "no"... but
229 * there is no guarantee. Empirically speaking, when trying it out, it's unclear (via cpu_idx() calls elsewhere
230 * in this file) whether Darwin's listening to us. It definitely keeps migrating the threads back and forth,
231 * which MIGHT suggest it's not working, as the doc claims setting an affinity tag would "tend" to reduce migration
232 * (but how much, and what does "tend" mean?). The to-do is to resolve this; but it is low-priority, because
233 * in reality we care about Linux only and do the Mac thing for completeness only, in this PoC.
234 *
235 * There's an official @todo in our doc header, and it refers to this text and code. */
236
237 thread_affinity_policy_data_t native_thread_policy_data{ int(native_affinity_tag) }; // Cannot be const due to API.
238 const auto code = thread_policy_set(native_mach_thread_id, THREAD_AFFINITY_POLICY,
239 // The evil cast is necessary given the Mach API design. At least they're ptrs.
240 reinterpret_cast<thread_policy_t>(&native_thread_policy_data), 1);
241 if (code != 0)
242 {
243 /* I don't know/understand Mach error code system, and there are no `man` pages as such that I can find
244 * (including on Internet) -- though brief kernel code perusal suggests fairly strongly it's not simply errno
245 * here -- so let's just save the numeric code in a general runtime error exception string; hence do not
246 * use Error_code-taking Runtime_exception as we would normally for a nice message. @todo If we wanted
247 * to we could make a whole boost.system error category for these Mach errors, etc. etc. Maybe someone has.
248 * Maybe Boost has! Who knows? We don't care about this corner case at the moment and doubtful if ever will.
249 * @todo For sure though should use error::Runtime_error here, the ctor that takes no Error_code.
250 * That ctor did not exist when the present code was written; as of this writing Flow is Linux-only.
251 * Would do it right now but lack the time to verify any changes for Mac at the moment. */
252 throw runtime_error(ostream_op_string("[MACH_KERN_RETURN_T:", code,
253 "] [thread_policy_set(THREAD_AFFINITY_POLICY) failed]"));
254 }
255 // else OK!
256# endif // if 0
257#endif
258 } // for (thread_idx in [0, n_pool_threads))
259} // optimize_pinning_in_thread_pool()
260
261void reset_thread_pinning(log::Logger* logger_ptr, util::Thread* thread_else_ours, Error_code* err_code)
262{
264 ([&](Error_code* actual_err_code) { reset_thread_pinning(logger_ptr, thread_else_ours, actual_err_code); },
265 err_code, "flow::async::reset_thread_pinning()"))
266 {
267 return;
268 }
269 // else if (err_code):
270 err_code->clear();
271
272 using util::Thread;
273 using boost::system::system_category;
274
276 "For this function we only know how to deal with thread-core affinities in Linux.");
277
278 using ::cpu_set_t;
279 using ::pthread_setaffinity_np;
280 using ::pthread_self;
281
282 FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_ASYNC);
283
284 const auto native_pthread_thread_id = thread_else_ours ? thread_else_ours->native_handle()
285 // `man` -v- page says, "This function always succeeds."
286 : pthread_self();
287 cpu_set_t cpu_set_for_thread;
288 CPU_ZERO(&cpu_set_for_thread);
289
290 const unsigned int n_logic_cores = Thread::hardware_concurrency();
291 for (unsigned int logic_core_idx = 0; logic_core_idx != n_logic_cores; ++logic_core_idx)
292 {
293 CPU_SET(logic_core_idx, &cpu_set_for_thread);
294 }
295
296 FLOW_LOG_INFO("Thread with native ID [" << native_pthread_thread_id << "]: resetting processor-affinity, "
297 "so that no particular core is preferred for this thread. (This may have already been the case.)");
298
299 if (pthread_setaffinity_np(native_pthread_thread_id, sizeof(cpu_set_for_thread), &cpu_set_for_thread) == -1)
300 {
301 const Error_code sys_err_code{errno, system_category()};
302 FLOW_ERROR_SYS_ERROR_LOG_WARNING(); // Log non-portable error.
303 *err_code = sys_err_code;
304 return;
305 }
306 // else OK!
307} // reset_thread_pinning()
308
309void reset_this_thread_pinning() // I know this looks odd and pointless; but see our doc header.
310{
312}
313
314} // namespace flow::async
~Concurrent_task_loop() override
Any implementing subclass's destructor shall execute stop() – see its doc header please – and then cl...
An std::runtime_error (which is an std::exception) that stores an Error_code.
Definition: error.hpp:49
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1284
#define FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX
Macro set to 1 (else 0) if and only if natively the pthread API allows one to set thread-to-core affi...
#define FLOW_ASYNC_HW_THREAD_AFFINITY_MACH_VIA_POLICY_TAG
Macro set to 1 (else 0) if and only if natively there is Mach kernel API that allows to set thread-to...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
Definition: error.hpp:269
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:197
#define FLOW_LOG_SET_CONTEXT(ARG_logger_ptr, ARG_component_payload)
For the rest of the block within which this macro is instantiated, causes all FLOW_LOG_....
Definition: log.hpp:405
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
Flow module containing tools enabling multi-threaded event loops operating under the asynchronous-tas...
Definition: async_fwd.hpp:75
void reset_this_thread_pinning()
Resets processor-affinity of the calling thread; does not log; and throws on extremely unlikely syste...
void optimize_pinning_in_thread_pool(log::Logger *logger_ptr, const std::vector< util::Thread * > &threads_in_pool, bool est_hw_core_sharing_helps_algo, bool est_hw_core_pinning_helps_algo, bool hw_threads_is_grouping_collated, Error_code *err_code=nullptr)
Assuming the same situation as documented for optimal_worker_thread_count_per_pool(),...
void reset_thread_pinning(log::Logger *logger_ptr=nullptr, util::Thread *thread_else_ours=nullptr, Error_code *err_code=nullptr)
Resets the processor-affinity of the given thread – or calling thread – to be managed as the OS deems...
unsigned int optimal_worker_thread_count_per_pool(log::Logger *logger_ptr, bool est_hw_core_sharing_helps_algo)
Assuming a planned thread pool will be receiving ~symmetrical load, and its UX-affecting (in particul...
bool exec_void_and_throw_on_error(const Func &func, Error_code *err_code, util::String_view context)
Equivalent of exec_and_throw_on_error() for operations with void return type.
Definition: error.hpp:168
std::string ostream_op_string(T const &... ostream_args)
Equivalent to ostream_op_to_string() but returns a new string by value instead of writing to the call...
Definition: util.hpp:381
boost::thread Thread
Short-hand for standard thread class.
Definition: util_fwd.hpp:78
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:508