Flow 1.0.0
Flow project: Full implementation reference.
util.hpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

namespace flow::async
{

// Free functions: in *_fwd.hpp.

// Template implementations.

template<typename Execution_context>
void asio_exec_ctx_post(log::Logger* logger_ptr, Execution_context* exec_ctx, Synchronicity synchronicity, Task&& task)
{
  using log::Sev;
  using boost::asio::post;
  using boost::asio::dispatch;
  using boost::promise;

  FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_ASYNC);

  /* As noted in contract, Execution_context is either Task_engine or Strand. Since a Strand is born of a Task_engine,
   * everything that would apply to its parent Task_engine applies to the Strand, so we can talk about exclusively
   * Task_engines in below comments to explain the reasoning. Strand only adds an extra constraint, namely that
   * the chosen thread is not executing something from the same Strand at the time that F() eventually runs. This is
   * typically irrelevant to the below logic; when it is relevant we will explicitly discuss the possibility that
   * *exec_ctx is a Strand after all.
   *
   * Lastly, as the contract notes, this all quite likely works with the general ExecutionContext formal concept
   * from boost.asio docs; but we have made no formal effort to ensure this in our comments/reasoning. This may be
   * corrected as needed in the future.
   *
   * Let us now discuss the asio_exec_ctx_post(E, synchronicity, F) call, where E is a Task_engine.
   *
   * We post to some random-ish thread in E, which is run()ning in at least 1 thread, using built-in Task_engine
   * (boost.asio) functionality wherein: post(E, F) will execute F() within any thread currently executing E.run(),
   * for a given Task_engine E.
   *
   * post(E, F) is indeed used in the most common synchronicity mode, S_ASYNC, and in its
   * close cousin S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION. If calling thread is outside pool E, then
   * F() will run sometime in the future concurrently to the calling thread; including to our code in this method.
   * If calling thread is in E's pool (we're in a task being called by E.run() now), then F()
   * will run sometime after we return.
   *
   * However, S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION will additionally cause us to await F()'s completion before
   * we return control to caller. If we're inside pool E, this might cause a deadlock (and *definitely* will
   * if n_threads() == 1), because we'll wait to return 'til a condition that can only become true after we return
   * (unless by luck the post() targets a different thread in the pool). Hence we advertise undefined behavior if the
   * user makes the improper choice to use this mode from inside an E-posted task.
   *
   * [Sanity check: What if E is a Strand after all (from some Task_engine E*), and F() is currently executing from
   * within E*'s pool after all... but also E* happens to have itself been posted through some *other* Strand
   * E#? Would that preclude the deadlock situation, since seemingly being in different Strands would prevent
   * boost.asio from posting F() onto W (current thread)? Answer: No; just because it can't put it on the current
   * thread *now* doesn't mean it can't queue it to run on W after we return... and we're back to deadlock. So,
   * the contract stands without any further allowances due to E's Strandiness.]
   *
   * S_ASYNC_AND_AWAIT_CONCURRENT_START is similar to ..._COMPLETION; but it returns "early": just before F()
   * executes (in such a way as to guarantee E.stop(), when E is a Task_engine, executed after we return will be unable
   * to prevent F() from running). The same deadlock situation applies: we can't wait for F() to start, if it can only
   * start once we're done waiting.
   *
   * Lastly: S_OPPORTUNISTIC_SYNC_ELSE_ASYNC is much like S_ASYNC, except dispatch(E, F) is used instead of post(E, F).
   * If post(E, F) would have caused F() to execute in the current thread anyway as soon as we return, then F() is
   * executed synchronously, now, and *then* we will return. (This is a performance shortcut, so that boost.asio
   * won't pointlessly exit us and immediately re-enter F().) If, however, post(E, F) would have caused F() to execute
   * on some other thread and/or later, then dispatch(E, F) acts identically to post(E, F) after all, meaning it'll
   * execute in the background concurrently and/or later. (In other words, the performance shortcut is not possible
   * due to other constraints.) */

  // @todo Maybe provide operator<<(ostream, Task_engine/Strand) so we can output *exec_ctx too.
  FLOW_LOG_TRACE("About to post task via boost.asio execution context (Task_engine or Strand as of this writing) "
                 "[@" << exec_ctx << "]; might sync-execute in this thread? = "
                 "[" << (synchronicity == Synchronicity::S_OPPORTUNISTIC_SYNC_ELSE_ASYNC) << "]; "
                 "will we ensure concurrent completion before continuing? = "
                 "[" << (synchronicity == Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION) << "]; "
                 "will we ensure concurrent initiation before continuing? = "
                 "[" << (synchronicity == Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_START) << "].");

  Task actual_task(std::move(task));

  /* If the current log level suggests we want TRACE logging then book-end their task with some log statements.
   * For perf: entirely avoid this wrapping if almost certainly no logging would occur anyway. */
  if (logger_ptr && logger_ptr->should_log(Sev::S_TRACE, get_log_component()))
  {
    actual_task = [get_logger, get_log_component, wrapped_task = std::move(actual_task)]()
    {
      FLOW_LOG_TRACE("Task starting: current processor logical core index [" << cpu_idx() << "].");
      wrapped_task();
      FLOW_LOG_TRACE("Task ended: current processor logical core index [" << cpu_idx() << "].");
    };
  }

  /* Now we want to (1) post `actual_task` onto *exec_ctx, so that it runs at some point; and (2)
   * either wait or not wait for it to start or complete concurrently. The details are discussed above and
   * in Synchronicity member doc headers. */

  if (synchronicity == Synchronicity::S_ASYNC)
  {
    post(*exec_ctx, std::move(actual_task));
    // That's it; give control back to caller. actual_task() has not run here (but may run concurrently).
  }
  else if (synchronicity == Synchronicity::S_OPPORTUNISTIC_SYNC_ELSE_ASYNC)
  {
    dispatch(*exec_ctx, std::move(actual_task));
    // That's it; give control back to caller. actual_task() may have just run -- or else it's just like post().
  }
  else
  {
    const bool assure_finish_else_start = synchronicity == Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION;
    assert((assure_finish_else_start || (synchronicity == Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_START))
           && "asio_exec_ctx_post() failed to handle one of the Synchronicity enum values.");

    /* In this interesting mode, by contract, we are *not* in W now. (@todo Consider finding a way to assert() this.)
     * As noted above we still post(E, F) -- but instead of returning immediately we await its completion or
     * initiation; then return. A future/promise pair accomplishes the latter easily. (Historical note: I (ygoldfel)
     * used to do the future/promise thing manually in many places. It's boiler-plate, short but a little hairy, so
     * it's nice to give them this feature instead. Also some might not know about promise/future and try to use
     * mutex/cond_var, which is always a nightmare of potential bugs.) */

    FLOW_LOG_TRACE("From outside thread pool, we post task to execute concurrently; and await its "
                   "[" << (assure_finish_else_start ? "completion" : "initiation") << "]. "
                   "A deadlock here implies bug, namely that we are *not* outside thread pool now after all.");

    promise<void> wait_done_promise;

    if (assure_finish_else_start)
    {
      post(*exec_ctx, [&]() // We're gonna block until the task completes, so just keep all context by &reference.
      {
        actual_task();
        wait_done_promise.set_value();
      });
    }
    else
    {
      post(*exec_ctx, [&wait_done_promise, // Safe to capture: .get_future().wait() returns after .set_value().
                       actual_task = std::move(actual_task)] // Gotta cap it; else `actual_task` hosed while executing.
                        ()
      {
        wait_done_promise.set_value(); // We've started! Nothing (other than a thread killing) can prevent:
        actual_task();
      });
    }

    wait_done_promise.get_future().wait();
    FLOW_LOG_TRACE("Wait for concurrent execution complete. Done.");

    /* That's it; give control back to caller. actual_task() *did* run and finish but not in here; or at least
     * it will have definitely finished eventually, even if caller tries E.stop() (if applicable) after we return. */
  } // if (sync == S_ASYNC_AND_AWAIT_CONCURRENT_{COMPLETION|START})
} // asio_exec_ctx_post()

} // namespace flow::async
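
The two blocking modes in the listing above (await completion, await start) both rest on a promise/future handshake around a posted task. The following is a minimal standalone sketch of that handshake, not Flow's implementation. It assumes only the C++ standard library: `Mini_pool` is a hypothetical one-thread stand-in for a Task_engine being run(), and `post_and_await_completion()` and `post_and_await_start()` are illustrative names that do not exist in Flow. It uses `std::promise` where the listing uses `boost::promise`, and it holds the promise in a `shared_ptr` (a choice of this sketch, not of the listing) so the promise outlives the waiting caller.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for a Task_engine run() by exactly one worker thread. Not part of Flow or boost.asio.
class Mini_pool
{
public:
  Mini_pool() : m_worker([this]() { run(); }) {}

  ~Mini_pool()
  {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_stopping = true;
    }
    m_cond.notify_one();
    m_worker.join(); // Worker drains any queued tasks, then exits.
  }

  // Roughly analogous to post(E, F): queue F() to run later in the worker thread.
  void post(std::function<void ()> task)
  {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_tasks.push_back(std::move(task));
    }
    m_cond.notify_one();
  }

private:
  void run()
  {
    for (;;)
    {
      std::function<void ()> task;
      {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this]() { return m_stopping || !m_tasks.empty(); });
        if (m_tasks.empty())
        {
          return; // Stopping, and nothing left to run.
        }
        task = std::move(m_tasks.front());
        m_tasks.pop_front();
      }
      task(); // Run outside the lock.
    }
  }

  std::mutex m_mutex;
  std::condition_variable m_cond;
  std::deque<std::function<void ()>> m_tasks;
  bool m_stopping = false;
  std::thread m_worker; // Declared last, so the members above exist before run() starts.
};

// Await completion: returns only after task() has finished in the worker thread.
// Must be called from outside the pool; from inside it would deadlock, as the listing's comments explain.
void post_and_await_completion(Mini_pool* pool, std::function<void ()> task)
{
  auto done = std::make_shared<std::promise<void>>();
  std::future<void> done_future = done->get_future();
  pool->post([done, &task]() // `task` by reference is safe: we block below until it has finished.
  {
    task();
    done->set_value();
  });
  done_future.wait();
}

// Await start: returns once the worker is about to run task(), which may still be running afterwards.
void post_and_await_start(Mini_pool* pool, std::function<void ()> task)
{
  auto started = std::make_shared<std::promise<void>>();
  std::future<void> started_future = started->get_future();
  pool->post([started, task = std::move(task)]() // Own the task: the caller's copy is gone once we return.
  {
    started->set_value();
    task();
  });
  started_future.wait();
}
```

A caller outside the pool would construct a `Mini_pool`, then call `post_and_await_completion(&pool, f)` when it needs f()'s side effects to be visible on return, or `post_and_await_start(&pool, f)` when it only needs assurance that f() has begun. As in the listing, the start variant moves the task into the posted closure, because the caller's stack frame may be gone while the task is still executing.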