Flow 1.0.0
Flow project: Full implementation reference.
task_qing_thread.cpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
20#include "flow/log/config.hpp"
21#include <boost/asio.hpp>
22#include <boost/move/make_unique.hpp>
23#include <string>
24#include <exception>
25#include <cstdlib>
26
27namespace flow::async
28{
29
30// Static initializations.
31
33
34// Implementations.
35
37 const Task_engine_ptr& task_engine_ptr, bool own_task_engine,
38 boost::promise<void>* done_promise_else_block,
39 Task&& init_func_or_empty) :
40 flow::log::Log_context(logger_ptr, Flow_log_component::S_ASYNC),
41 m_task_engine(task_engine_ptr), // shared_ptr<> copy.
42 m_own_task_engine(own_task_engine)
43{
44 using boost::promise;
45 using boost::asio::post;
46 using boost::movelib::unique_ptr;
47 using boost::movelib::make_unique;
48 using std::exception;
49 using std::string;
50 using std::exit;
51 using util::Thread;
53 using log::Logger;
55 using Task_engine_work = Task_engine::work;
56 using Log_config = log::Config;
57
58 assert(m_task_engine);
59 string nickname(nickname_view); // We need an std::string below anyway, so copy this now.
60
61 // Some programs start tons of threads. Let's be stingy with INFO messages.
62
63 FLOW_LOG_INFO("Task_qing_thread [" << static_cast<const void*>(this) << "] with nickname [" << nickname << "] "
64 "will " << (m_own_task_engine ? "have own Task_engine, probably with concurrency-hint=1"
65 : "share Task_engine") << ": [@" << m_task_engine << "].");
66
67 /* We'll satisfy this `promise` inside the thread we are about to spawn -- as soon as it is up and run()ing which
68 * means post()s (etc.) will actually execute from that point on instead of continuing to get queued up.
69 * Then in the current thread we'll briefly await for the `promise` to be satisfied, then return from this ctor;
70 * or if they've passed in their own done_promise (done_promise_else_block not null), then they'll need to
71 * do the satisfaction-awaiting themselves.
72 *
73 * The trial-balloon post()ed task isn't necessary; post() is formally known to work
74 * across run()ning threads starting and stopping; even with
75 * other tasks pre-queued on the Task_engine (which our doc header contract explicitly says is allowed). It's
76 * a nice trial balloon, and if there's something wrong, it'll block the constructor and make it really obvious;
77 * this has proven to be helpful for debugging/testing in my experience (ygoldfel).
78 *
79 * Update: done_promise has now been extended by a new API arg init_func_or_empty, to indicate in particular
80 * that we've executed init_func_or_empty(). So, while waiting for the subsequent post()ed trial-balloon task
81 * to complete is arguably at best a nicety, nevertheless the fact that it can only occur *after* init_func_or_empty()
82 * returns satisfies a *necessity* (to wit: that we don't satisfy done_promise until init_func_or_empty() returns). */
83 unique_ptr<promise<void>> our_done_promise_or_null;
84 promise<void>& done_promise
85 = done_promise_else_block ? *done_promise_else_block
86 : *(our_done_promise_or_null = make_unique<promise<void>>());
87
88 /* As promised in our doc header apply any current verbosity override to the new thread. After all, there's no
89 * opportunity for them to set the override, since there *is* no thread until we start it just now.
90 * For now memorize it; then immediately apply it in the young thread. (If there is no override, that's just
91 * `sev_override == Sev::S_END_SENTINEL`; we need not even track it as a special case.) */
92 const auto sev_override = *(Log_config::this_thread_verbosity_override());
93
94 m_worker_thread.reset(new Thread([this, // Valid throughout thread { body }.
95 sev_override,
96 nickname = std::move(nickname), // Valid throughout thread { body }.
97 init_func_or_empty = std::move(init_func_or_empty),
98 &done_promise]() // Valid until post({ body }) .set_value()s it.
99 {
100 auto const logger_ptr = get_logger();
101
102 // Before anything (at all) can log, begin the section where logging is limited to sev_override.
103 {
104 const auto sev_override_auto = Log_config::this_thread_verbosity_override_auto(sev_override);
105
106 // Now do the pre-loop work.
107
108 Logger::this_thread_set_logged_nickname(nickname, logger_ptr); // This INFO-logs a nice message.
109
110 // Use standard beautified formatting for chrono durations/etc. output (and conversely input).
112
113 FLOW_LOG_TRACE("Worker thread starting.");
114
115 if (!init_func_or_empty.empty())
116 {
117 FLOW_LOG_TRACE("Thread initializer callback executing synchronously.");
118 init_func_or_empty();
119 // This is INFO, as it may be important to separate between post()ed tasks and the initial setup.
120 FLOW_LOG_INFO("Thread initializer callback execution finished. Continuing event loop setup in this thread.");
121 }
122
123 // Pre-queue this before run().
124 post(*m_task_engine, [this,
125 &done_promise]() // done_promise is (by promise/future's nature) alive until .set_value().
126 {
128 {
129 FLOW_LOG_TRACE("First task from Task_qing_thread [" << this << "] executing "
130 "in that thread since not sharing Task_engine object with more threads.");
131 }
132 else
133 {
134 FLOW_LOG_TRACE("First task from Task_qing_thread [" << this << "] executing. "
135 "May or may not be in that thread since sharing Task_engine object with more threads.");
136 }
137
138 // Safe to say stuff can be post()ed, possibly to us, as we were post()ed and are executing indeed.
139 done_promise.set_value();
140 FLOW_LOG_TRACE("First task complete.");
141 });
142 /* If not sharing Task_engine, above will run soon from this new thread, from within the following ->run().
143 * Otherwise it might run from any thread in pool of threads sharing Task_engine; possibly right now concurrently.
144 * Even then, though, subsequent post()s (etc.) might land into this new thread as soon as the following call
145 * starts running in earnest. */
146 } // const auto sev_override_auto = // Restore logging to normal (how it normally is at thread start).
147
148 // @todo boost::asio::io_context::work is deprecated; use its replacement (see io_context boost.asio doc page).
149 Task_engine_work avoid_task_engine_stop(*m_task_engine); // Avoid loop, thread exiting when no pending tasks remain.
150
151 // Block -- wait for tasks to be posted on this thread's (possibly shared with other threads) Task_engine.
152 m_task_engine->run();
153
154 // Lastly, after everything -- as promised -- re-apply the same Sev override as during startup, for any last msgs.
155 {
156 const auto sev_override_auto = Log_config::this_thread_verbosity_override_auto(sev_override);
157
158 // Now do the post-loop work.
159
160 /* m_task_engine->run() must have exited to have reached here. The only reason this can happen, assuming no
161 * misuse of thread API(s?) by the user, is if stop() (or dtor which runs stop()) ran. Or it threw exception. */
162 FLOW_LOG_INFO("Event loop finished: Task_qing_thread [" << this << "] presumably being stopped! "
163 "Thread exit imminent."); // This is good to keep as INFO.
164 } // const auto sev_override_auto =
165
166 /* Let's discuss handling of various terrible things that can occur inside ->run() which is essentially the thread
167 * body, even though boost.asio is deciding when to run the various pieces of code in that thread body.
168 *
169 * Firstly conditions like SEGV (seg fault -- bad ptr dereference, etc.), FPE (division by zero, etc.),
170 * BUS (like seg fault but with code), etc., may occur and raise a signal via system functionality. That is none
171 * of our concern; to the extent that we could trap such things and do something smart it would apply to threads
172 * not handled by us; so it's not within our purview.
173 *
174 * Secondly an assert() could fail (or, equivalently but much less likely, [std]::abort() could be called directly).
175 * abort() usually raises SIGABRT and, as such, anything we could do about this -- as in the preceding paragraph --
176 * would apply to all threads; so again not in our purview.
177 *
178 * Lastly an exception could be thrown. If caught (within a given task loaded onto *m_task_engine) then that's no
179 * different from handling an error return of a C-style function and wouldn't reach us and is handled within the
180 * task; none of our concern or anyone's concern; it's handled.
181 *
182 * That leaves one last condition: An exception was thrown within a task loaded onto *m_task_engine and *not*
183 * caught within that task. boost.asio shall *not* catch it; hence ->run() above will be emitting an exception.
184 * We can do one of 2 things. We can catch it; or not. If we do catch it there's no way to continue the ->run()
185 * in any fashion that would lead to anything but undefined behavior. We could ->run() again but what happens then
186 * is utterly unpredictable; it's trying to pretend an uncaught exception isn't exceptional. The next least
187 * exceptional behavior for us would be to log something about the exception and then exit the thread the same
188 * way we do above when ->run() returns gracefully (meaning event loop finished gracefully). However, then the
189 * rest of the program would think maybe the thread exited gracefully -- but it didn't -- which again is pretending
190 * an uncaught exception isn't exceptional. (@todo It's conceivable to make this an optional behavior specified
191 * by the *this creator. However even then it would absolutely not be a correct default behavior; so we must
192 * in this discussion decide what *is* correct *default* behavior.)
193 *
194 * We've now whittled down to these possible courses of action in the uncaught-exception-in-a-task scenario.
195 * - Catch it here; then:
196 * -1- Inform the rest of the program in some reasonable way and let them deal with it; e.g., execute some
197 * callback function passed to this ctor, wherein they can do what they want before the thread exits. Or:
198 * -2- Exit the entire program.
199 * -3- Do not catch it here; let it fall through.
200 *
201 * Let's for now not do -1-; not because it's a bad idea (it's a decent idea optionally), but because it doesn't
202 * provide a good default behavior that will do the right thing without the user having to worry about it.
203 *
204 * -2- can be done, basically, either via exit(<non-zero>) or via abort(). exit(<non-zero>) is not good as a
205 * default behavior: Printing the exception's .what() would be decent, but there's no guarantee this would be
206 * flushed to any actual log (e.g., log file) due to the sudden exit() not through main(). In general this wrests
207 * control of the exit() code from the program writer and arbitrarily gives it to Flow. abort(), however, is
208 * fine! After all, an uncaught exception in a thread not managed by us indeed calls abort() through
209 * std::terminate() by default (std::terminate() can also be overridden program-wide). Other than bypassing
210 * std::terminate(), then, it's the same behavior as is generally expected by default. To not bypass
211 * std::terminate() we could instead rethrow the exception.
212 *
213 * -3-, however, is even more direct. If we let it fall through, std::terminate() -- the default behavior -- would
214 * simply be in effect without our inserting ourselves into the situation. If that's all we want (and we've now
215 * made a good case for it) then doing nothing is less entropy than doing something that results in the same thing.
216 *
217 * So -3- is strictly better due to lessened entropy alone -- all else being equal. Is there anything about -2-
218 * that would make "all else" not equal? Yes: we could log .what() first before re-throwing/aborting.
219 * This, however, would come at a major cost: By 1st catching the exception we'd lose the stack trace information:
220 * any generated core would show the stack of the location here in the catch-location, not the original
221 * throw-location. Between printing the exception message and getting the full stack trace in a potential core,
222 * the latter wins in practice by far. At least, by default, we shouldn't be taking that possibility away.
223 *
224 * So that's why we do nothing.
225 *
226 * Last thought: Is there something one can do to *both* get the stack trace in a core file *and* print the
227 * exception .what()? (For that matter, with boost.backtrace and similar, it should be possible to print a stack
228 * trace to the logs as well.) The answer is yes, though it's not on us to do it. One should do such work either
229 * in std::terminate() (by using std::set_terminate()) or, arguably even better, in a global SIGABRT handler.
230 * I am only mentioning it here as opportunistic advice -- again, it's not in our purview, as shown above. */
231 })); // Thread body.
232 // `nickname`, `init_func_or_empty` may now be hosed.
233
234 if (done_promise_else_block)
235 {
236 FLOW_LOG_TRACE("Thread started -- confirmation that it is up shall be signalled to caller through a `promise`.");
237 }
238 else
239 {
240 FLOW_LOG_TRACE("Thread started -- awaiting confirmation it is up.");
241 done_promise.get_future().wait();
242 FLOW_LOG_TRACE("Confirmed thread started -- Task_qing_thread [" << this << "] ready.");
243 }
244} // Task_qing_thread::Task_qing_thread()
245
247{
248 FLOW_LOG_TRACE("Task_qing_thread [" << this << "]: Destroying object.");
249
250 stop();
251
252 // Last thing logged on our behalf -- INFO.
253 FLOW_LOG_INFO("Task_qing_thread [" << this << "]: Thread was stopped/joined just now or earlier; "
254 "destructor return imminent; Task_engine destruction *may* therefore also be imminent.");
255}
256
258{
259 /* Subtlety: stop() is to be a harmless no-op if called twice. We therefore perform an explicit check for it
260 * and return if detected. This isn't strictly necessary; Task_engine::stop() and Thread::join() do work if called
261 * again after calling them once, and they just no-op, as desired. We do it because:
262 * - It's cleaner: one less code path to have to reason about in terms of safety.
263 * - Less spurious/verbose logging.
264 * - It's a tiny bit faster.
265 *
266 * This isn't a mere corner case; the destructor calls us even if stop() was called explicitly earlier. */
267 const bool redundant = !m_worker_thread->joinable(); // A/k/a "not-a-thread" <=> "not joinable."
268
269 if (redundant)
270 {
271 FLOW_LOG_TRACE("Task_qing_thread [" << this << "]: Not waiting for worker thread "
272 "[T" << m_worker_thread->get_id() << "] to finish, as it was already stop()ped earlier.");
273 }
274 else
275 {
276 FLOW_LOG_INFO("Task_qing_thread [" << this << "]: Waiting for worker thread "
277 "[T" << m_worker_thread->get_id() << "] to finish, as it was not stop()ped earlier.");
278 }
279 // Ensure we understand properly the relationship between Task_engine::stopped() and this thread's existence.
280 assert((!m_own_task_engine) // In this case, as promised, it's up to them to stop m_task_engine at some point.
281 || (m_task_engine->stopped() == redundant)); // && m_own_task_engine)
282 if (redundant)
283 {
284 return;
285 }
286 // else OK; this is the first stop() call. Do as we promised.
287
289 {
290 m_task_engine->stop(); // Trigger our owned Task_engine to execute no further tasks and hence exit run().
291 }
292 // else { Caller will have to do something similar before the following statement is able to return. }
293
294 m_worker_thread->join();
295 // Mirror the top of this method.
296 assert(!m_worker_thread->joinable());
297 assert((!m_own_task_engine) || m_task_engine->stopped());
298
299 FLOW_LOG_TRACE("Task_qing_thread [" << this << "]: Thread stopped/joined; stop() return imminent.");
300} // Task_qing_thread::stop()
301
303{
304 return m_task_engine;
305}
306
308{
309 return m_worker_thread.get(); // Valid until destructor.
310}
311
312} // namespace flow::async
bool m_own_task_engine
See constructor.
Task_engine_ptr m_task_engine
See task_engine().
util::Thread * raw_worker_thread()
Returns the util::Thread – a thin wrapper around the native OS thread handle – corresponding to the w...
void stop()
Blocks the calling thread until the constructor-started thread has finished; if the underlying Task_e...
boost::movelib::unique_ptr< util::Thread > m_worker_thread
Thread created in constructor. Not-a-thread after stop(); not not-a-thread before stop().
static const int S_BAD_EXIT
exit() code returned to OS in the event Task_qing_thread chooses to exit() the entire program (as of ...
Task_engine_ptr task_engine()
Returns pointer to util::Task_engine such that post()ing to it will cause the subsequent asynchronous...
Task_qing_thread(flow::log::Logger *logger_ptr, util::String_view nickname, const Task_engine_ptr &task_engine, bool own_task_engine, boost::promise< void > *done_promise_else_block=0, Task &&init_func_or_empty=Task())
Constructs object, immediately spawning new (worker) thread, memorizing a ref-counted reference to th...
~Task_qing_thread()
stop(), followed by forgetting the Task_engine returned by task_engine(); the latter action may destr...
Class used to configure the filtering and logging behavior of Loggers; its use in your custom Loggers...
Definition: config.hpp:200
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:224
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1291
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:197
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
Flow module containing tools enabling multi-threaded event loops operating under the asynchronous-tas...
Definition: async_fwd.hpp:75
@ S_ASYNC
Simply post the given task to execute asynchronously in some execution context – as soon as the conte...
boost::shared_ptr< util::Task_engine > Task_engine_ptr
Short-hand for reference-counting pointer to a mutable util::Task_engine (a/k/a boost::asio::io_servi...
Definition: async_fwd.hpp:198
void beautify_chrono_logger_this_thread(Logger *logger_ptr)
Sets certain chrono-related formatting on the given Logger in the current thread that results in a co...
Definition: log.cpp:278
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Definition: util_fwd.hpp:135
Basic_string_view< char > String_view
Commonly used char-based Basic_string_view. See its doc header.
boost::thread Thread
Short-hand for standard thread class.
Definition: util_fwd.hpp:78
Catch-all namespace for the Flow project: A collection of various production-quality modules written ...
Definition: async_fwd.hpp:75
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Definition: common.hpp:632