concurrent_task_loop.cpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#include "flow/async/concurrent_task_loop.hpp"
#include "flow/error/error.hpp"
#ifdef FLOW_OS_MAC
# include <mach/thread_policy.h>
# include <mach/thread_act.h>
#endif

namespace flow::async
{

// Method implementations.

Concurrent_task_loop::~Concurrent_task_loop() = default;

// Free function implementations.

unsigned int optimal_worker_thread_count_per_pool(log::Logger* logger_ptr,
                                                  bool est_hw_core_sharing_helps_algo)
{
  using util::Thread;

  FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_ASYNC);

  const unsigned int n_phys_cores = Thread::physical_concurrency();
  const unsigned int n_logic_cores = Thread::hardware_concurrency();
  const bool core_sharing_supported = n_phys_cores != n_logic_cores;

  FLOW_LOG_INFO("System reports processor with [" << n_phys_cores << "] physical cores; and "
                "[" << n_logic_cores << "] hardware threads a/k/a logical cores; core sharing (a/k/a "
                "hyper-threading) is "
                "thus [" << (core_sharing_supported ? "supported" : "unsupported") << "].");

  if (!core_sharing_supported)
  {
    FLOW_LOG_INFO("Core sharing is [unsupported]. "
                  "Therefore suggested thread pool thread count is "
                  "simply the logical core count = [" << n_logic_cores << "].");
    return n_logic_cores;
  }
  // else if (n_phys_cores != n_logic_cores)
  if (est_hw_core_sharing_helps_algo)
  {
    FLOW_LOG_INFO("Application estimates this thread pool DOES benefit from 2+ hardware threads sharing physical "
                  "processor core (a/k/a hyper-threading); therefore we shall act as if there is 1 hardware thread "
                  "a/k/a logical core per physical core, even though in reality above shows it is [supported]. "
                  "Therefore suggested thread pool thread count is "
                  "simply the logical core count = [" << n_logic_cores << "].");
    return n_logic_cores;
  }
  // else

  FLOW_LOG_INFO("Application estimates this thread pool does NOT benefit from 2+ hardware threads sharing physical "
                "processor core (a/k/a hyper-threading); "
                "therefore suggested thread pool thread count is "
                "simply the physical core count = [" << n_phys_cores << "].");

  return n_phys_cores;
} // optimal_worker_thread_count_per_pool()
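
/* (Worked example: on a hypothetical machine with 4 physical cores and 2-way hyper-threading -- i.e., 8 logical
 * cores -- the above returns 8 when est_hw_core_sharing_helps_algo is true and 4 when it is false; on a machine
 * without hyper-threading it always returns the logical core count.) */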

void optimize_pinning_in_thread_pool(log::Logger* logger_ptr,
                                     const std::vector<util::Thread*>& threads_in_pool,
                                     [[maybe_unused]] bool est_hw_core_sharing_helps_algo,
                                     bool est_hw_core_pinning_helps_algo,
                                     bool hw_threads_is_grouping_collated)
{
  /* There are 2 ways (known to us) to set thread-core affinity. In reality they are mutually exclusive (one is Mac,
   * other is Linux), but conceptually they could co-exist. With the latter in mind, note the subtlety that we choose
   * the Linux way over the Mac way, had they both been available. The Mac way doesn't rely on specifying a hardware
   * thread index, hence it needs to make no assumptions about the semantics of which threads share which cores, and
   * for this and related reasons it's actually superior to the Linux way. The reason we choose the inferior Linux
   * way in that case is "thin," but it's this: per
   *   https://developer.apple.com/library/archive/releasenotes/Performance/RN-AffinityAPI
   * the affinity tags are not shared between separate processes (except via fork() after the first affinity API call,
   * which we probably could do if it came down to it, but it almost certainly won't). So in the rare case where
   * it'd help performance that a "producer thread" is pinned to the same hardware core as a "consumer thread," the
   * Linux way lets one easily do this, whereas the Mac way doesn't (except via the fork() thing). In our case,
   * the pinning is about avoiding the NEGATIVE implications of core sharing, but there could be POSITIVE
   * implications in some cases. So in that case it's nice to pin those to the same
   * core which will indeed occur in the Linux algorithm below. */
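
  /* (Orientation note: a rough sketch of the two mechanisms; the real, error-checked calls appear further below.
   * Assume `handle` is a worker's pthread_t and `mach_thread` its Mach thread port; both names are illustrative only.
   *   Linux: cpu_set_t set; CPU_ZERO(&set); CPU_SET(core_idx, &set); pthread_setaffinity_np(handle, sizeof(set), &set);
   *   Mac:   thread_affinity_policy_data_t policy{tag};
   *          thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY, reinterpret_cast<thread_policy_t>(&policy), 1);
   * The Linux call names explicit logical-core indices; the Mach call merely tags threads, and the kernel then tries
   * to schedule same-tagged threads near each other (e.g., sharing a cache), choosing the actual cores itself.) */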

#if FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX
  using ::cpu_set_t;
  using ::pthread_setaffinity_np;
#elif FLOW_ASYNC_HW_THREAD_AFFINITY_MACH_VIA_POLICY_TAG
  using ::pthread_mach_thread_np;
  using ::thread_affinity_policy_data_t;
  using ::thread_policy_set;
  // using ::THREAD_AFFINITY_POLICY; // Nope; it's a #define.
#else
  static_assert(false, "We only know how to deal with thread-core affinities in Darwin/Mac and Linux.");
#endif
  using boost::system::system_category;
  using std::runtime_error;
  using util::ostream_op_string;
  using util::Thread;

  FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_ASYNC);

  if (!est_hw_core_pinning_helps_algo)
  {
    FLOW_LOG_INFO("Application estimates the logic in this thread pool would not benefit from pinning threads to "
                  "processor cores; therefore letting system control assignment of threads to processor cores.");
    return;
  }
  // else
  assert(est_hw_core_pinning_helps_algo);

  // This mode only works if we started in this mode earlier when determining # of threads in pool. @todo assert()?
  const auto n_pool_threads = threads_in_pool.size();
  assert(n_pool_threads == optimal_worker_thread_count_per_pool(get_logger(), est_hw_core_sharing_helps_algo));

  const auto n_logic_cores_per_pool_thread = Thread::hardware_concurrency() / n_pool_threads;

  FLOW_LOG_INFO("Application estimates thread pool would benefit from pinning threads to processor cores; "
                "will set affinities as follows below. "
                "Thread count in pool is [" << n_pool_threads << "]; "
                "at [" << n_logic_cores_per_pool_thread << "] logical processor cores each.");

  for (unsigned int thread_idx = 0; thread_idx != n_pool_threads; ++thread_idx)
  {
    Thread* thread = threads_in_pool[thread_idx];
    const auto native_pthread_thread_id = thread->native_handle();

#if FLOW_ASYNC_HW_THREAD_AFFINITY_PTHREAD_VIA_CORE_IDX
    cpu_set_t cpu_set_for_thread;
    CPU_ZERO(&cpu_set_for_thread);

    for (unsigned int logical_core_idx_given_thread_idx = 0;
         logical_core_idx_given_thread_idx != n_logic_cores_per_pool_thread;
         ++logical_core_idx_given_thread_idx)
    {
      /* (If you're confused, suggest first looking at doc header's explanation of hw_threads_is_grouping_collated.
       * Also consider classic example configuration with 8 hardware threads, 4 physical cores, and
       * !hw_threads_is_grouping_collated, resulting in system hardware thread indexing 01230123.
       * Or if hw_threads_is_grouping_collated, then it's 00112233.) */
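      /* (Worked example of the formula below, assuming the hypothetical 8-logical/4-physical configuration above
       * with a 4-thread pool, hence n_logic_cores_per_pool_thread == 2. Take thread_idx == 1:
       *   !hw_threads_is_grouping_collated (indexing 01230123): cores (0 * 4) + 1 == 1 and (1 * 4) + 1 == 5;
       *   hw_threads_is_grouping_collated (indexing 00112233): cores (1 * 2) + 0 == 2 and (1 * 2) + 1 == 3.
       * Either way pool thread 1 ends up with both hardware threads of physical core 1.) */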
      const unsigned int native_logical_core_id
        = hw_threads_is_grouping_collated ? ((thread_idx * n_logic_cores_per_pool_thread)
                                             + logical_core_idx_given_thread_idx)
                                          : ((logical_core_idx_given_thread_idx * n_pool_threads)
                                             + thread_idx);
      FLOW_LOG_INFO("Thread [" << thread_idx << "] in pool: adding affinity for "
                    "logical core/hardware thread [" << native_logical_core_id << "].");

      CPU_SET(native_logical_core_id, &cpu_set_for_thread);
    }

    // Note: pthread_setaffinity_np() returns 0 on success or an error number directly; it does not set errno.
    const auto code = pthread_setaffinity_np(native_pthread_thread_id, sizeof(cpu_set_for_thread), &cpu_set_for_thread);
    if (code != 0)
    {
      const Error_code sys_err_code(code, system_category());
      FLOW_ERROR_SYS_ERROR_LOG_WARNING(); // Log non-portable error.
      throw error::Runtime_error(sys_err_code,
                                 "pthread_setaffinity_np() call in optimize_pinning_in_thread_pool()");
    }
    // else OK!
#elif FLOW_ASYNC_HW_THREAD_AFFINITY_MACH_VIA_POLICY_TAG
    const unsigned int native_affinity_tag = 1 + thread_idx;
    FLOW_LOG_INFO("Thread [" << thread_idx << "] in pool: setting Mach affinity tag [" << native_affinity_tag << "].");

    flow::Error_code sys_err_code;
    const auto native_mach_thread_id = pthread_mach_thread_np(native_pthread_thread_id);
    if (native_mach_thread_id == 0) // MACH_PORT_NULL: could not map the pthread handle to a Mach thread.
    {
      const Error_code sys_err_code(errno, system_category()); // As above....
      FLOW_ERROR_SYS_ERROR_LOG_WARNING(); // Log non-portable error.
      throw error::Runtime_error(sys_err_code, "pthread_mach_thread_np() call in optimize_pinning_in_thread_pool()");
    }
    // else
    FLOW_LOG_TRACE("pthread ID [" << native_pthread_thread_id << "] "
                   "<=> Mach thread ID [" << native_mach_thread_id << "].");

    /* @todo CAUTION! There may be a bug here. The (Darwin-specific) documentation
     *   https://developer.apple.com/library/archive/releasenotes/Performance/RN-AffinityAPI/
     * recommends one create the thread, then set its affinity, and only THEN run the thread.
     * The Boost (and the derived STL equivalent) thread lib does not allow one to create a thread in suspended state;
     * pthread_create_suspended_np() does but isn't accessible (and seemingly is a barely-documented Darwin thing
     * not available in Linux, though the present code is Mac anyway) nor possible to somehow splice into the
     * boost::thread ctor's execution. We can't/shouldn't abandon the thread API, so we are stuck.
     * Now, when it *recommends* it, does it actually require it? The wording seems to imply "no"... but
     * there is no guarantee. Empirically speaking, when trying it out, it's unclear (via cpu_idx() calls elsewhere
     * in this file) whether Darwin's listening to us. It definitely keeps migrating the threads back and forth,
     * which MIGHT suggest it's not working, as the doc claims setting an affinity tag would "tend" to reduce migration
     * (but how much, and what does "tend" mean?). The to-do is to resolve this; but it is low-priority, because
     * in reality we care about Linux only and do the Mac thing for completeness only, in this PoC.
     *
     * There's an official @todo in our doc header, and it refers to this text and code. */

    thread_affinity_policy_data_t native_thread_policy_data{ int(native_affinity_tag) }; // Cannot be const due to API.
    const auto code = thread_policy_set(native_mach_thread_id, THREAD_AFFINITY_POLICY,
                                        // The evil cast is necessary given the Mach API design. At least they're ptrs.
                                        reinterpret_cast<thread_policy_t>(&native_thread_policy_data), 1);
    if (code != 0)
    {
      /* I don't know/understand Mach error code system, and there are no `man` pages as such that I can find
       * (including on Internet) -- though brief kernel code perusal suggests fairly strongly it's not simply errno
       * here -- so let's just save the numeric code in a general runtime error exception string; hence do not
       * use Error_code-taking Runtime_error as we would normally for a nice message. @todo If we wanted
       * to we could make a whole boost.system error category for these Mach errors, etc. etc. Maybe someone has.
       * Maybe Boost has! Who knows? We don't care about this corner case at the moment and doubtful if ever will.
       * @todo For sure though should use error::Runtime_error here, the ctor that takes no Error_code.
       * That ctor did not exist when the present code was written; as of this writing Flow is Linux-only.
       * Would do it right now but lack the time to verify any changes for Mac at the moment. */
      throw runtime_error(ostream_op_string("[MACH_KERN_RETURN_T:", code,
                                            "] [thread_policy_set(THREAD_AFFINITY_POLICY) failed]"));
    }
    // else OK!
#else
    static_assert(false, "Compiler should not have reached this point; serious bug?");
#endif
  } // for (thread_idx in [0, n_pool_threads))
} // optimize_pinning_in_thread_pool()

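/* Usage sketch: how a caller might combine the two functions above. This is hypothetical caller code, not part of
 * Flow; names such as `my_logger` and `run_event_loop` are illustrative placeholders, and thread cleanup is omitted.
 *
 *   std::vector<util::Thread*> threads;
 *   const auto n_threads = optimal_worker_thread_count_per_pool(my_logger, false); // Algo not helped by core sharing.
 *   for (unsigned int idx = 0; idx != n_threads; ++idx)
 *   {
 *     threads.push_back(new util::Thread(run_event_loop)); // Worker body supplied by the application.
 *   }
 *   optimize_pinning_in_thread_pool(my_logger, threads, false, true, false); // Pin, assuming non-collated indexing.
 */
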
} // namespace flow::async