Flow 2.0.0
Flow project: Full implementation reference.
event_set.cpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#include "flow/async/util.hpp"
#include <boost/algorithm/cxx11/all_of.hpp>
#include <boost/algorithm/cxx11/one_of.hpp>
#include <boost/functional.hpp>
namespace flow::net_flow
{

// Event_set static initializations.

const boost::unordered_map<Event_set::Event_type, Function<bool (const Node*, const boost::any&)>>
  Event_set::S_EV_TYPE_TO_IS_ACTIVE_NODE_MTD
    ({
      // One {Event_type, Node method} entry per supported event type; each method checks whether
      // an event of that type currently holds for a given socket. (See use in event_set_check_baseline().)
    });

// Event_set implementations.

Event_set::Event_set(log::Logger* logger_ptr) :
  log::Log_context(logger_ptr, Flow_log_component::S_NET_FLOW),
  m_state(State::S_CLOSED), // Incorrect; set explicitly.
  m_node(0), // Incorrect; set explicitly.
  m_want(empty_ev_type_to_socks_map()), // Each event type => empty socket set.
  m_can(empty_ev_type_to_socks_map()), // Ditto.
  m_baseline_check_pending(false)
{
  // This can be quite frequent if program uses many sync_*() methods. Use TRACE.
  FLOW_LOG_TRACE("Event_set [" << this << "] created.");
}

Event_set::~Event_set()
{
  // This can be quite frequent if program uses many sync_*() methods. Use TRACE.
  FLOW_LOG_TRACE("Event_set [" << this << "] destroyed.");
}

Event_set::State Event_set::state() const
{
  Lock_guard lock(m_mutex);
  return m_state;
}

Node* Event_set::node() const
{
  Lock_guard lock(m_mutex);
  return m_node;
}

bool Event_set::async_wait(const Event_handler& on_event, Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, async_wait, on_event, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  using boost::algorithm::all_of;

  // We are in user thread U != W.

  /* Caller has loaded *this with desired events (socket + event type pairs); e.g., "I want
   * Peer_socket X to be Readable; I want Server_socket Y to be Acceptable." Now she wants to be
   * informed as soon as one or more of those events has occurred (e.g., "Server_socket Y is
   * Acceptable."). After being so informed, she can check which events did in fact occur and act
   * accordingly (e.g., "Server_socket Y is Acceptable, so I will call Y->accept()").
   *
   * Suppose we lock *this. There are now several possibilities.
   *
   * -1- We are CLOSED. Bail out: everything is blank in CLOSED state, and no further waits are
   *     possible.
   * -2- We are WAITING. Bail out: cannot wait while already WAITING: the on-event behavior
   *     (i.e., on_event) has already been set, and we do not support dislodging it with another
   *     handler.
   * -3- We are INACTIVE. The mainstream case. We should enter WAITING and inform the caller
   *     (via on_event() call) as soon as at least one event has occurred.
   *
   * Obviously -3- is the interesting case.
   *
   * First, some groundwork. The meaning of "as soon as 1+ events have occurred" is not totally
   * clear-cut. For example, it could mean "the moment we know that exactly 1 event has occurred."
   * However, a high-performance definition is: "no later than time period E since the moment we
   * know that exactly 1 event has occurred," where E is defined as "the largest amount of time that
   * can be considered 'non-blocking.'" Informally, this just means that we should detect and
   * accumulate (clump together) as many events as possible -- without extending the wait by a
   * "blocking" time period -- before calling on_event() and thus "giving the results" to the user.
   * This should result in a faster event loop for the user, but we may want to make it tunable.
   *
   * Supposing we want to "clump together" events this way, the problem reduces to 3 conceptual
   * steps. First, detect that exactly 1 desired event has fired; accumulate it. Second, perform
   * any immediately pending Node work, and accumulate any events thus fired. Third, once that's
   * done, call on_event() to inform the user.
   *
   * The latter two steps are interesting but can be left as black boxes for the time being. What
   * about the former, first, step? How to detect that first event has occurred in a timely
   * fashion?
   *
   * What we really are looking for is not so much an "event" but rather a CONDITION (e.g., "Socket
   * S is Writable"), and we want to know the earliest moment in the future when this CONDITION is
   * true. (Time is always passing, so I say "future" instead of "present or future." Also we can
   * assume, for our purposes, that the CONDITION cannot become false after it is true.) One
   * guaranteed way to detect this, which is what we'll use, is as follows. At the soonest moment
   * possible from now, check whether the CONDITION is true. At that point, if it is true, we've
   * found the one "event" that has occurred and can immediately proceed. If, at that point, it is
   * not true, then we check at every step when the CONDITION *may* become true -- that is, when the
   * underlying logic changes what the CONDITION is about -- and if it is at that point true, we can
   * proceed. In other words, establish a baseline ("CONDITION is currently false"), and then
   * ignore the whole thing except at time points when CONDITION may deviate from that baseline.
   * This is efficient and works.
   *
   * We call the initial check, right after async_wait() is called, the "baseline" check; while the
   * subsequent checks are called "delta" checks. (Known terms for this are "level" check and
   * "edge" check, but I didn't know that when originally writing this giant comment. @todo Change
   * terminology in comments and code.)
   *
   * To be specific, consider the CONDITION "Server_socket S is Acceptable" as an example. First we
   * check ASAP: is S Acceptable -- i.e., does it have at least one Peer_socket on its accept queue?
   * Yes? Done. No? OK. <time passes> Say we get a valid SYN_ACK_ACK on that port. Since
   * receiving a SYN_ACK_ACK on socket S' belonging to Server_socket S is the only way S can become
   * Acceptable, we check it. Is it? Yes. Done. At all other points in time we don't concern
   * ourselves with the CONDITION/event "Server_socket S is Acceptable."
   *
   * The above may be seen as obvious, but it's important to be very explicit for the following
   * reasoning to work. It's pretty clear that the "delta" checking can only occur on thread W;
   * only Node on thread W knows about such things as receiving SYN_ACK_ACKs, for example. What
   * about the "baseline" (initial) check for the CONDITION in question? There are two
   * possibilities. One is we can do it right here, right now, in thread U != W. The other is we
   * can post() a task onto thread W and have it happen there ASAP.
   *
   * The former (do it here) seems attractive. It's "faster," in that we can just do it now. It
   * also lets us short-circuit on_event() and just return something to the user right now, so they
   * can instantly react (in case one of the events is immediately true) instead of having to learn
   * of the event indirectly. It's also more distributed among threads in some situations (we're
   * loading less work onto one thread W, which may be busy keeping net_flow working). The negative is
   * we generally prefer to keep work on W when it doesn't seriously hurt performance, to keep the
   * synchronization as simple as possible (avoid bugs); for example that's why Node::listen() and
   * Node::connect() and Event_set::close() work the way they do (using futures). Another negative
   * is that it introduces a special case to the interface; so the user would need to write code to
   * handle both an instant, non-error return and one via on_event().
   *
   * I thought about this for a while. I'd go for the faster way, but the real crux is how to set
   * up the synchronization in a way that would not miss events. I 90% convinced myself that a
   * certain not-hard way would be correct, but it remains difficult to think about; eventually I
   * decided that it would be best to go with the slower solution (do both "baseline" and "delta"
   * checks on W), because of its simplicity and elegance. I doubt the performance impact is too
   * serious. On 5-year-old server hardware in 2012, assuming an idle thread W and a ready socket event, a
   * return through post(W) + on_event() { boost::promise::set_value() } is in the low hundreds of
   * microseconds. If on_event() instead sends a message through a Unix domain socket, doing that
   * and detecting and reading that message is in the same ballpark (certainly sub-millisecond).
   * Effectively adding sub-ms latency to some messages does not seem too bad.
   *
   * Conclusion: Lock *this; then filter out CLOSED and WAITING state. Then, assuming INACTIVE
   * state, proceed to WAITING state. post() onto thread W the "baseline check" task. Finally,
   * unlock *this. Thread W will then proceed with "baseline" and "delta" checks and will fire
   * on_event() when ready.
   *
   * Also note that poll() provides the "instantly check for any of the desired events and don't
   * wait" functionality in isolation (something like a select() with a 0 timeout). */

  // Lock everything. We're going to be writing to things that other user threads and W will be accessing/writing.
  Lock_guard lock(m_mutex);

  // Check for invalid arguments, basically.
  if (all_of(m_want, ev_type_to_socks_map_entry_is_empty))
  {
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_NO_EVENTS);
    return false;
  }
  // else

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us. Mark error in *err_code and log.
    assert(!m_node);

    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  if (m_state == State::S_WAITING)
  {
    // Someone already started an async_wait().
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_DOUBLE_WAIT_OR_POLL);
    return false;
  }
  // else the mainstream case.
  assert(m_state == State::S_INACTIVE);

  // Forward to Node, as is the general pattern for Event_set method implementations involving Node state.
  return m_node->event_set_async_wait(shared_from_this(), // Get a Ptr that shares ownership of this.
                                      on_event, err_code);
} // Event_set::async_wait()
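
/* [Illustrative sketch -- an addition to this reference listing, not part of the original source.]
 * A minimal usage pattern for the async_wait() API above. The names `node` (a running Node) and
 * `sock` (a connected Peer_socket::Ptr) are hypothetical; the Event_set calls shown are the ones
 * implemented in this file:
 *
 *   Error_code err;
 *   Event_set::Ptr evs = node->event_set_create(&err);
 *
 *   Event_set::Sockets want;
 *   want.insert(sock); // Stored as boost::any inside the Sockets hash set.
 *   evs->swap_wanted_sockets(&want, Event_set::Event_type::S_PEER_SOCKET_READABLE, &err);
 *
 *   evs->async_wait([](bool interrupted) { notify_user_thread(interrupted); }, &err);
 *   // notify_user_thread() is a hypothetical stand-in for "signal thread U somehow."
 *
 *   // ...later, in thread U, once signaled (or to abandon the wait):
 *   evs->async_wait_finish(&err);
 *   Event_set::Sockets ready;
 *   evs->emit_result_sockets(&ready, Event_set::Event_type::S_PEER_SOCKET_READABLE, &err);
 */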

bool Event_set::async_wait_finish(Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, async_wait_finish, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  /* User wants to move *this to INACTIVE state, so that she can check the results of a wait (in
   * m_can). If it is already in INACTIVE, then this won't do anything but is allowed (for
   * reasons that will be very soon clear). If it is CLOSED, then it's just an error like any other
   * operation.
   *
   * If it is WAITING, then there's a race. We're about to lock *this and try to change to INACTIVE
   * if needed. If we lose the race -- if Node in thread W is able to, while scanning Event_sets
   * for applicable events that just occurred, lock *this and find some active events in *this and
   * thus change m_state to INACTIVE -- then that's just the NOOP situation for us (INACTIVE ->
   * INACTIVE). No harm done; *this is still INACTIVE by the time we return, as advertised. If we
   * win the race -- if we lock and change state to INACTIVE -- then Node will just ignore *this
   * when/if it gets to it, as user is no longer interested in *this's events (which is correct). */

  // Lock everything, as we'll be reading/changing m_state at least.
  Lock_guard lock(m_mutex);

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us. Mark error in *err_code and log.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  if (m_state == State::S_WAITING)
  {
    FLOW_LOG_TRACE("Event_set [" << this << "] wait finish requested; moving from "
                   "[" << Event_set::State::S_WAITING << "] to [" << Event_set::State::S_INACTIVE << "].");

    // As in event_set_fire_if_got_events():
    m_on_event.clear();
    m_state = State::S_INACTIVE;
  }
  else
  {
    assert(m_state == State::S_INACTIVE);
    FLOW_LOG_TRACE("Event_set [" << this << "] wait finish requested, but state was already "
                   "[" << Event_set::State::S_INACTIVE << "].");
  }

  err_code->clear();
  return true;
} // Event_set::async_wait_finish()

bool Event_set::poll(Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, poll, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  /* This method can be thought of as sync_wait() with a zero max_wait (which sync_wait() actually
   * disallows, so that poll() is used instead), with the minor difference that the "no events ready 'in time'" result
   * for us means we return `true` (no error) and `!*err_code` (sync_wait() returns a ..._TIMEOUT err_code).
   *
   * It assumes INACTIVE state, goes WAITING, checks
   * for any of the m_want conditions that are true right now (and saves such to m_can), then
   * immediately goes back to INACTIVE. If you look at the giant comment in async_wait() you'll
   * note that this is basically equivalent to an async_wait() that performs only the initial
   * ("baseline") check and then immediately goes back to INACTIVE regardless of whether the
   * baseline check discovered any active events.
   *
   * The only difference is that we do the baseline check directly in thread U != W, as opposed to
   * post()ing it onto thread W. It is entirely thread-safe to check each event from thread U, as
   * all the event checking functions (is Readable? is Writable? is Acceptable?) explicitly can
   * run from any thread. Additionally, the argument made (in the async_wait() comment) against
   * performing the baseline check in thread U != W does not apply: we're not interested in
   * performing any blocking, so we can't "miss" any events as long as we simply check immediately.
   * Of course, the result is faster and simpler code as well (compared to post()ing it). */

  // Lock everything, as we'll be reading/changing much state.
  Lock_guard lock(m_mutex);

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us. Mark error in *err_code and log.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  if (m_state == State::S_WAITING)
  {
    // Fairly similar situation to trying to async_wait() while WAITING.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_DOUBLE_WAIT_OR_POLL);
    return false;
  }
  // else
  assert(m_state == State::S_INACTIVE);

  /* Perform baseline check (that is to say, simply check each event in m_want and if it holds,
   * add it to m_can). */

  // This is a formality, as we'll just go back to INACTIVE in a moment, but event_set_baseline_check() expects it.
  m_state = State::S_WAITING;

  // As in a regular async_wait():
  clear_ev_type_to_socks_map(&m_can);
  m_baseline_check_pending = true;

#ifndef NDEBUG
  const bool baseline_check_ok =
#endif
    m_node->event_set_check_baseline(shared_from_this());
  assert(baseline_check_ok); // There is NO adequate reason for it to fail; we've set m_baseline_check_pending = true.

  // As in a regular async_wait_finish() (but no need to mess with m_on_event, as we didn't set it above):
  m_state = State::S_INACTIVE;

  // Done. m_can sets are what they are.
  err_code->clear();
  return true;
} // Event_set::poll()
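
/* [Illustrative sketch -- an addition to this reference listing, not part of the original source.]
 * A non-blocking readiness check via poll(), assuming `evs` is a hypothetical Event_set::Ptr in
 * INACTIVE state whose wanted sets were already loaded (e.g., via swap_wanted_sockets()):
 *
 *   Error_code err;
 *   if (evs->poll(&err) && evs->events_detected(&err))
 *   {
 *     Event_set::Sockets ready;
 *     evs->emit_result_sockets(&ready, Event_set::Event_type::S_PEER_SOCKET_READABLE, &err);
 *     // ready now holds the sockets that were Readable at poll() time.
 *   }
 */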

bool Event_set::sync_wait_impl(const Fine_duration& max_wait, Error_code* err_code)
{
  using boost::promise;
  using boost::unique_future;
  using boost::future_status;
  using boost::chrono::milliseconds;
  using boost::chrono::round;
  using boost::algorithm::one_of;

  assert(max_wait.count() > 0);

  // We are in user thread U != W.

  /* This is actually simple. They want us to block until 1+ of the desired events loaded into
   * m_want has occurred. Therefore we use async_wait(), and have thread W signal us when this
   * has occurred; then in this thread we wait for that signal. So how to signal? We can use
   * Boost future (internally condition_variable probably).
   *
   * Additional optimization: perform a single poll() right away, in case events are ready now.
   * This lets us avoid any thread W work and return immediately. */

  {
    // Lock so that we can safely check result of the poll() without another thread messing with it.
    Lock_guard lock(m_mutex); // OK because m_mutex is recursive (poll() will also momentarily lock).

    if (!poll(err_code)) // May throw.
    {
      return false; // *err_code set (if not null)/logged.
    }
    // else see if at least one active event already exists for at least one of the event types.
    if (one_of(m_can, boost::not1(ev_type_to_socks_map_entry_is_empty)))
    {
      // Boo-ya, immediately found some active events.
      return true;
    }
    // else need to wait.
  }

  /* We will perform the wait via the future/promise pair. The result of the successful operation is either true or
   * false: interrupted (true) or condition(s) now true (false). */
  promise<bool> intr_promise; // This will be touched by thread W when events ready.
  unique_future<bool> intr_future = intr_promise.get_future(); // We'll use this to wait for that.

  /* The on-event callback is therefore: intr_promise->set_value(interrupted),
   * where interrupted is boolean value async_wait() will pass in, indicating whether the wait was finished
   * due to an explicit interruption (true) vs. the waited-on condition(s) actually becoming true (false).
   * Capture intr_promise by reference, as we will wait for completion, during which time it'll keep existing. */
  if (!async_wait([&intr_promise](bool interrupted) { intr_promise.set_value(interrupted); },
                  err_code)) // May throw.
  {
    // Was not able to start the asynchronous wait; *err_code set (if not null)/logged.
    return false;
  }
  // else *err_code is success (or untouched if null).

  bool timed_out = false;

  // Block until intr_promise is touched.
  if (max_wait == Fine_duration::max())
  {
    // Infinite wait.
    intr_future.wait();
  }
  else
  {
    // Finite wait.

    if (intr_future.wait_for(max_wait) != future_status::ready)
    {
      /* Timed out. That's OK: we treat it the same way as if we hadn't timed out; we'll simply
       * return 0 active events, not an error. Therefore we need only finish the wait by going to
       * INACTIVE state using async_wait_finish(). Then if/when Node decides to actually add an
       * active event to *this, *this will already be INACTIVE, so Node will ignore it (which is
       * right).
       *
       * Note that in the tiny period of time between wait_for() exiting with false and us
       * calling async_wait_finish(), Node in thread W may actually be able to detect an event,
       * add it to m_can, and go to INACTIVE. This would mean that in async_wait_finish() we'll
       * try to go from INACTIVE to INACTIVE. That is OK; that method specifically supports
       * that. In that case it won't do anything, still return success, and we'll happily return 1
       * or more active events in m_can, as if there was no timeout after all. */

      FLOW_LOG_INFO("Synchronous wait on Event_set [" << this << "] timed out; "
                    "timeout = [" << round<milliseconds>(max_wait) << "].");

      if (!async_wait_finish(err_code)) // May throw.
      {
        // Extraordinary situation -- *this was close()d or something. *err_code set (if not null)/logged.
        return false;
      }
      // else *err_code is still success (or untouched if null).

      timed_out = true;
    } // if (intr_future timed out)
  } // else if (needed to wait until a deadline)

  if (!timed_out)
  {
    /* The wait didn't time out (if that was even possible); but the result may have been that the (successful, from
     * async_wait()'s point of view) wait finished due to an interruption as opposed to the waited-on condition(s)
     * actually becoming true. In this case, as advertised, we return a specific error. This is conceptually
     * similar to POSIX's errno=EINTR semantics. */

    if (intr_future.get())
    {
      FLOW_LOG_INFO("Synchronous wait on Event_set [" << this << "] was interrupted.");

      if (err_code)
      {
        *err_code = error::Code::S_WAIT_INTERRUPTED;
      }

      return false;
    }
    // else if (condition(s) are actually true)
  }

  // Caller can now examine m_can for fired events (if any).
  assert((!err_code) || (!*err_code)); // Either null or success.
  return true;
} // Event_set::sync_wait_impl()
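
/* [Illustrative sketch -- an addition to this reference listing, not part of the original source.]
 * The bare signal-via-future pattern sync_wait_impl() builds on, stripped of Event_set specifics;
 * only standard Boost.Thread machinery is used:
 *
 *   boost::promise<bool> intr_promise;                                  // Touched by thread W.
 *   boost::unique_future<bool> intr_future = intr_promise.get_future(); // Waited on by thread U.
 *
 *   // Thread W, when the waited-on condition holds (or the wait is interrupted):
 *   //   intr_promise.set_value(interrupted);
 *
 *   // Thread U, blocking with a timeout:
 *   //   if (intr_future.wait_for(max_wait) == boost::future_status::ready)
 *   //   { const bool interrupted = intr_future.get(); ... }
 *   //   else { ... timed out; no value was set ... }
 */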

bool Event_set::sync_wait(Error_code* err_code)
{
  using boost::chrono::microseconds;
  return sync_wait(microseconds(microseconds::max()), err_code); // Wait indefinitely. May throw.
}

void Event_set::close(Error_code* err_code)
{
  if (flow::error::exec_void_and_throw_on_error([this](Error_code* actual_err_code) { close(actual_err_code); },
                                                err_code, FLOW_UTIL_WHERE_AM_I_STR()))
  {
    return;
  }
  // else

  // We are in user thread U != W.

  Lock_guard lock(m_mutex); // Lock m_node/m_state; also it's a pre-condition for Node::event_set_close().

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us.
    assert(!m_node);

    // Mark error in *err_code and log.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return;
  }

  // Forward to Node, as is the general pattern for Event_set method implementations involving Node state.
  lock.release(); // Let go of the mutex (mutex is still LOCKED). event_set_close is now in charge of unlocking it.
  m_node->event_set_close(shared_from_this(), // Get a Ptr that shares ownership of this.
                          err_code);
} // Event_set::close()

bool Event_set::swap_wanted_sockets(Sockets* target_set, Event_type ev_type, Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, swap_wanted_sockets, target_set, ev_type, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  /* @todo There are about 4 methods that have a pattern similar to below, differing mostly by log message
   * contents and what happens just before `return true`. Perhaps code reuse? */

  // We are in thread U != W.

  assert(target_set);

  // Accessing m_state, socket sets, etc. which may be written by other threads at any time. Must lock.
  Lock_guard lock(m_mutex);

  Sockets& want_set = m_want[ev_type];

  FLOW_LOG_TRACE("Wanted set for event type [" << ev_type << "] swapped in Event_set [" << this << "]; pre-swap sizes: "
                 "Event_set [" << want_set.size() << "]; user [" << target_set->size() << "].");

  if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify want_set in this state.
  {
    // *err_code is set.
    return false; // Closed; do not set *target_set (BTW want_set is blank anyway).
  }
  // else *err_code is success.

  // This is an O(1) operation.
  target_set->swap(want_set);

  return true;
} // Event_set::swap_wanted_sockets()

bool Event_set::clear_wanted_sockets(Event_type ev_type, Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, clear_wanted_sockets, ev_type, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  // Accessing m_state, the sets, etc. which may be written by other threads at any time. Must lock.
  Lock_guard lock(m_mutex);

  Sockets& want_set = m_want[ev_type];

  FLOW_LOG_TRACE("Wanted set for event type [" << ev_type << "] cleared in Event_set [" << this << "]; "
                 "size [" << want_set.size() << "].");

  if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify want_set in this state.
  {
    // *err_code is set.
    return false; // Closed; do not clear (it should be cleared anyway, BTW).
  }
  // else *err_code is success.

  want_set.clear();
  return true;
}

bool Event_set::events_wanted(Error_code* err_code) const
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, events_wanted, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  using boost::algorithm::one_of;

  // We are in thread U != W.

  // Lock everything, as we'll be reading state and other things.
  Lock_guard lock(m_mutex);

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us. Mark error in *err_code and log.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  err_code->clear();
  return one_of(m_want, boost::not1(ev_type_to_socks_map_entry_is_empty));
} // Event_set::events_wanted()

bool Event_set::events_detected(Error_code* err_code) const
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, events_detected, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  using boost::algorithm::one_of;

  // We are in thread U != W.

  // Lock everything, as we'll be reading state.
  Lock_guard lock(m_mutex);

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us. Mark error in *err_code and log.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  if (m_state == State::S_WAITING)
  {
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_RESULT_CHECK_WHEN_WAITING);
    return false;
  }
  // else
  assert(m_state == State::S_INACTIVE);

  // Coolio.
  err_code->clear();

  FLOW_LOG_TRACE("Wait result set checked for activity in Event_set [" << this << "]; "
                 "results set sizes = [" << ev_type_to_socks_map_sizes_to_str(m_can) << "].");

  return one_of(m_can, boost::not1(ev_type_to_socks_map_entry_is_empty));
} // Event_set::events_detected()

bool Event_set::emit_result_sockets(Sockets* target_set, Event_type ev_type, Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, emit_result_sockets, target_set, ev_type, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  assert(target_set);

  // Accessing m_state, the sets, etc. which may be written by other threads at any time. Must lock.
  Lock_guard lock(m_mutex);

  Sockets& can_set = m_can[ev_type];

  FLOW_LOG_TRACE("Wait result set for event type [" << ev_type << "] emitted in Event_set [" << this << "]; "
                 "size [" << can_set.size() << "].");

  if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify can_set in this state.
  {
    // *err_code is set.
    return false; // Closed; do not set *target_set (BTW can_set is blank anyway).
  }
  // else *err_code is success.

  // Efficiently give them the result set (swap() avoids copy).

  // This is an O(1) operation.
  can_set.swap(*target_set);

  /* Get rid of whatever garbage they had in their target set before calling us.
   * This is O(n), where n is # of elements in *target_set before the swap. Typically n == 0. */
  can_set.clear();

  return true;
} // Event_set::emit_result_sockets()

bool Event_set::clear_result_sockets(Event_type ev_type, Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, clear_result_sockets, ev_type, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  // Accessing m_state, the sets, etc. which may be written by other threads at any time. Must lock.
  Lock_guard lock(m_mutex);

  Sockets& can_set = m_can[ev_type];

  FLOW_LOG_TRACE("Wait result set for event type [" << ev_type << "] cleared in Event_set [" << this << "]; "
                 "size [" << can_set.size() << "].");

  if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify can_set in this state.
  {
    // *err_code is set.
    return false; // Closed; do not clear (it should be cleared anyway).
  }
  // else *err_code is success.

  can_set.clear();
  return true;
}

bool Event_set::ok_to_mod_socket_set(Error_code* err_code)
{
  // m_mutex should be locked.

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  if (m_state == State::S_WAITING)
  {
    /* Event_set is being waited on; in this phase we do not allow changing events for which to
     * wait. We could probably allow it, but the logic involved would become needlessly complex.
     * Also, consider a BSD socket select() or a Linux epoll_wait(); once select() or epoll_wait()
     * is entered, it's probably impossible to add events to the set (or at least it's
     * undocumented). So let's just not. Similarly, don't allow retrieving results until the
     * wait is finished. */
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_IMMUTABLE_WHEN_WAITING);
    return false;
  }
  // else
  assert(m_state == State::S_INACTIVE);

  err_code->clear();
  return true;
} // Event_set::ok_to_mod_socket_set()

bool Event_set::clear(Error_code* err_code)
{
  /* Note this can be (and was) written FAR more concisely in terms of other clear_*() methods; but the cost
   * was significant: repeated redundant state checks, recursive mutex locks, logging.... The following
   * is superior. */

  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, clear, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  // Accessing m_state, the sets, etc. which may be written by other threads at any time. Must lock.
  Lock_guard lock(m_mutex);

  FLOW_LOG_TRACE("Clearing sets in Event_set [" << this << "]; pre-clear set sizes: "
                 "wanted [" << ev_type_to_socks_map_sizes_to_str(m_want) << "], "
                 "results [" << ev_type_to_socks_map_sizes_to_str(m_can) << "].");

  if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify the sets in this state.
  {
    // *err_code is set.
    return false;
  }
  // else *err_code is success.

  clear_ev_type_to_socks_map(&m_want);
  clear_ev_type_to_socks_map(&m_can);

  return true;
}

Event_set::Ev_type_to_socks_map Event_set::empty_ev_type_to_socks_map() // Static.
{
  return Ev_type_to_socks_map
    ({
      // Linked_hash_map order is significant. Iteration will occur in this canonical order in logs, etc.
      // (One {Event_type, empty Sockets} pair per supported event type.)
    });
}

void Event_set::clear_ev_type_to_socks_map(Ev_type_to_socks_map* ev_type_to_socks_map) // Static.
{
  assert(ev_type_to_socks_map);
  for (auto& ev_type_and_socks : *ev_type_to_socks_map)
  {
    ev_type_and_socks.second.clear();
  }
}

bool Event_set::ev_type_to_socks_map_entry_is_empty(const Ev_type_to_socks_map::Value& ev_type_and_socks) // Static.
{
  return ev_type_and_socks.second.empty();
}

std::string Event_set::ev_type_to_socks_map_sizes_to_str(const Ev_type_to_socks_map& ev_type_to_socks_map) // Static.
{
  using std::string;
  string out;

  size_t n_left = ev_type_to_socks_map.size();
  for (const auto& ev_type_and_socks : ev_type_to_socks_map)
  {
    util::ostream_op_to_string(&out,
                               ev_type_and_socks.first, ": ", ev_type_and_socks.second.size(),
                               (((--n_left) == 0) ? "" : ", "));
  }

  return out;
}

std::string Event_set::sock_as_any_to_str(const boost::any& sock_as_any) // Static.
{
  using boost::any_cast;
  using std::string;

  string out;

  /* A boost::any can contain an object of almost any type or be empty. Our boost::any's can only be those
   * from what we load into Sockets sets; currently the following types. So just find the correct type
   * and then cast to it. */
  const auto& type_id = sock_as_any.type();
  if (type_id == typeid(Peer_socket::Ptr))
  {
    util::ostream_op_to_string(&out, any_cast<Peer_socket::Ptr>(sock_as_any));
  }
  else if (type_id == typeid(Server_socket::Ptr))
  {
    util::ostream_op_to_string(&out, any_cast<Server_socket::Ptr>(sock_as_any));
  }
  else
  {
    assert(sock_as_any.empty());
    out = "none";
  }

  return out;
}

// Event_set::Socket_as_any* implementations.

// Method implementations.

size_t Event_set::Socket_as_any_hash::operator()(const boost::any& sock_as_any) const
{
  using boost::any_cast;
  using boost::hash;

  /* A boost::any can contain an object of almost any type or be empty. Our boost::any's can only be those
   * from what we load into Sockets sets; currently the following types. So just find the correct type
   * and then cast to it. Then just apply the default hash operation to that, as would be done if we
   * were simply storing a set or map with keys of that type.
   *
   * Collisions are allowed anyway -- though best avoided for hash lookup performance -- but in this case
   * hash<> will just use the pointer's raw value anyway, and in practice that should ensure completely
   * disjoint sets of hash values for each of the possible stored types. */
  const auto& type_id = sock_as_any.type();
  if (type_id == typeid(Peer_socket::Ptr))
  {
    return hash<Peer_socket::Ptr>()(any_cast<Peer_socket::Ptr>(sock_as_any));
  }
  // else
  if (type_id == typeid(Server_socket::Ptr))
  {
    return hash<Server_socket::Ptr>()(any_cast<Server_socket::Ptr>(sock_as_any));
  }
  // else

  assert(sock_as_any.empty());
  return 0;
}

bool Event_set::Socket_as_any_equals::operator()(const boost::any& sock_as_any1,
                                                 const boost::any& sock_as_any2) const
{
  using boost::any_cast;

  /* A boost::any can contain an object of almost any type or be empty. Our boost::any's can only be those
   * from what we load into Sockets sets; currently the following types. Firstly, if the two anys' types
   * are different, they are obviously different objects. (Note, also, that in practice we'll be storing
   * only objects of the same type in each Sockets structure; thus that case should rarely, if ever,
   * come into play.)
   *
   * Assuming they are of the same type, just find the correct type and then cast both to it. Now that type's
   * operator==() can do the job, as it would if we were simply storing a set or map with keys of that type. */

  if (sock_as_any1.type() != sock_as_any2.type())
  {
    return false;
  }
  // else

  const auto& type_id = sock_as_any1.type();

  if (type_id == typeid(Peer_socket::Ptr))
  {
    return any_cast<Peer_socket::Ptr>(sock_as_any1) == any_cast<Peer_socket::Ptr>(sock_as_any2);
  }
  // else
  if (type_id == typeid(Server_socket::Ptr))
  {
    return any_cast<Server_socket::Ptr>(sock_as_any1) == any_cast<Server_socket::Ptr>(sock_as_any2);
  }
  // else

  assert(sock_as_any1.empty()); // Note: type_id == typeid(void), in this case.
  assert(sock_as_any2.empty());

  return true;
}
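
/* [Illustrative sketch -- an addition to this reference listing, not part of the original source.]
 * The general boost::any type-dispatch idiom the two functors above rely on: compare the stored
 * type via any::type() against typeid, then any_cast<> to the one matching type:
 *
 *   #include <boost/any.hpp>
 *   bool holds_equal_int(const boost::any& a, int x) // Hypothetical example function.
 *   {
 *     return (a.type() == typeid(int)) && (boost::any_cast<int>(a) == x);
 *   }
 *
 * As in Socket_as_any_equals above, checking any::type() first makes the subsequent any_cast<>
 * safe: it can no longer throw boost::bad_any_cast. */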

// Node implementations (dealing with individual Event_sets but also Node state).

// Method implementations.

Event_set::Ptr Node::event_set_create(Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(Event_set::Ptr, event_set_create, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  if (!running())
  {
    FLOW_ERROR_EMIT_ERROR(error::Code::S_NODE_NOT_RUNNING);
    return Event_set::Ptr();
  }
  // else

  /* Put the rest of the work into thread W. For justification, see big comment in listen().
   * Addendum regarding performance: event_set_create() should be pretty rare. */

  // Load this body onto thread W boost.asio work queue. event_set_promise captured by reference, as we will wait.
  Event_set::Ptr event_set(new Event_set(get_logger()));
  event_set->m_state = Event_set::State::S_INACTIVE;
  event_set->m_node = this;
  event_set->m_baseline_check_pending = false;
  asio_exec_ctx_post(get_logger(), &m_task_engine, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION, [&]()
  {
    // We are in thread W.
    m_event_sets.insert(event_set);
  });
  // If got here, the task has completed in thread W and signaled us to that effect.

  err_code->clear();
  return event_set;
} // Node::event_set_create()

bool Node::event_set_async_wait(Event_set::Ptr event_set, const Event_set::Event_handler& on_event,
                                Error_code* err_code)
{
  using boost::asio::post;
  using boost::any;

  // We are in thread U != W.

  if (!running())
  {
    FLOW_ERROR_EMIT_ERROR(error::Code::S_NODE_NOT_RUNNING);
    return false;
  }
  // else

  // See giant comment in Event_set::async_wait(), which just called us, before proceeding.

  // event_set is locked (and will be unlocked after we exit).

  // Check explicitly documented pre-condition.
  assert(event_set->m_state == Event_set::State::S_INACTIVE);

  FLOW_LOG_TRACE("Event_set [" << event_set << "] wait requested; moving from "
                 "[" << Event_set::State::S_INACTIVE << "] to [" << Event_set::State::S_WAITING << "].");

  // Cock the gun....
  event_set->m_state = Event_set::State::S_WAITING;
  event_set->m_on_event = on_event;

  // Clear result sets, so when we get back to S_INACTIVE (events fire), only the events that happened are there.
  Event_set::clear_ev_type_to_socks_map(&event_set->m_can);

  /* As described in Event_set::async_wait() giant comment, thread W must check all events (sockets
   * + desired states) of interest (in event_set->m_want) as soon as possible, in case those
   * conditions already hold. If any hold, it should fire m_on_event() right then. For example,
   * Peer_socket P in m_want[S_PEER_SOCKET_READABLE] may already be Readable, so this would be immediately detected;
   * if we don't do that, the Readable status would only be detected later (if ever) if, say, more
   * DATA packets arrive. */

  post(m_task_engine, [this, event_set]() { event_set_check_baseline_assuming_state(event_set); });

  /* Thread W will invoke this->event_set_check_baseline_assuming_state(event_set) the next time it
   * gets a free moment. (Might be right NOW, though it won't be able to lock and will wait until
   * we unlock just below.) */

  /* Corner case: Let P be in m_want[PEER_SOCKET_READABLE]. Assume thread W is executing a handler right now which
   * gets a DATA packet on socket P. Having saved it into P's Receive buffer, it will check each
   * Event_set in *this Node. We will unlock shortly, at which point W will note that event_set is
   * S_WAITING, and P is in m_want[P_S_R]. Therefore it will fire event_set->m_on_event() and make
   * event_set S_INACTIVE again. This will happen before event_set_check_baseline() runs. When
   * that runs, it will be in INACTIVE state again and thus not do anything.
   *
   * Is this bad? Not really. It just so happens that that sole socket P wins the race and
   * triggers m_on_event(). This doesn't violate the goal: to report the moment 1 or more events
   * hold; 1 is indeed "1 or more." However, let's try to avoid this situation anyway (be "greedy" for
   * more ready events at a time). Set event_set->m_baseline_check_pending = true. This will be a
   * sign to thread W that, when it next examines event_set for a "delta" check (such as upon
   * receiving the DATA packet in this example), it should instead perform the full baseline check
   * of all events. Then the queued up event_set_check_baseline() can do nothing if that bool is
   * false by then; or do the work and set it to false if it wins the race and performs the full check,
   * in which case the aforementioned "delta" check would just do the "delta" check after all. */
  event_set->m_baseline_check_pending = true;

  /* That's it. event_set_check_baseline() will execute soon. If it detects no events, thread W
   * will keep checking for relevant events as states become Readable/Writable/Acceptable/etc. on various
   * sockets. Eventually it will fire and change m_state from S_WAITING to S_INACTIVE. */

  err_code->clear();
  return true;
  // Unlock happens right after this returns.
} // Node::event_set_async_wait()

void Node::event_set_check_baseline_assuming_state(Event_set::Ptr event_set)
{
  // We are in thread W.

  // Imperative to lock all of event_set. Much access possible from user threads.
  Event_set::Lock_guard lock(event_set->m_mutex);

  /* event_set_async_wait() placed us onto thread W. When it did so, event_set->m_state ==
   * S_WAITING (waiting for 1+ events to hold, so we can inform user). However that may have
   * changed by the time boost.asio was able to get to us. E.g., socket was closed due to error, or
   * an event was detected before we executed, so we're S_INACTIVE again. So check for that. */

  if (event_set->m_state != Event_set::State::S_WAITING)
  {
    // Unlikely but legitimate and kind of interesting.
    FLOW_LOG_TRACE("Event_set [" << event_set << "] baseline check ran, but state is no "
                   "longer [" << Event_set::State::S_WAITING << "] but [" << event_set->m_state << "] instead.");
    return;
  }
  // else state is WAITING: should actually perform the baseline check.

  if (event_set_check_baseline(event_set)) // Assumes m_mutex is already locked.
  {
    // Some fired events may now be recorded inside event_set. Inform user and go to INACTIVE again if so.
    event_set_fire_if_got_events(event_set); // This method assumes m_mutex is already locked.
  }
} // Node::event_set_check_baseline_assuming_state()

bool Node::event_set_check_baseline(Event_set::Ptr event_set)
{
  using boost::any;
  using boost::any_cast;

  // We are in thread W *or* thread U != W. CAUTION!!! Ensure all operations below can be done from thread U!

  // event_set is already locked (pre-condition).

  // Check explicitly documented pre-condition.
  assert(event_set->m_state == Event_set::State::S_WAITING);

  /* Our mission is to simply run through every condition in the event_set->m_want sets and
   * check whether it holds. For each one that does, save it in the appropriate event_set->m_can
   * set. Then if any did apply, run event_set->m_on_event() to inform the user, and go back to
   * INACTIVE state. The background for this is in Event_set::async_wait() and friends.
   *
   * This is a more thorough (and slow) check than that invoked when a condition of interest
   * (e.g., new DATA packet added to Receive buffer) arises. */

  if (!event_set->m_baseline_check_pending)
  {
    /* Check already performed once since event_set became WAITING. Therefore all subsequent checks
     * are the faster "delta" checks, invoked when individual conditions of interest become true. */
    FLOW_LOG_TRACE("Event_set [" << event_set << "] baseline check ran, but skipping because same check already "
                   "ran since the last time we entered [" << Event_set::State::S_WAITING << "].");
    return false;
  }
  // else if (event_set->m_baseline_check_pending)

  FLOW_LOG_TRACE("Event_set [" << event_set << "] baseline check started.");
  event_set->m_baseline_check_pending = false;

  /* For each type of event, run through each event we want to know about if it's true, and if
   * it's true, add it to the active events list for that event type. */

  static_assert(util::Container_traits<Event_set::Sockets>::S_CONSTANT_TIME_INSERT,
                "Expecting amortized constant time insertion sockets container."); // Ensure fastness.

  /* To get the set of Event_type, use S_EV_TYPE_TO_IS_ACTIVE_NODE_MTD which has them as the key set;
   * it also stores the functions of the form `bool Node::does_event_hold(const any& sock_as_any)`, where
   * sock_as_any is a boost::any wrapping a socket appropriate to the given Event_type, and the function will
   * return whether the condition of that type holds for a given socket of interest. */
  for (const auto& ev_type_and_is_active_mtd : Event_set::S_EV_TYPE_TO_IS_ACTIVE_NODE_MTD)
  {
    const Event_set::Event_type ev_type = ev_type_and_is_active_mtd.first;
    const auto& does_event_hold = ev_type_and_is_active_mtd.second;
    const Event_set::Sockets& want_set = event_set->m_want[ev_type];
    Event_set::Sockets& can_set = event_set->m_can[ev_type];

    assert(can_set.empty()); // We will now fill it up.
    for (const any& sock_as_any : want_set)
    {
      if (does_event_hold(this, sock_as_any))
      {
        FLOW_LOG_TRACE("Event of type [" << ev_type << "] for "
                       "object [" << Event_set::sock_as_any_to_str(sock_as_any) << "] detected in "
                       "Event_set [" << event_set << "].");

        can_set.insert(sock_as_any);
      }
    }
  }

  return true; // This only means the check ran, not that it found anything.
} // Node::event_set_check_baseline()

void Node::event_set_fire_if_got_events(Event_set::Ptr event_set)
{
  using boost::algorithm::all_of;

  // We are in thread W.

  // event_set state involved should already be locked.

  // Explicitly documented pre-condition.
  assert(event_set->m_state == Event_set::State::S_WAITING);

  if (all_of(event_set->m_can, Event_set::ev_type_to_socks_map_entry_is_empty))
  {
    // Nope, no events ready. Carry on in WAITING state.
    return;
  }
  // else

  // If editing this, check interrupt_all_waits_worker() also. That's another path to firing m_on_event().

  // Yes! At least one event is ready, so inform via the m_on_event() callback.
  FLOW_LOG_TRACE("Event_set [" << event_set << "] has ready events; firing and moving from "
                 "[" << Event_set::State::S_WAITING << "] to [" << Event_set::State::S_INACTIVE << "] again.");

  event_set->m_on_event(false);

  // Forget the callback. They set the callback each time async_wait() is called (as an argument to that method).
  event_set->m_on_event.clear();

  // Gun has fired and is thus no longer cocked.
  event_set->m_state = Event_set::State::S_INACTIVE;

  /* It's important to understand what happens now. m_on_event() just did whatever the user wants
   * (presumably signal the user thread(s) (not W) that event_set is back to INACTIVE state and
   * currently holds the active events in event_set->m_can). For example it might set a future
   * that the user thread is waiting on; or it might send a Unix domain socket message or local TCP
   * message, which the user thread is select()ing on and thus wake it up. The point is, the user
   * thread U may already be awake RIGHT NOW and trying to check event_set->m_can for activity.
   *
   * However, we have not unlocked event_set yet. We soon will; until then U will sit there waiting
   * for the unlock (any access to event_set must lock event_set mutex; this is enforced by all
   * public Event_set methods). Once acquired, it can freely check it and perform actions like
   * receive() or accept(). */
} // Node::event_set_fire_if_got_events()

void Node::event_set_all_check_delta(bool defer_delta_check)
{
  // We are in thread W.

  using boost::any;
  using boost::algorithm::all_of;

  /* Short-circuit (avoid logging, unneeded locking, etc.) if no delta events have been noted so
   * far. (If baseline check has not run yet, it is queued by boost.asio as
   * event_set_check_baseline_assuming_state(), so we need not worry about it here.) */
  if (all_of(m_sock_events, Event_set::ev_type_to_socks_map_entry_is_empty))
  {
    return;
  }

  FLOW_LOG_TRACE("Possible event of interest occurred; "
                 "defer_delta_check = [" << defer_delta_check << "]; "
                 "results set sizes = [" << Event_set::ev_type_to_socks_map_sizes_to_str(m_sock_events) << "].");

  if (defer_delta_check)
  {
    /* Caller believes there's an event_set_all_check_delta(false) call within a "non-blocking"
     * amount of time, so we should let more possible events accumulate before possibly signaling
     * user. Okay. */
    return;
  }
  // else

  /* Delta-check not deferred. We should perform it... but there's a possibility that we haven't
   * performed the baseline check yet. As explained in giant comment inside
   * Event_set::async_wait(), a delta check must be performed after a baseline check (and assuming
   * the latter fired no events); otherwise its results would be incomplete. How we can get to a delta check
   * before a baseline check is explained in the comment in Node::event_set_async_wait().
   *
   * So, if we haven't, perform the baseline check first. Basically it is an exhaustive check of
   * all desired events, regardless of what's in the delta-check variables Node::m_sock_events. See that
   * below. For purposes of discussion, though, assume we do indeed perform only delta checks.
   * Then:
   *
   * Now we check all WAITING Event_sets. For each one, find any waited-on conditions that have
   * indeed occurred since the last time we did this; these are accumulated in this->m_sock_events.
   * Any that did occur indicate that at least one event of interest has occurred, and thus we
   * should mark all of them down in the appropriate Event_set(s) and signal those waiting on
   * these Event_sets.
   *
   * Note that we check ALL registered Event_sets. This seems a little crude/inefficient. How to
   * avoid it? Basically we'd need to map each (Peer/Server_socket, event type) pair to any
   * containing, waiting Event_set(s), so that we can quickly go from the pair to the Event_set.
   * The problem is synchronization (since sockets are added to Event_sets, and
   * Event_set::async_wait() is called, by user threads, not W). I thought about this pretty hard
   * and was unable to come up with anything elegant that would avoid possible deadlocks w/r/t each
   * Event_set's individual m_mutex as well. I thought and thought and thought and eventually
   * realized that simply iterating over all Event_sets and handling one at a time sidesteps all
   * such complexities. Is this slow? Firstly there's the iteration over all Event_sets. How many
   * are there? Realistically, there's probably one per user thread, of which there would be about
   * as many as CPU cores. That is not a lot. So take one Event_set. For each active socket in
   * this->m_sock_events[...], we just look it up in the Event_set's appropriate m_want[...]; since we store the
   * latter in a hash table, that's a constant time search. If we did have the socket-to-Event_set
   * map (and figured out a working synchronization scheme), that would also be just one
   * constant-time search. So without the nice mapping being available, the performance is only
   * slowed down by having to look at each Event_set, and there are not many in practice, and an
   * extra constant-time lookup beyond that. So it's fine. */

  for (Event_set::Ptr event_set : m_event_sets)
  {
    // As explained above, work on one Event_set at a time. Lock it.
    Event_set::Lock_guard lock(event_set->m_mutex);

    if (event_set->m_state != Event_set::State::S_WAITING)
    {
      continue;
    }
    // else

    // As explained above, perform the baseline check if necessary.
    if (!event_set_check_baseline(event_set)) // Method assumes m_mutex already locked.
    {
      // Wasn't necessary (mainstream case). Perform delta check.

      FLOW_LOG_TRACE("Event_set [" << event_set << "] delta check started.");

      /* For each type of event, run through each event we want to know about if it's true, and if
       * it's true (is in Node::m_sock_events), add it to the active events list for that event type.
       *
       * Note that if some event is desired in two Event_sets, and it has occurred, both Event_sets
       * will fire, as advertised. It also means the user is a crazy mother, but that's not our
       * problem.... */

      for (auto& ev_type_and_socks_from_node : m_sock_events)
      {
        const Event_set::Event_type ev_type = ev_type_and_socks_from_node.first;
        const Event_set::Sockets& all_can_set = ev_type_and_socks_from_node.second;

        Event_set::Sockets& can_set = event_set->m_can[ev_type];
        Event_set::Sockets& want_set = event_set->m_want[ev_type];

        assert(can_set.empty());

        /* all_can_set is the set of all sockets for which the event has occurred, since the last baseline
         * or delta check on event_set. Our goal is to load can_set (which is a field in
         * event_set) with all sockets for which the event has occurred AND are in want_set (a field in
         * event_set containing the sockets for which the user is interested in this event). Therefore,
         * what we want is the set intersection of want_set and all_can_set; and we want to put the result
         * into can_set.
         *
         * Both want_set and all_can_set have O(1) search. Therefore the fastest way to compute the
         * intersection is to pick the smaller set; iterate through each element; and save the given
         * element into can_set if it is present in the other set. */

        // Ensure fastness just below.
        static_assert(util::Container_traits<Event_set::Sockets>::S_CONSTANT_TIME_SEARCH,
                      "Expecting amortized constant time search sockets container.");
        static_assert(util::Container_traits<Event_set::Sockets>::S_CONSTANT_TIME_INSERT,
                      "Expecting amortized constant time insertion sockets container.");

        const bool want_set_smaller = want_set.size() < all_can_set.size();
        const Event_set::Sockets& small_set = want_set_smaller ? want_set : all_can_set;
        const Event_set::Sockets& large_set = want_set_smaller ? all_can_set : want_set;

        for (const any& sock_as_any : small_set)
        {
          if (util::key_exists(large_set, sock_as_any))
          {
            FLOW_LOG_TRACE("Event of type [" << ev_type << "] for "
                           "socket object [" << Event_set::sock_as_any_to_str(sock_as_any) << "] detected in "
                           "Event_set [" << event_set << "].");

            can_set.insert(sock_as_any);
          }
        }
      } // for ((ev_type, all_can_set) : m_sock_events)
    } // if (!event_set_check_baseline(event_set))

    /* Was necessary to do baseline check. Thus skip delta check. Shouldn't we perform the delta check? No.
     * The baseline check is a superset of the delta check: it simply checks each desired event's
     * socket for the desired condition. Thus performing a delta check would be useless. */

    // If either check detected active events, fire this event_set (and thus move it to INACTIVE).
    event_set_fire_if_got_events(event_set); // Method assumes m_mutex already locked.
  } // for (all Event_sets in m_event_sets)

  // Each Event_set has been handled. So begin a clean slate for the next delta check.
  Event_set::clear_ev_type_to_socks_map(&m_sock_events);
} // Node::event_set_all_check_delta()
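
/* [Illustrative sketch -- an addition to this reference listing, not part of the original source.]
 * The "iterate over the smaller set, probe the larger" intersection idiom used in the delta check
 * above, in generic form for two hash sets (std:: types used here so the sketch is self-contained):
 *
 *   #include <unordered_set>
 *   template<typename T>
 *   std::unordered_set<T> intersect(const std::unordered_set<T>& a, const std::unordered_set<T>& b)
 *   {
 *     const auto& small = (a.size() < b.size()) ? a : b;
 *     const auto& large = (a.size() < b.size()) ? b : a;
 *     std::unordered_set<T> result;
 *     for (const T& x : small)
 *     {
 *       if (large.count(x) != 0) { result.insert(x); } // O(1) probe; whole loop is O(min(|a|, |b|)).
 *     }
 *     return result;
 *   }
 */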

void Node::event_set_close(Event_set::Ptr event_set, Error_code* err_code)
{
  using boost::adopt_lock;

  // We are in user thread U != W.

  if (!running())
  {
    FLOW_ERROR_EMIT_ERROR(error::Code::S_NODE_NOT_RUNNING);
    return;
  }
  // else

  // Pre-condition is that m_mutex is locked already.

  {
    /* WARNING!!! event_set->m_mutex is locked, but WE must unlock it before returning! Can't
     * leave that to the caller, because we must unlock at a specific point below, right before
     * post()ing event_set_close_worker() onto thread W. Use a Lock_guard that adopts an
     * already-locked mutex. */
    Event_set::Lock_guard lock(event_set->m_mutex, adopt_lock);

    /* Put the rest of the work into thread W. For justification, see big comment in listen().
     * Addendum regarding performance: close_abruptly() is probably called more frequently than
     * listen(), but I doubt the performance impact is serious even so. send() and receive() might be
     * a different story. */

    // We're done -- must unlock so that thread W can do what it wants to with event_set.
  } // lock

  // Load this bad boy onto thread W boost.asio work queue. Safe to capture by &reference, as we will wait.
  asio_exec_ctx_post(get_logger(), &m_task_engine, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION, [&]()
  {
    // We are in thread W. event_set_close() is waiting for us to set close_promise in thread U.

    // Something like async_wait_finish() may be setting event_set->m_state or other things... must lock.
    Event_set::Lock_guard lock(event_set->m_mutex);

    /* Since we were placed onto thread W, another handler may have been executed before boost.asio
     * got to us. Therefore we may already be S_CLOSED. Detect this. */

    if (event_set->m_state == Event_set::State::S_CLOSED) // No need to lock: only W can write to this.
    {
      // Yep, already closed. Done.
      FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
      return;
    }
    // else actually do it.

    // m_mutex locked (pre-condition).
    event_set_close_worker(event_set);
    err_code->clear(); // Success.
  }); // asio_exec_ctx_post()
  // If got here, the task has completed in thread W and signaled us to that effect.

} // Node::event_set_close()
1329
1331{
1332 // We are in thread W.
1333
1334 // Everything already locked (explicit pre-condition).
1335
1336 // Check explicitly documented pre-condition.
1337 assert((event_set->m_state == Event_set::State::S_INACTIVE)
1338 || (event_set->m_state == Event_set::State::S_WAITING));
1339
1340 /* If INACTIVE:
1341 * event_set is not currently being waited on; it's in the phase where the set of desired events
1342 * may be being changed, and/or the last set of fired events may be being examined; no on-event
1343 * behavior could be triggered in any case. Therefore we simply go to CLOSED state and remove
1344 * event_set from the master list with no potential for controversy.
1345 *
1346 * If WAITING:
1347 * We advertised in the doc comment, that we will NOT execute any on-event behavior in close().
1348 * Is this OK? There is a chance of controversy. There is a wait happening in some thread U
1349 * != W. We are currently (chronologically speaking) in the middle of some thread U' != W
1350 * calling event_set->close(). Let U = U' (the typical case). Then they (in thread U) must
1351 * have called event_set->async_wait() and then event_set->close(). In that case, unless
1352 * they're dummies, they should know that, since they're closing the thing, the wait() is to be
1353 * abandoned and will never fire on-event behavior. Therefore, as above, we just go straight to
1354 * CLOSED state and not invoke any on-event behavior. Now let U != U' (atypical in general;
1355 * normally one would use a given Event_set in only one thread). Well, one thread executed
1356 * async_wait(), while another executed async_close() afterwards. Could that screw the former?
1357 * If the user is unaware that he might do that to himself, yes... but that is not our problem.
1358 * Therefore, OK to not perform on-event behavior and just close in this case also. */
1359
1360 // First, set various state in *event_set.
1361
1362 // This can be quite frequent if program uses many sync_*() methods. Use TRACE.
1363 FLOW_LOG_TRACE("Closing Event_set [" << event_set << "]: changing state "
1364 "from [" << event_set->m_state << "] to [" << Event_set::State::S_CLOSED << "].");
1365
1366 assert(event_set->m_state != Event_set::State::S_CLOSED);
1367 event_set->m_state = Event_set::State::S_CLOSED;
1368 event_set->m_node = 0; // Maintain invariant.
1369
1370 // Free resources. In particular, after Event_set close, user won't be able to see last set of fired events.
1371 Event_set::clear_ev_type_to_socks_map(&event_set->m_want);
1372 Event_set::clear_ev_type_to_socks_map(&event_set->m_can);
1373
1374 /* Forget the handler callback. What if there's an outstanding async_wait()? Well,
1375 * m_on_event() will NOT be called unless state is WAITING, and it won't be WAITING; it will be
1376 * CLOSED once this critical section is over. So that outstanding async_wait() will never
1377 * "finish." However, this is explicitly documented on the public close() method. Node shutdown
1378 * will avoid this problem by first closing each socket, thus waking up any waits, before
1379 * performing Event_set::close(). */
1380 event_set->m_on_event.clear();
1381
1382 // Then remove it from the master list.
1383#ifndef NDEBUG
1384 const bool erased = 1 ==
1385#endif
1386 m_event_sets.erase(event_set);
1387 assert(erased); // If it was not CLOSED yet was absent from m_event_sets, that is a serious bug somewhere.
1388} // Node::event_set_close_worker()
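To make the WAITING-branch reasoning above concrete: an async_wait() still outstanding when close() runs is simply abandoned, and its handler never executes. A hedged usage sketch follows (assumes a running Node* node; error checks elided; only methods declared for Event_set/Node in this file are used):

Error_code err_code;
Event_set::Ptr event_set = node->event_set_create(&err_code);
// ...populate desired events, e.g. via swap_wanted_sockets()...
event_set->async_wait([](bool) { /* Never runs if close() wins, per the comment above. */ },
                      &err_code);
event_set->close(&err_code); // State => S_CLOSED; the saved handler is forgotten.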
1389
1390void Node::interrupt_all_waits(Error_code* err_code)
1391{
1392 using boost::asio::post;
1393
1394 if (flow::error::exec_void_and_throw_on_error
1395 ([this](Error_code* actual_err_code) { interrupt_all_waits(actual_err_code); },
1396 err_code, FLOW_UTIL_WHERE_AM_I_STR()))
1397 {
1398 return;
1399 }
1400 // else
1401
1402 // We are in thread U != W.
1403
1404 if (!running())
1405 {
1406 FLOW_ERROR_EMIT_ERROR(error::Code::S_NODE_NOT_RUNNING);
1407 return;
1408 }
1409 // else
1410
1411 /* Put the rest of the work into thread W. For justification, see big comment in listen().
1412 * Addendum regarding performance: interrupt_all_waits() should be pretty rare. */
1413
1414 post(m_task_engine, [this]() { interrupt_all_waits_worker(); });
1415
1416 err_code->clear();
1417} // Node::interrupt_all_waits()
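Like other public Flow methods, interrupt_all_waits() supports the dual error-reporting convention set up by the exec_void_and_throw_on_error() prologue above. A brief sketch of both calling styles (that the throwing path emits flow::error::Runtime_error is an assumption based on Flow's general error convention):

Error_code err_code;
node->interrupt_all_waits(&err_code); // Error-code style: never throws.
if (err_code) // E.g., error::Code::S_NODE_NOT_RUNNING, if running() is false.
{
  std::cerr << "Interrupt failed: [" << err_code.message() << "].\n";
}
node->interrupt_all_waits(); // Exception style: a null err_code means failures throw instead.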
1418
1419void Node::interrupt_all_waits_worker()
1420{
1421 // We are in thread W.
1422
1423 FLOW_LOG_INFO("Executing request to interrupt all waiting Event_sets.");
1424
1425 for (Event_set::Ptr event_set : m_event_sets)
1426 {
1427 // Work on one Event_set at a time. Lock it.
1428 Event_set::Lock_guard lock(event_set->m_mutex);
1429
1430 if (event_set->m_state == Event_set::State::S_WAITING)
1431 {
1432 /* The code here is based on event_set_fire_if_got_events(). This is another type of event being "ready";
1433 * it's just that it's ready because it was interrupted rather than because the actual waited-on condition
1434 * became true. Keeping comments light to avoid redundancy/maintenance issues.
1435 *
1436 * If editing this, check event_set_fire_if_got_events() also. */
1437
1438 FLOW_LOG_INFO("Event_set [" << event_set << "] is being interrupted; firing and moving from "
1439 "[" << Event_set::State::S_WAITING << "] to [" << Event_set::State::S_INACTIVE << "] again.");
1440
1441 /* As promised, the interrupted case means these will be empty. The actual calls may not be necessary: quite
1442 * possibly, adding elements to these always immediately (within the same m_mutex locking) leads to the
1443 * S_INACTIVE state, meaning the present code would not be executing. However, no need to rely on that; we could
1444 * well have code that loads stuff into event_set->m_can and only goes to S_WAITING in another callback. */
1445 Event_set::clear_ev_type_to_socks_map(&event_set->m_can);
1446
1447 event_set->m_on_event(true); // true means firing due to interruption, not due to waited-on condition being true.
1448 event_set->m_on_event.clear();
1449 event_set->m_state = Event_set::State::S_INACTIVE;
1450 }
1451 } // for (all Event_sets in m_event_sets)
1452} // Node::interrupt_all_waits_worker()
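The `true` passed to m_on_event() above surfaces in user code as the wait handler's bool argument. A hedged sketch of a handler distinguishing the two firing reasons:

event_set->async_wait([](bool interrupted)
{
  if (interrupted)
  {
    /* Fired by interrupt_all_waits_worker(): result sets are empty by contract.
     * (sync_wait() callers see the same situation as error::Code::S_WAIT_INTERRUPTED.) */
  }
  else
  {
    // At least one waited-on condition actually holds; harvest via emit_result_sockets().
  }
}, &err_code);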
1453
1454void Node::interrupt_all_waits_internal_sig_handler(const Error_code& sys_err_code, int sig_number)
1455{
1456 // We are in thread W. (Yes. This is a contract of this function.)
1457
1458 if (sys_err_code == boost::asio::error::operation_aborted)
1459 {
1460 return; // Stuff is shutting down; just get out.
1461 }
1462 // else
1463
1464 FLOW_LOG_INFO("Internal interrupt signal handler executed with signal number [" << sig_number << "].");
1465
1466 if (sys_err_code)
1467 {
1468 // This is odd, but there's no need to freak out about anything else. Just log and get out.
1469 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1470 FLOW_LOG_WARNING("Internal signal handler executed with unexpected error indicator. Strange! "
1471 "Ignoring and continuing other operation.");
1472 }
1473 else
1474 {
1475 /* To the user, this default signal handler is just supposed to call Node::interrupt_all_waits().
1476 * It's a convenience thing, for when they don't want to set up their own signal handler to do that and also
1477 * don't need any custom signal handling of their own that may have nothing to do with net_flow at all.
1478 * Actually they can even have the latter -- as long as it uses signal_set also.
1479 * (If they do need some other handling, they MUST disable this feature via Node_options. Then we wouldn't be
1480 * called.) */
1481
1482 // This must be run in thread W, and indeed we are in thread W by contract. It'll log further, so just call it.
1483 interrupt_all_waits_worker();
1484 }
1485
1486 // Wait for it again, or else this will work only the first time a signal is sent.
1487 m_signal_set.async_wait([this](const Error_code& sys_err_code, int sig_num)
1488 {
1489 interrupt_all_waits_internal_sig_handler(sys_err_code, sig_num);
1490 });
1491} // Node::interrupt_all_waits_internal_sig_handler()
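The re-arm at the end is a general boost.asio property rather than anything Flow-specific: signal_set::async_wait() is one-shot. A stand-alone illustration (hypothetical program; standard boost.asio API only):

#include <boost/asio.hpp>
#include <csignal>
#include <functional>
#include <iostream>

int main()
{
  boost::asio::io_context task_engine;
  boost::asio::signal_set signals(task_engine, SIGINT, SIGTERM);

  // std::function (rather than a bare lambda), so the handler can name itself to re-arm.
  std::function<void (const boost::system::error_code&, int)> on_signal =
    [&](const boost::system::error_code& err, int sig_number)
    {
      if (err == boost::asio::error::operation_aborted)
      {
        return; // Shutting down; same early-out as in the handler above.
      }
      std::cout << "Signal [" << sig_number << "] caught; waits would be interrupted here.\n";
      signals.async_wait(on_signal); // Without this, only the first signal is ever handled.
    };
  signals.async_wait(on_signal);

  task_engine.run(); // Runs indefinitely; each handled signal re-arms the wait.
}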
1492
1493// Free implementations.
1494
1495/// @cond
1496/* -^- Doxygen, please ignore the following. (Don't want docs generated for temp macro; this is more maintainable
1497 * than specifying the macro name to omit it, in Doxygen-config EXCLUDE_SYMBOLS.) */
1498
1499// That's right, I did this. Wanna fight about it?
1500#define STATE_TO_CASE_STATEMENT(ARG_state) \
1501 case Event_set::State::S_##ARG_state: \
1502 return os << #ARG_state
1503
1504// -v- Doxygen, please stop ignoring.
1505/// @endcond
1506
1507std::ostream& operator<<(std::ostream& os, Event_set::State state)
1508{
1509 switch (state)
1510 {
1511 STATE_TO_CASE_STATEMENT(INACTIVE);
1512 STATE_TO_CASE_STATEMENT(WAITING);
1513 STATE_TO_CASE_STATEMENT(CLOSED);
1514 }
1515 return os;
1516#undef STATE_TO_CASE_STATEMENT
1517}
1518
1519/// @cond
1520// -^- Doxygen, please ignore the following. (Same deal as just above.)
1521
1522// That's right, I did this. Wanna fight about it?
1523#define TYPE_TO_CASE_STATEMENT(ARG_type) \
1524 case Event_set::Event_type::S_##ARG_type: \
1525 return os << #ARG_type
1526
1527// -v- Doxygen, please stop ignoring.
1528/// @endcond
1529
1530std::ostream& operator<<(std::ostream& os, Event_set::Event_type ev_type)
1531{
1532
1533 switch (ev_type)
1534 {
1535 TYPE_TO_CASE_STATEMENT(PEER_SOCKET_READABLE);
1536 TYPE_TO_CASE_STATEMENT(PEER_SOCKET_WRITABLE);
1537 TYPE_TO_CASE_STATEMENT(SERVER_SOCKET_ACCEPTABLE);
1538 }
1539 return os;
1540#undef TYPE_TO_CASE_STATEMENT
1541}
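A quick hypothetical check of the generated printers (e.g., inside a unit test; requires <sstream> and <cassert>): the S_ prefix is stripped by the token pasting above, leaving the bare constant name.

std::ostringstream os;
os << Event_set::State::S_WAITING << ' ' << Event_set::Event_type::S_PEER_SOCKET_READABLE;
assert(os.str() == "WAITING PEER_SOCKET_READABLE");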
1542
1543} // namespace flow::net_flow