Flow 1.0.0
Flow project: Full implementation reference.
sched_task.cpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
20
21namespace flow::util
22{
23
24// Implementations.
25
27{
28 using boost::chrono::round;
29 using boost::chrono::milliseconds;
31
32 FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_UTIL);
33
34 auto& timer = task->m_timer;
35 const bool single_threaded = !task->m_mutex_unless_single_threaded;
36
37 /* OK, they want to cancel it -- but if we marked it as already [short-]fired or canceled then we just NOOP.
38 * This helper checks whether we should NOOP or actually prevent m_body(). Pre-condition: Thread-safe to touch *task.
39 * It's a function due to the single_threaded dichotomy just below. Otherwise it'd just be direct inline code. */
40 auto cancel_if_should = [&]() -> bool // [&] meaning this can only be called synchronously within current function.
41 {
42 /* This is inside cancel_if_should() because it accesses `timer.*()` potentially concurrently with another thread:
43 * e.g., another scheduled_task_cancel() call. */
44 FLOW_LOG_TRACE("Canceling scheduled task [" << task->m_id << "] "
45 "that was set to fire in [" << round<milliseconds>(timer.expires_from_now()) << "]; "
46 "single-threaded optimizations applicable? = [" << single_threaded << "].");
47
48 if (task->m_fired)
49 {
50 FLOW_LOG_TRACE("Was going to cancel but user handler already began executing earlier. Done.");
51 return false;
52 }
53 // else
54 if (task->m_canceled)
55 {
56 FLOW_LOG_TRACE("Was going to cancel but task was already canceled earlier. Done.");
57 return false;
58 }
59
60 task->m_canceled = true;
61 // The body will never run now, as it checks m_canceled before executing m_body (see schedule_task_from_now()).
62
63 /* For cleanliness, short-circuit the scheduled function to run now with Error_code `operation_aborted`.
64 * We could also let it run harmlessly as scheduled, but this is less entropy-laden and a bit more
65 * resource-frugal, in that *task might be destroyed earlier this way. */
66 try
67 {
68 timer.cancel();
69 }
70 catch (const system_error& exc) // See similar `catch` in schedule_task_from_now().
71 {
72 const Error_code sys_err_code = exc.code();
74 // Do not re-throw: Even if our scheduled function still runs, it'll only log, as `(m_canceled == true)`.
75 }
76
77 return true;
78 }; // cancel_if_should =
79
80 bool canceled;
81 if (single_threaded)
82 {
83 canceled = cancel_if_should();
84 }
85 else
86 {
87 Lock_guard<Mutex_non_recursive> lock(*task->m_mutex_unless_single_threaded);
88 canceled = cancel_if_should();
89 }
90
91 // At this point cancel_if_should() already either did something or NOOPed. So we just inform user which one.
92 return canceled;
93} // scheduled_task_cancel()
94
96{
97 using boost::asio::post;
98 using boost::chrono::round;
99 using boost::chrono::milliseconds;
101
102 FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_UTIL);
103
104 /* This function is very similar to scheduled_task_cancel() but different enough to copy/paste some comments
105 * for clarity at the expense of some brevity. */
106
107 auto& timer = task->m_timer;
108 const bool single_threaded = !task->m_mutex_unless_single_threaded;
109
110 /* OK, they want to fire it early -- but if we marked it as already [short-]fired or canceled then we just NOOP.
111 * This helper checks whether we should NOOP or actually prevent scheduled m_body() and run m_body now.
112 * Pre-condition: Thread-safe to touch *task.
113 * It's a function due to the single_threaded dichotomy just below. Otherwise it'd just be direct inline code. */
114 auto fire_if_should = [&]()
115 {
116 FLOW_LOG_TRACE("Short-firing scheduled task [" << task->m_id << "] "
117 "that was set to fire in [" << round<milliseconds>(timer.expires_from_now()) << "]; "
118 "single-threaded optimizations applicable? = [" << single_threaded << "].");
119
120 if (task->m_fired)
121 {
122 FLOW_LOG_TRACE("Was going to short-fire but user handler already began executing earlier. Done.");
123 return false;
124 }
125 // else
126 if (task->m_canceled)
127 {
128 FLOW_LOG_TRACE("Was going to short-fire but task was already canceled earlier. Done.");
129 return false;
130 }
131
132 task->m_fired = true;
133 // Body won't run (as scheduled) now, as it checks m_fired before executing m_body (see schedule_task_from_now()).
134
135 /* For cleanliness, short-circuit the scheduled function to run now with Error_code `operation_aborted`.
136 * We will post() the body directly, below. We could have instead done it through the operation_aborted
137 * path, and that would've also been fine; prefer this by a tiny bit because it's more tightly under our control --
138 * e.g., no need to worry about even the possibility of the officially-possible situation wherein cancel() fails to
139 * succeed, because it happened to *just* fire around the same time. Also there's a pleasing similarity between
140 * this logic and that in scheduled_task_cancel() which, in some sense, reduces entropy also. */
141 try
142 {
143 timer.cancel();
144 }
145 catch (const system_error& exc) // See similar `catch` in scheduled_task_cancel().
146 {
147 const Error_code sys_err_code = exc.code();
149 }
150
151 return true;
152 }; // fire_if_should =
153
154 bool noop;
155 if (single_threaded)
156 {
157 noop = !fire_if_should();
158 }
159 else
160 {
161 Lock_guard<Mutex_non_recursive> lock(*task->m_mutex_unless_single_threaded);
162 noop = !fire_if_should();
163 }
164
165 if (noop)
166 {
167 return false;
168 }
169 // else
170
171 /* Cool -- we've made sure m_body() won't be called as scheduled. Since we are to short-fire -- not merely cancel --
172 * it falls to us to early-execute m_body() (as rationalized above).
173 * If we'd used Timer::cancel(), Timer would do this for us instead. */
174 post(*task->m_task_engine, [get_logger, get_log_component, task]() // Capture copy of `log*` so logging just works.
175 {
176 const bool single_threaded = !task->m_mutex_unless_single_threaded;
177
178 FLOW_LOG_TRACE("Short-executing recently-short-fired scheduled task [" << task->m_id << "]; "
179 "single-threaded optimizations applicable? = [" << single_threaded << "].");
180
181#ifndef NDEBUG // Sanity-check how we got here. Other than this, all our wrapper adds is some logging.
182 if (single_threaded)
183 {
184 assert(task->m_fired && (!task->m_canceled));
185 }
186 else
187 {
188 Lock_guard<Mutex_non_recursive> lock(*task->m_mutex_unless_single_threaded);
189 assert(task->m_fired && (!task->m_canceled));
190 }
191#endif
192
193 (task->m_body)(true); // true <=> NOT regular firing; we short-circuited.
194 FLOW_LOG_TRACE("Short-fired scheduled task [" << task->m_id << "]: Body-post()ing function finished.");
195 }); // post()
196
197 return true;
198} // scheduled_task_short_fire()
199
201{
202 using boost::chrono::milliseconds;
203 using boost::chrono::round;
204
205 FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_UTIL);
206
207 const auto do_it = [&]() -> Fine_duration
208 {
209 if (task->m_canceled)
210 {
211 FLOW_LOG_TRACE("Scheduled task [" << task->m_id <<"]: Returning special value for fires-from-now because "
212 "task canceled.");
213 return Fine_duration::max();
214 }
215 // else
216 const auto& from_now = task->m_timer.expires_from_now();
217 FLOW_LOG_TRACE("Scheduled task [" << task->m_id <<"]: Returning fires-from-now value "
218 "[" << round<milliseconds>(from_now) << "].");
219
220 return from_now;
221 };
222
223 const auto& mtx = task->m_mutex_unless_single_threaded;
224 if (mtx)
225 {
227 return do_it();
228 }
229 // else
230 return do_it();
231
232 /* Note: I haven't added a similar function that would return an absolute time point. The real argument for it
233 * is consistency, but there are some nuanced ambiguities about how/whether to return a consistent value each time;
234 * or just use now(); etc. Not worth getting into it here; but ultimately it should be trivial for the caller
235 * to just use now() themselves; or save the value at scheduling time; or.... */
236}
237
239{
240 FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_UTIL);
241
242 const auto do_it = [&]() -> bool
243 {
244 const bool is_it = task->m_fired;
245 FLOW_LOG_TRACE("Scheduled task [" << task->m_id <<"]: Returning fired? = [" << is_it << "].");
246 return is_it;
247 };
248
249 const auto& mtx = task->m_mutex_unless_single_threaded;
250 if (mtx)
251 {
253 return do_it();
254 }
255 // else
256 return do_it();
257}
258
260{
261 FLOW_LOG_SET_CONTEXT(logger_ptr, Flow_log_component::S_UTIL);
262
263 const auto do_it = [&]() -> bool
264 {
265 const bool is_it = task->m_canceled;
266 FLOW_LOG_TRACE("Scheduled task [" << task->m_id <<"]: Returning canceled? = [" << is_it << "].");
267 return is_it;
268 };
269
270 const auto& mtx = task->m_mutex_unless_single_threaded;
271 if (mtx)
272 {
274 return do_it();
275 }
276 // else
277 return do_it();
278}
279
280} // namespace flow::util
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1291
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
Definition: error.hpp:269
#define FLOW_LOG_SET_CONTEXT(ARG_logger_ptr, ARG_component_payload)
For the rest of the block within which this macro is instantiated, causes all FLOW_LOG_....
Definition: log.hpp:405
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
Flow module containing miscellaneous general-use facilities that don't fit into any other Flow module...
Definition: basic_blob.hpp:29
boost::unique_lock< Mutex > Lock_guard
Short-hand for advanced-capability RAII lock guard for any mutex, ensuring exclusive ownership of tha...
Definition: util_fwd.hpp:265
bool scheduled_task_fired(log::Logger *logger_ptr, Scheduled_task_const_handle task)
Returns whether a previously scheduled (by schedule_task_from_now() or similar) task has already fire...
Definition: sched_task.cpp:238
Fine_duration scheduled_task_fires_from_now_or_canceled(log::Logger *logger_ptr, Scheduled_task_const_handle task)
Returns how long remains until a previously scheduled (by schedule_task_from_now() or similar) task f...
Definition: sched_task.cpp:200
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
bool scheduled_task_short_fire(log::Logger *logger_ptr, Scheduled_task_handle task)
Attempts to reschedule a previously scheduled (by schedule_task_from_now() or similar) task to fire i...
Definition: sched_task.cpp:95
bool scheduled_task_canceled(log::Logger *logger_ptr, Scheduled_task_const_handle task)
Returns whether a previously scheduled (by schedule_task_from_now() or similar) task has been cancele...
Definition: sched_task.cpp:259
bool scheduled_task_cancel(log::Logger *logger_ptr, Scheduled_task_handle task)
Attempts to prevent the execution of a previously scheduled (by schedule_task_from_now() or similar) ...
Definition: sched_task.cpp:26
boost::shared_ptr< const Scheduled_task_handle_state > Scheduled_task_const_handle
Equivalent to Scheduled_task_handle but refers to immutable version of a task.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:502
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Definition: common.hpp:410