Flow 2.0.0
Flow project: Full implementation reference.
task_qing_thread.cpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
20#include "flow/log/config.hpp"
21#include <boost/asio.hpp>
22#include <boost/move/make_unique.hpp>
23#include <boost/asio/executor_work_guard.hpp>
24#include <string>
25#include <exception>
26#include <cstdlib>
27
28namespace flow::async
29{
30
31// Static initializations.
32
34
35// Implementations.
36
38 const Task_engine_ptr& task_engine_ptr, bool own_task_engine,
39 boost::promise<void>* done_promise_else_block,
40 Task&& init_func_or_empty) :
41 flow::log::Log_context(logger_ptr, Flow_log_component::S_ASYNC),
42 m_task_engine(task_engine_ptr), // shared_ptr<> copy.
43 m_own_task_engine(own_task_engine)
44{
45 using boost::promise;
46 using boost::asio::post;
47 using boost::asio::make_work_guard;
48 using boost::movelib::unique_ptr;
49 using boost::movelib::make_unique;
50 using std::exception;
51 using std::string;
52 using std::exit;
53 using util::Thread;
55 using log::Logger;
57 using Task_engine_work = boost::asio::executor_work_guard<Task_engine::executor_type>;
58 using Log_config = log::Config;
59
60 assert(m_task_engine);
61 string nickname(nickname_view); // We need an std::string below anyway, so copy this now.
62
63 // Some programs start tons of threads. Let's be stingy with INFO messages.
64
65 FLOW_LOG_INFO("Task_qing_thread [" << static_cast<const void*>(this) << "] with nickname [" << nickname << "] "
66 "will " << (m_own_task_engine ? "have own Task_engine, probably with concurrency-hint=1"
67 : "share Task_engine") << ": [@" << m_task_engine << "].");
68
69 /* We'll satisfy this `promise` inside the thread we are about to spawn -- as soon as it is up and run()ing which
70 * means post()s (etc.) will actually execute from that point on instead of continuing to get queued up.
71 * Then in the current thread we'll briefly await for the `promise` to be satisfied, then return from this ctor;
72 * or if they've passed in their own done_promise (done_promise_else_block not null), then they'll need to
73 * do the satisfaction-awaiting themselves.
74 *
75 * The trial-balloon post()ed task isn't necessary; post() is formally known to work
76 * across run()ning threads starting and stopping; even with
77 * other tasks pre-queued on the Task_engine (which our doc header contract explicitly says is allowed). It's
78 * a nice trial balloon, and if there's something wrong, it'll block the constructor and make it really obvious;
79 * this has proven to be helpful for debugging/testing in my experience (ygoldfel).
80 *
81 * Update: done_promise has now been extended by a new API arg init_func_or_empty, to indicate in particular
82 * that we've executed init_func_or_empty(). So, while waiting for the subsequent post()ed trial-balloon task
83 * to complete is arguably at best a nicety, nevertheless the fact that it can only occur *after* init_func_or_empty()
84 * returns satisfies a *necessity* (to wit: that we don't satisfy done_promise until init_func_or_empty() returns). */
85 unique_ptr<promise<void>> our_done_promise_or_null;
86 promise<void>& done_promise
87 = done_promise_else_block ? *done_promise_else_block
88 : *(our_done_promise_or_null = make_unique<promise<void>>());
89
90 /* As promised in our doc header apply any current verbosity override to the new thread. After all, there's no
91 * opportunity for them to set the override, since there *is* no thread until we start it just now.
92 * For now memorize it; then immediately apply it in the young thread. (If there is no override, that's just
93 * `sev_override == Sev::S_END_SENTINEL`; we need not even track it as a special case.) */
94 const auto sev_override = *(Log_config::this_thread_verbosity_override());
95
96 m_worker_thread.reset(new Thread([this, // Valid throughout thread { body }.
97 sev_override,
98 nickname = std::move(nickname), // Valid throughout thread { body }.
99 init_func_or_empty = std::move(init_func_or_empty),
100 &done_promise]() // Valid until post({ body }) .set_value()s it.
101 {
102 auto const logger_ptr = get_logger();
103
104 // Before anything (at all) can log, begin the section where logging is limited to sev_override.
105 {
106 const auto sev_override_auto = Log_config::this_thread_verbosity_override_auto(sev_override);
107
108 // Now do the pre-loop work.
109
110 Logger::this_thread_set_logged_nickname(nickname, logger_ptr); // This INFO-logs a nice message.
111
112 // Use standard beautified formatting for chrono durations/etc. output (and conversely input).
114
115 FLOW_LOG_TRACE("Worker thread starting.");
116
117 if (!init_func_or_empty.empty())
118 {
119 FLOW_LOG_TRACE("Thread initializer callback executing synchronously.");
120 init_func_or_empty();
121 // This is INFO, as it may be important to separate between post()ed tasks and the initial setup.
122 FLOW_LOG_INFO("Thread initializer callback execution finished. Continuing event loop setup in this thread.");
123 }
124
125 // Pre-queue this before run().
126 post(*m_task_engine, [this,
127 &done_promise]() // done_promise is (by promise/future's nature) alive until .set_value().
128 {
130 {
131 FLOW_LOG_TRACE("First task from Task_qing_thread [" << this << "] executing "
132 "in that thread since not sharing Task_engine object with more threads.");
133 }
134 else
135 {
136 FLOW_LOG_TRACE("First task from Task_qing_thread [" << this << "] executing. "
137 "May or may not be in that thread since sharing Task_engine object with more threads.");
138 }
139
140 // Safe to say stuff can be post()ed, possibly to us, as we were post()ed and are executing indeed.
141 done_promise.set_value();
142 FLOW_LOG_TRACE("First task complete.");
143 });
144 /* If not sharing Task_engine, above will run soon from this new thread, from within the following ->run().
145 * Otherwise it might run from any thread in pool of threads sharing Task_engine; possibly right now concurrently.
146 * Even then, though, subsequent post()s (etc.) might land into this new thread as soon as the following call
147 * starts running in earnest. */
148 } // const auto sev_override_auto = // Restore logging to normal (how it normally is at thread start).
149
150 // Avoid loop, thread exiting when no pending tasks remain.
151 Task_engine_work avoid_task_engine_stop(make_work_guard(*m_task_engine));
152
153 // Block -- wait for tasks to be posted on this thread's (possibly shared with other threads) Task_engine.
154 m_task_engine->run();
155
156 // Lastly, after everything -- as promised -- re-apply the same Sev override as during startup, for any last msgs.
157 {
158 const auto sev_override_auto = Log_config::this_thread_verbosity_override_auto(sev_override);
159
160 // Now do the post-loop work.
161
162 /* m_task_engine->run() must have exited to have reached here. The only reason this can happen, assuming no
163 * misuse of thread API(s?) by the user, is if stop() (or dtor which runs stop()) ran. Or it threw exception. */
164 FLOW_LOG_INFO("Event loop finished: Task_qing_thread [" << this << "] presumably being stopped! "
165 "Thread exit imminent."); // This is good to keep as INFO.
166 } // const auto sev_override_auto =
167
168 /* Let's discuss handling of various terrible things that can occur inside ->run() which is essentially the thread
169 * body, even though boost.asio is deciding when to run the various pieces of code in that thread body.
170 *
171 * Firstly conditions like SEGV (seg fault -- bad ptr dereference, etc.), FPE (division by zero, etc.),
172 * BUS (like seg fault but with code), etc., may occur and raise a signal via system functionality. That is none
173 * of our concern; to the extent that we could trap such things and do something smart it would apply to threads
174 * not handled by us; so it's not within our purview.
175 *
176 * Secondly an assert() could fail (or, equivalently but much less likely, [std]::abort() could be called directly).
177 * abort() usually raises SIGABRT and, as such, anything we could do about this -- as in the preceding paragraph --
178 * would apply to all threads; so again not in our purview.
179 *
180 * Lastly an exception could be thrown. If caught (within a given task loaded onto *m_task_engine) then that's no
181 * different from handling an error return of a C-style function and wouldn't reach us and is handled within the
182 * task; none of our concern or anyone's concern; it's handled.
183 *
184 * That leaves one last condition: An exception was thrown within a task loaded onto *m_task_engine and *not*
185 * caught within that task. boost.asio shall *not* catch it; hence ->run() above will be emitting an exception.
186 * We can do one of 2 things. We can catch it; or not. If we do catch it there's no way to continue the ->run()
187 * in any fashion that would lead to anything but undefined behavior. We could ->run() again but what happens then
188 * is utterly unpredictable; it's trying to pretend an uncaught exception isn't exceptional. The next least
189 * exceptional behavior for us would be to log something about the exception and then exit the thread the same
190 * way we do above when ->run() returns gracefully (meaning event loop finished gracefully). However, then the
191 * rest of the program would think maybe the thread exited gracefully -- but it didn't -- which again is pretending
192 * an uncaught exception isn't exceptional. (@todo It's conceivable to make this an optional behavior specified
193 * by the *this creator. However even then it would absolutely not be a correct default behavior; so we must
194 * in this discussion decide what *is* correct *default* behavior.)
195 *
196 * We've now whittled down to these possible courses of action in the uncaught-exception-in-a-task scenario.
197 * - Catch it here; then:
198 * -1- Inform the rest of the program in some reasonable way and let them deal with it; e.g., execute some
199 * callback function passed to this ctor, wherein they can do what they want before the thread exits. Or:
200 * -2- Exit the entire program.
201 * -3- Do not catch it here; let it fall through.
202 *
203 * Let's for now not do -1-; not because it's a bad idea (it's a decent idea optionally), but because it doesn't
204 * provide a good default behavior that will do the right thing without the user having to worry about it.
205 *
206 * -2- can be done, basically, either via exit(<non-zero>) or via abort(). exit(<non-zero>) is not good as a
207 * default behavior: Printing the exception's .what() would be decent, but there's no guarantee this would be
208 * flushed to any actual log (e.g., log file) due to the sudden exit() not through main(). In general this wrests
209 * control of the exit() code from the program writer and arbitrarily gives it to Flow. abort(), however, is
210 * fine! After all, an uncaught exception in a thread not managed by us indeed calls abort() through
211 * std::terminate() by default (std::terminate() can also be overridden program-wide). Other than bypassing
212 * std::terminate(), then, it's the same behavior as is generally expected by default. To not bypass
213 * std::terminate() we could instead rethrow the exception.
214 *
215 * -3-, however, is even more direct. If we let it fall through, std::terminate() -- the default behavior -- would
216 * simply be in effect without our inserting ourselves into the situation. If that's all we want (and we've now
217 * made a good case for it) then doing nothing is less entropy than doing something that results in the same thing.
218 *
219 * So -3- is strictly better due to lessened entropy alone -- all else being equal. Is there anything about -2-
220 * that would make "all else" not equal? Yes: we could log .what() first before re-throwing/aborting.
221 * This, however, would come at a major cost: By 1st catching the exception we'd lose the stack trace information:
222 * any generated core would show the stack of the location here in the catch-location, not the original
223 * throw-location. Between printing the exception message and getting the full stack trace in a potential core,
224 * the latter wins in practice by far. At least, by default, we shouldn't be taking that possibility away.
225 *
226 * So that's why we do nothing.
227 *
228 * Last thought: Is there something one can do to *both* get the stack trace in a core file *and* print the
229 * exception .what()? (For that matter, with boost.backtrace and similar, it should be possible to print a stack
230 * trace to the logs as well.) The answer is yes, though it's not on us to do it. One should do such work either
231 * in std::terminate() (by using std::set_terminate()) or, arguably even better, in a global SIGABRT handler.
232 * I am only mentioning it here as opportunistic advice -- again, it's not in our purview, as shown above. */
233 })); // Thread body.
234 // `nickname`, `init_func_or_empty` may now be hosed.
235
236 if (done_promise_else_block)
237 {
238 FLOW_LOG_TRACE("Thread started -- confirmation that it is up shall be signalled to caller through a `promise`.");
239 }
240 else
241 {
242 FLOW_LOG_TRACE("Thread started -- awaiting confirmation it is up.");
243 done_promise.get_future().wait();
244 FLOW_LOG_TRACE("Confirmed thread started -- Task_qing_thread [" << this << "] ready.");
245 }
246} // Task_qing_thread::Task_qing_thread()
247
249{
250 FLOW_LOG_TRACE("Task_qing_thread [" << this << "]: Destroying object.");
251
252 stop();
253
254 // Last thing logged on our behalf -- INFO.
255 FLOW_LOG_INFO("Task_qing_thread [" << this << "]: Thread was stopped/joined just now or earlier; "
256 "destructor return imminent; Task_engine destruction *may* therefore also be imminent.");
257}
258
260{
261 /* Subtlety: stop() is to be a harmless no-op if called twice. We therefore perform an explicit check for it
262 * and return if detected. This isn't strictly necessary; Task_engine::stop() and Thread::join() do work if called
263 * again after calling them once, and they just no-op, as desired. We do it because:
264 * - It's cleaner: one less code path to have to reason about in terms of safety.
265 * - Less spurious/verbose logging.
266 * - It's a tiny bit faster.
267 *
268 * This isn't a mere corner case; the destructor calls us even if stop() was called explicitly earlier. */
269 const bool redundant = !m_worker_thread->joinable(); // A/k/a "not-a-thread" <=> "not joinable."
270
271 if (redundant)
272 {
273 FLOW_LOG_TRACE("Task_qing_thread [" << this << "]: Not waiting for worker thread "
274 "[T" << m_worker_thread->get_id() << "] to finish, as it was already stop()ped earlier.");
275 }
276 else
277 {
278 FLOW_LOG_INFO("Task_qing_thread [" << this << "]: Waiting for worker thread "
279 "[T" << m_worker_thread->get_id() << "] to finish, as it was not stop()ped earlier.");
280 }
281 // Ensure we understand properly the relationship between Task_engine::stopped() and this thread's existence.
282 assert((!m_own_task_engine) // In this case, as promised, it's up to them to stop m_task_engine at some point.
283 || (m_task_engine->stopped() == redundant)); // && m_own_task_engine)
284 if (redundant)
285 {
286 return;
287 }
288 // else OK; this is the first stop() call. Do as we promised.
289
291 {
292 m_task_engine->stop(); // Trigger our owned Task_engine to execute no further tasks and hence exit run().
293 }
294 // else { Caller will have to do something similar before the following statement is able to return. }
295
296 m_worker_thread->join();
297 // Mirror the top of this method.
298 assert(!m_worker_thread->joinable());
299 assert((!m_own_task_engine) || m_task_engine->stopped());
300
301 FLOW_LOG_TRACE("Task_qing_thread [" << this << "]: Thread stopped/joined; stop() return imminent.");
302} // Task_qing_thread::stop()
303
305{
306 return m_task_engine;
307}
308
310{
311 return m_worker_thread.get(); // Valid until destructor.
312}
313
314} // namespace flow::async
bool m_own_task_engine
See constructor.
Task_engine_ptr m_task_engine
See task_engine().
util::Thread * raw_worker_thread()
Returns the util::Thread – a thin wrapper around the native OS thread handle – corresponding to the w...
void stop()
Blocks the calling thread until the constructor-started thread has finished; if the underlying Task_e...
boost::movelib::unique_ptr< util::Thread > m_worker_thread
Thread created in constructor. Not-a-thread after stop(); not not-a-thread before stop().
static const int S_BAD_EXIT
exit() code returned to OS in the event Task_qing_thread chooses to exit() the entire program (as of ...
Task_engine_ptr task_engine()
Returns pointer to util::Task_engine such that post()ing to it will cause the subsequent asynchronous...
Task_qing_thread(flow::log::Logger *logger_ptr, util::String_view nickname, const Task_engine_ptr &task_engine, bool own_task_engine, boost::promise< void > *done_promise_else_block=0, Task &&init_func_or_empty=Task())
Constructs object, immediately spawning new (worker) thread, memorizing a ref-counted reference to th...
~Task_qing_thread()
stop(), followed by forgetting the Task_engine returned by task_engine(); the latter action may destr...
Class used to configure the filtering and logging behavior of Loggers; its use in your custom Loggers...
Definition: config.hpp:317
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:217
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1284
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:197
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
Flow module containing tools enabling multi-threaded event loops operating under the asynchronous-tas...
Definition: async_fwd.hpp:75
@ S_ASYNC
Simply post the given task to execute asynchronously in some execution context – as soon as the conte...
boost::shared_ptr< util::Task_engine > Task_engine_ptr
Short-hand for reference-counting pointer to a mutable util::Task_engine (a/k/a boost::asio::io_conte...
Definition: async_fwd.hpp:198
void beautify_chrono_logger_this_thread(Logger *logger_ptr)
Sets certain chrono-related formatting on the given Logger in the current thread that results in a co...
Definition: log.cpp:271
boost::asio::io_context Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Definition: util_fwd.hpp:135
Basic_string_view< char > String_view
Commonly used char-based Basic_string_view. See its doc header.
boost::thread Thread
Short-hand for standard thread class.
Definition: util_fwd.hpp:78
Catch-all namespace for the Flow project: A collection of various production-quality modules written ...
Definition: async_fwd.hpp:75
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Definition: common.hpp:638