Flow 1.0.2
Flow project: Full implementation reference.
sched_task.hpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
23#include "flow/log/log.hpp"
24#include "flow/error/error.hpp"
25
26namespace flow::util
27{
28
29// Free functions: in *_fwd.hpp.
30
31// Template implementations.
32
33template<typename Scheduled_task_handler>
35 const Fine_duration& from_now, bool single_threaded,
36 Task_engine* task_engine,
37 Scheduled_task_handler&& task_body_moved)
38{
39 using boost::chrono::round;
40 using boost::chrono::milliseconds;
42 using boost::asio::bind_executor;
43 using boost::asio::get_associated_executor;
44 using boost::asio::post;
45
46 FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_UTIL);
47
48 // See schedule_task_from_now() doc header for philosophical intro to what we are doing here: the context. Then read:
49
50 const auto task_id = Unique_id_holder::create_unique_id();
51
52 /* Shove all the state here, which is a ref-counted pointer around this simple struct.
53 * Because we copy that ref-counted pointer into any lambda/closure we schedule, the state -- including the Timer
54 * itself! -- will continue existing until none of *our* code needs it, at which point ref-count=0, and it is all
55 * destroyed automatically.
56 *
57 * In addition, though, we return (in opaque fashion) a copy of this pointer in the return value of the present
58 * function. The life-time can thus be extended by the user (not that they need to worry about that internal
59 * detail). This is all trivially safe -- it exists until it doesn't need to, automatically -- but is there a
60 * potential for wasted memory? Answer: Not really; in addition to the struct being fairly light-weight, there's no
61 * reason for user to keep a copy of this handle once the task has fired, or they've had to cancel it.
62 *
63 * ### IMPORTANT SUBTLETY REGARDING THE SAVED LAMBDA IN task->m_body (LAST ARG TO CTOR) ###
64 * Why save a wrapper function around task_body_moved (could just set m_body = task_body_moved), and
65 * why not just execute m_body(<true or false>) synchronously from within the async_wait() handler below or
66 * the handler post()ed by scheduled_task_short_fire(), and moreover why the strange-looking
67 * get_associated_executor()/bind_executor() thing (could just post() task_body)? Answer:
68 *
69 * For that matter, every other API as of this writing seems to take a concrete type, a Function<>, for callbacks --
70 * avoiding templates -- but we take a template-param-typed arg. Why? Answer(s):
71 *
72 * schedule_task_from_now() is, essentially, a boost.asio extension that is akin to its various async_*() methods
73 * and free functions (like Timer::async_wait() itself), in that it takes a completion handler to execute async later.
74 * As such there is the expectation that, if it's associated with a `strand`, it will execute from within that strand
75 * (meaning non-concurrently with any other handlers also so-associated). (More generally, a strand is an "executor",
76 * and one expects it to be executed through that "executor." See boost.asio docs, not that they're super-readable,
77 * especially the link below.)
78 *
79 * Long story short, if they pass in X = boost::asio::bind_executor(some_strand, some_func), where some_func()
80 * takes 1+ args, then to execute it on *task_engine we must do post(*task_engine, F), where F() takes no args
81 * and invokes X(args...) in its { body }. An easy mistake, and bug, is to simply call X(args...).
82 * Per https://www.boost.org/doc/libs/1_77_0/doc/html/boost_asio/overview/core/strands.html
83 * and https://www.boost.org/doc/libs/1_77_0/doc/html/boost_asio/reference/post.html, post(*task_engine, F) picks up
84 * the association with some_strand via get_associated_executor() machinery, internally. If we simply call
85 * X(args...), then that association is ignored. Therefore, when we post the wrapper F(), we have to associate F()
86 * with some_strand. This has 2 implications:
87 * - We have to post() the wrapper around X(), and we must re-bind that wrapper with X()'s associated executor.
88 * This is done via bind_executor(get_associated_executor(X), F).
89 * - Even more subtly, if we took task_body_moved as a Function<> (Scheduled_task) and not a template arg
90 * Scheduled_task_handler, then their X will be beautifully auto-converted into a wrapping Function<>()...
91 * thus making get_associated_executor(Function<>(X)) return the default thing, and not some_strand.
92 * Hence we must carefully take their *actual* function object of indeterminate type, and extract the
93 * associated executor from *that*, not from Function<>(X).
94 *
95 * The rest flows from that. We post() a very simple wrapper (with a bit of logging) through the same executor
96 * (if any; if it's the default one then it still works) as the handler object they passed in. I (ygoldfel) have
97 * written so much text only because this is an easy mistake to make, and I have made it, causing a bug: it still
98 * compiles and works... but not through the strand. Tip: To directly check whether it worked,
99 * `some_strand.running_in_this_thread()` must return true inside some_func(). Beats trying to force a thread-unsafe
100 * situation and try to be satisfied when it doesn't occur (which is luck-based).
101 *
102 * P.S. I have verified, both in theory and in practice, that
103 * `post(some_task_engine, bind_executor(get_associated_executor(F), F)` still executes through some_task_engine,
104 * even when F is a vanilla lambda/function and not associated with any strand. The theoretical reason it works
105 * can be found in the post() doc link above, where it explains how precisely it invokes F() (both
106 * some_task_engine, itself an executor, and g_a_e(F) are involved -- it works out; details omitted here).
107 * The reason one might worry is that g_a_e(F), where F is non-strandy/vanilla, yields system_executor, which -- if
108 * post()ed onto *directly* -- uses some unspecified thread/pool. But together with post(some_task_engine),
109 * it does work.
110 *
111 * P.P.S. Why not execute the async_wait()'s completion handler below through g_a_e(task_body_moved)? 1, we have
112 * our own algorithm for avoiding thread-unsafety and want to use *task_engine directly; it is less entropy-laden
113 * to apply task_body_moved's associated executor only to task_body_moved itself (plus a bit of bracketing logging).
114 * Whatever executor they associated task_body_moved with -- perhaps a strand, or something else -- has unknown
115 * properties, and we have our own algorithm.
116 * 2, then scheduled_task_short_fire() can just reuse task->m_body which already takes care of the
117 * properly-executor-associated post()ing of task_body_moved.
118 */
119 auto task = boost::make_shared<Scheduled_task_handle_state>
120 (task_id, single_threaded, task_engine, [task_id, get_logger, get_log_component,
121 task_engine,
122 task_body = std::move(task_body_moved)]
123 (bool short_fire) mutable
124 {
125 // Not safe to rely on L->R arg evaluation below; get this 1st, when we know task_body hasn't been move()d.
126 const auto executor = get_associated_executor(task_body); // Usually system_executor (vanilla) or a strand.
127 post(*task_engine,
128 bind_executor(executor, [get_logger, get_log_component, task_id,
129 task_body = std::move(task_body), short_fire]()
130 {
131 FLOW_LOG_TRACE("Scheduled task [" << task_id <<"]: Body starting; short-fired? = [" << short_fire << "].");
132 task_body(short_fire); // NOTE: Might throw.
133 FLOW_LOG_TRACE("Scheduled task [" << task_id <<"]: Body finished without throwing exception.");
134 }));
135 });
136 // task_body_moved is now potentially destroyed after move().
137
138 // To be clear: If we want user's handler executed (below, or when short-firing) -- just call task->m_body(...).
139
140 auto& timer = task->m_timer;
141
142 FLOW_LOG_TRACE("Scheduling task [" << task->m_id << "] "
143 "to fire in [" << round<milliseconds>(from_now) << "]; "
144 "task engine [" << static_cast<void*>(task_engine) << "]; "
145 "single-threaded optimizations applicable? = [" << single_threaded << "].");
146
147 timer.expires_after(from_now);
148
149 /* We've prepared it all -- now kick off the state machine by scheduling an actual thing to run at the chosen time.
150 * From this point on, unless single_threaded, the mutex must be used when accessing m_timer, m_canceled, m_ired.
151 *
152 * We don't schedule user-supplied body itself but rather a wrapper lambda around that body. This allows us to
153 * avoid various corner cases and annoyances of Timer -- as explained at length in the
154 * present function's doc header. */
155 timer.async_wait([get_logger, get_log_component, task](const Error_code& sys_err_code)
156 {
157 const bool single_threaded = !task->m_mutex_unless_single_threaded;
158
159 /* operation_aborted only occurs due to m_timer.cancel(). Under our semantics, the only way the latter
160 * happens is if WE INTERNALLY do that, and the only reason we do that is if user did
161 * scheduled_task_cancel() or scheduled_task_short_fire().
162 * - cancel case: They've set m_canceled = true, so below code will detect it and return without m_body() call.
163 * - short-fire case: They've set m_fired = true and posted m_body() call manually, so below code will similarly
164 * detect it and return without m_body() call.
165 *
166 * Hence, operation_aborted is indeed possible, but we use m_fired and m_canceled to clearly record why it
167 * happened. So, we simply allow operation_aborted but subsequently ignore whether it happened. (The most we
168 * could do is a sanity-check that if operation_aborted then m_fired or m_canceled must be true.) */
169
170#ifndef NDEBUG // The aforementioned sanity check.
171 if (sys_err_code == boost::asio::error::operation_aborted)
172 {
173 if (single_threaded)
174 {
175 assert(task->m_fired != task->m_canceled); // I.e., exactly one of them must be true.
176 }
177 else
178 {
179 Lock_guard<Mutex_non_recursive> lock(*task->m_mutex_unless_single_threaded);
180 assert(task->m_fired != task->m_canceled);
181 }
182 }
183#endif
184
185 if (sys_err_code && (sys_err_code != boost::asio::error::operation_aborted)) // As noted, ignore _aborted case now.
186 {
188 FLOW_LOG_WARNING("Timer system error; just logged; totally unexpected; pretending it fired normally.");
189 /* Again, this is totally unexpected. Could throw here, but pretending it fired seems less explosive and
190 * entropy-laden and might work out somehow. Regardless, this is no-man's-land and a best effort. */
191 }
192
193 /* OK, it fired legitimately -- but if we marked it as already short-fired or canceled then we just NOOP.
194 * This helper checks whether we should NOOP or actually run m_body(). Pre-condition: Thread-safe to touch *task.
195 * It's a function due to the single_threaded dichotomy just below. Otherwise it'd just be direct inline code. */
196 auto should_fire = [&]() -> bool // [&] meaning this can only be called synchronously within current function.
197 {
198 if (task->m_fired)
199 {
200 FLOW_LOG_TRACE("Scheduled task [" << task->m_id << "] native timer fired (could be due to cancel() call), "
201 "but user handler already began executing earlier. Done.");
202 return false;
203 }
204 // else
205 if (task->m_canceled)
206 {
207 FLOW_LOG_TRACE("Scheduled task [" << task->m_id << "] native timer fired (could be due to cancel() call), "
208 "but task was already canceled earlier. Done.");
209 return false;
210 }
211 // else
212
213 FLOW_LOG_TRACE("Scheduled task [" << task->m_id << "] native timer fired (could be due to cancel() call), "
214 "and nothing has fired or canceled task already. Proceeding with task (through post()).");
215
216 task->m_fired = true;
217 return true;
218 }; // should_fire =
219
220 bool noop;
221 if (single_threaded) // Single-threaded => pre-condition is trivially true. Else guarantee the pre-condition.
222 {
223 noop = !should_fire();
224 }
225 else
226 {
227 Lock_guard<Mutex_non_recursive> lock(*task->m_mutex_unless_single_threaded);
228 noop = !should_fire();
229 }
230
231 if (noop)
232 {
233 return;
234 }
235 // else
236
237 (task->m_body)(false); // false <=> Regular firing; did not short-circuit.
238 FLOW_LOG_TRACE("Scheduled task [" << task->m_id <<"]: Body-post()ing function finished.");
239 }); // async_wait()
240
241 return task; // If they save a copy of this (which is optional) they can call scheduled_task_cancel(), etc., on it.
242} // schedule_task_from_now()
243
244template<typename Scheduled_task_handler>
246 const Fine_time_pt& at, bool single_threaded,
247 Task_engine* task_engine,
248 Scheduled_task_handler&& task_body_moved)
249{
250 /* The core object, Timer m_timer, has expires_at() and expires_from_now() available. As a black box, perhaps it'd
251 * be best for us to call those respectively from schedule_task_at() and schedule_task_from_now(). However,
252 * certain boost.asio docs suggest that ultimately Timer is built around the "fire T period after now" routine,
253 * so it would ultimately just subtract Fine_clock::now() anyway even in expires_at(). So, for simpler code, we just
254 * do that here and no longer worry about it subsequently.
255 *
256 * This is the reason for the `### Performance note ###` in our doc header. */
257 return schedule_task_from_now<Scheduled_task_handler>(logger_ptr,
258 at - Fine_clock::now(),
259 single_threaded, task_engine, std::move(task_body_moved));
260}
261
262} // namespace flow::util
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1291
static id_t create_unique_id()
Short-hand for Unique_id_holder().unique_id(); useful when all you want is the unique integer itself.
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
Definition: error.hpp:269
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
Definition: log.hpp:152
#define FLOW_LOG_SET_CONTEXT(ARG_logger_ptr, ARG_component_payload)
For the rest of the block within which this macro is instantiated, causes all FLOW_LOG_....
Definition: log.hpp:405
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
Flow module containing miscellaneous general-use facilities that don't fit into any other Flow module...
Definition: basic_blob.hpp:29
boost::unique_lock< Mutex > Lock_guard
Short-hand for advanced-capability RAII lock guard for any mutex, ensuring exclusive ownership of tha...
Definition: util_fwd.hpp:265
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
Definition: sched_task.hpp:34
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
Scheduled_task_handle schedule_task_at(log::Logger *logger_ptr, const Fine_time_pt &at, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Identical to schedule_task_from_now() except the time is specified in absolute terms.
Definition: sched_task.hpp:245
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Definition: util_fwd.hpp:135
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:503
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Definition: common.hpp:411
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Definition: common.hpp:408