Flow 1.0.0
Flow project: Full implementation reference.
event_set.cpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#include "flow/net_flow/event_set.hpp"
#include "flow/net_flow/node.hpp"
#include "flow/async/util.hpp"
#include <boost/algorithm/cxx11/all_of.hpp>
#include <boost/algorithm/cxx11/one_of.hpp>
#include <boost/functional.hpp>

namespace flow::net_flow
{

// Event_set static initializations.

const boost::unordered_map<Event_set::Event_type, Function<bool (const Node*, const boost::any&)>>
  Event_set::S_EV_TYPE_TO_IS_ACTIVE_NODE_MTD
  ({
     // Each Event_type maps to the Node method that checks whether that event holds for a given socket.
     { Event_set::Event_type::S_PEER_SOCKET_READABLE, &Node::sock_is_readable },
     { Event_set::Event_type::S_PEER_SOCKET_WRITABLE, &Node::sock_is_writable },
     { Event_set::Event_type::S_SERVER_SOCKET_ACCEPTABLE, &Node::serv_is_acceptable }
  });

// Event_set implementations.

Event_set::Event_set(log::Logger* logger_ptr) :
  log::Log_context(logger_ptr, Flow_log_component::S_NET_FLOW),
  m_state(State::S_CLOSED), // Incorrect; set explicitly.
  m_node(0), // Incorrect; set explicitly.
  m_want(empty_ev_type_to_socks_map()), // Each event type => empty socket set.
  m_can(empty_ev_type_to_socks_map()), // Ditto.
  m_baseline_check_pending(false)
{
  // This can be quite frequent if program uses many sync_*() methods. Use TRACE.
  FLOW_LOG_TRACE("Event_set [" << this << "] created.");
}

Event_set::~Event_set()
{
  // This can be quite frequent if program uses many sync_*() methods. Use TRACE.
  FLOW_LOG_TRACE("Event_set [" << this << "] destroyed.");
}

Event_set::State Event_set::state() const
{
  Lock_guard lock(m_mutex);
  return m_state;
}

Node* Event_set::node() const
{
  Lock_guard lock(m_mutex);
  return m_node;
}

bool Event_set::async_wait(const Event_handler& on_event, Error_code* err_code)
{
  namespace bind_ns = util::bind_ns;
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::async_wait, bind_ns::cref(on_event), _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  using boost::algorithm::all_of;

  // We are in user thread U != W.

  /* Caller has loaded *this with desired events (socket + event type pairs); e.g., "I want
   * Peer_socket X to be Readable; I want Server_socket Y to be Acceptable." Now she wants to be
   * informed as soon as one or more of those events has occurred (e.g., "Server_socket Y is
   * Acceptable."). After being so informed, she can check which events did in fact occur and act
   * accordingly (e.g., "Server_socket Y is Acceptable, so I will call Y->accept()").
   *
   * Suppose we lock *this. There are now several possibilities.
   *
   * -1- We are CLOSED. Bail out: everything is blank in CLOSED state, and no further waits are
   * possible.
   * -2- We are WAITING. Bail out: cannot wait while already WAITING: the on-event behavior
   * (i.e., on_event) has already been set, and we do not support dislodging it with another
   * handler.
   * -3- We are INACTIVE. The mainstream case. We should enter WAITING and inform the caller
   * (via on_event() call) as soon as at least one event has occurred.
   *
   * Obviously -3- is the interesting case.
   *
   * First, some groundwork. The meaning of "as soon as 1+ events have occurred" is not totally
   * clear-cut. For example, it could mean "the moment we know that exactly 1 event has occurred."
   * However, a high-performance definition is: "no later than time period E since the moment we
   * know that exactly 1 event has occurred," where E is defined as "the largest amount of time that
   * can be considered 'non-blocking.'" Informally, this just means that we should detect and
   * accumulate (clump together) as many events as possible -- without extending the wait by a
   * "blocking" time period -- before calling on_event() and thus "giving the results" to the user.
   * This should result in a faster event loop for the user, but we may want to make it tunable.
   *
   * Supposing we want to "clump together" events this way, the problem reduces to 3 conceptual
   * steps. First, detect that exactly 1 desired event has fired; accumulate it. Second, perform
   * any immediately pending Node work, and accumulate any events thus fired. Third, once that's
   * done, call on_event() to inform the user.
   *
   * The latter two steps are interesting but can be left as black boxes for the time being. What
   * about the former, first, step? How to detect that first event has occurred in a timely
   * fashion?
   *
   * What we really are looking for is not so much an "event" but rather a CONDITION (e.g., "Socket
   * S is Writable"), and we want to know the earliest moment in the future when this CONDITION is
   * true. (Time is always passing, so I say "future" instead of "present or future." Also we can
   * assume, for our purposes, that the CONDITION cannot become false after it is true.) One
   * guaranteed way to detect this, which is what we'll use, is as follows. At the soonest moment
   * possible from now, check whether the CONDITION is true. At that point, if it is true, we've
   * found the one "event" that has occurred and can immediately proceed. If, at that point, it is
   * not true, then we check at every step when the CONDITION *may* become true -- that is, when the
   * underlying logic changes what the CONDITION is about -- and if it is at that point true, we can
   * proceed. In other words, establish a baseline ("CONDITION is currently false"), and then
   * ignore the whole thing except at time points when CONDITION may deviate from that baseline.
   * This is efficient and works.
   *
   * We call the initial check, right after async_wait() is called, the "baseline" check; while the
   * subsequent checks are called "delta" checks. (Known terms for this are "level" check and
   * "edge" check, but I didn't know that when originally writing this giant comment. @todo Change
   * terminology in comments and code.)
   *
   * To be specific, consider the CONDITION "Server_socket S is Acceptable" as an example. First we
   * check ASAP: is S Acceptable -- i.e., does it have at least one Peer_socket on its accept queue?
   * Yes? Done. No? OK. <time passes> Say we get a valid SYN_ACK_ACK on that port. Since
   * receiving a SYN_ACK_ACK on socket S' belonging to Server_socket S is the only way S can become
   * Acceptable, we check it. Is it? Yes. Done. At all other points in time we don't concern
   * ourselves with the CONDITION/event "Server_socket S is Acceptable."
   *
   * The above may be seen as obvious, but it's important to be very explicit for the following
   * reasoning to work. It's pretty clear that the "delta" checking can only occur on thread W;
   * only Node on thread W knows about such things as receiving SYN_ACK_ACKs, for example. What
   * about the "baseline" (initial) check for the CONDITION in question? There are two
   * possibilities. One is we can do it right here, right now, in thread U != W. The other is we
   * can post() a task onto thread W and have it happen there ASAP.
   *
   * The former (do it here) seems attractive. It's "faster," in that we can just do it now. It
   * also lets us short-circuit on_event() and just return something to the user right now, so they
   * can instantly react (in case one of the events is immediately true) instead of having to learn
   * of the event indirectly. It's also more distributed among threads in some situations (we're
   * loading less work onto one thread W, which may be busy keeping net_flow working). The negative is
   * we generally prefer to keep work on W when it doesn't seriously hurt performance, to keep the
   * synchronization as simple as possible (avoid bugs); for example that's why Node::listen() and
   * Node::connect() and Event_set::close() work the way they do (using futures). Another negative
   * is that it introduces a special case to the interface; so the user would need to write code to
   * handle both an instant, non-error return and one via on_event().
   *
   * I thought about this for a while. I'd go for the faster way, but the real crux is how to set
   * up the synchronization in a way that would not miss events. I 90% convinced myself that a
   * certain not-hard way would be correct, but it remains difficult to think about; eventually I
   * decided that it would be best to go with the slower solution (do both "baseline" and "delta"
   * checks on W), because of its simplicity and elegance. I doubt the performance impact is too
   * serious. On 5-year-old server hardware in 2012, assuming an idle thread W and a ready socket event, a
   * return through post(W) + on_event() { boost::promise::set_value() } is in the low hundreds of
   * microseconds. If on_event() instead sends a message through a Unix domain socket, doing that
   * and detecting and reading that message is in the same ballpark (certainly sub-millisecond).
   * Effectively adding sub-ms latency to some messages does not seem too bad.
   *
   * Conclusion: Lock *this; then filter out CLOSED and WAITING state. Then, assuming INACTIVE
   * state, proceed to WAITING state. post() onto thread W the "baseline check" task. Finally,
   * unlock *this. Thread W will then proceed with "baseline" and "delta" checks and will fire
   * on_event() when ready.
   *
   * Also note that poll() provides the "instantly check for any of the desired events and don't
   * wait" functionality in isolation (something like a select() with a 0 timeout). */

  // Lock everything. We're going to be writing to things that other user threads and W will be accessing/writing.
  Lock_guard lock(m_mutex);

  // Check for invalid arguments, basically.
  if (on_event.empty())
  {
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_NO_EVENTS);
    return false;
  }
  // else

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us. Mark error in *err_code and log.
    assert(!m_node);
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  if (m_state == State::S_WAITING)
  {
    // Someone already started an async_wait().
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_DOUBLE_WAIT_OR_POLL);
    return false;
  }
  // else the mainstream case.
  assert(m_state == State::S_INACTIVE);

  // Forward to Node, as is the general pattern for Event_set method implementations involving Node state.
  return m_node->event_set_async_wait(shared_from_this(), // Get a Ptr that shares ownership of this.
                                      on_event, err_code);
} // Event_set::async_wait()

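/* Usage sketch (illustrative only -- not part of this source file). Assume hypothetical
 * pre-existing variables: Node* node; Peer_socket::Ptr sock; Error_code err_code; and a
 * user-supplied function on_wait_done(bool interrupted). Then a typical async-wait setup is:
 *
 *   Event_set::Ptr es = node->event_set_create(&err_code);
 *   Event_set::Sockets socks;
 *   socks.insert(boost::any(sock)); // Interested in sock becoming Readable.
 *   es->swap_wanted_sockets(&socks, Event_set::Event_type::S_PEER_SOCKET_READABLE, &err_code);
 *   es->async_wait(&on_wait_done, &err_code); // on_wait_done(false) fires once 1+ events hold.
 */
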
bool Event_set::async_wait_finish(Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::async_wait_finish, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  /* User wants to move *this to INACTIVE state, so that she can check the results of a wait (in
   * m_can). If it is already in INACTIVE, then this won't do anything but is allowed (for
   * reasons that will be very soon clear). If it is CLOSED, then it's just an error like any other
   * operation.
   *
   * If it is WAITING, then there's a race. We're about to lock *this and try to change to INACTIVE
   * if needed. If we lose the race -- if Node in thread W is able to, while scanning Event_sets
   * for applicable events that just occurred, lock *this and find some active events in *this and
   * thus change m_state to INACTIVE -- then that's just the NOOP situation for us (INACTIVE ->
   * INACTIVE). No harm done; *this is still INACTIVE by the time we return, as advertised. If we
   * win the race -- if we lock and change state to INACTIVE -- then Node will just ignore *this
   * when/if it gets to it, as user is no longer interested in *this's events (which is correct). */

  // Lock everything, as we'll be reading/changing m_state at least.
  Lock_guard lock(m_mutex);

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us. Mark error in *err_code and log.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  if (m_state == State::S_WAITING)
  {
    FLOW_LOG_TRACE("Event_set [" << this << "] wait finish requested; moving from "
                   "[" << Event_set::State::S_WAITING << "] to [" << Event_set::State::S_INACTIVE << "].");

    // As in event_set_fire_if_got_events():
    m_on_event.clear();
    m_state = State::S_INACTIVE;
  }
  else
  {
    assert(m_state == State::S_INACTIVE);
    FLOW_LOG_TRACE("Event_set [" << this << "] wait finish requested, but state was already "
                   "[" << Event_set::State::S_INACTIVE << "].");
  }

  err_code->clear();
  return true;
} // Event_set::async_wait_finish()

bool Event_set::poll(Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::poll, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  /* This method can be thought of as sync_wait() with a zero max_wait (which sync_wait() actually
   * disallows, so that poll() is used instead), with the minor difference that the "no events ready 'in time'" result
   * for us means we return `true` (no error) and `!*err_code` (sync_wait() returns a ..._TIMEOUT err_code).
   *
   * It assumes INACTIVE state, goes WAITING, checks
   * for any of the m_want conditions that are true right now (and saves such to m_can), then
   * immediately goes back to INACTIVE. If you look at the giant comment in async_wait() you'll
   * note that this is basically equivalent to an async_wait() that performs only the initial
   * ("baseline") check and then immediately goes back to INACTIVE regardless of whether the
   * baseline check discovered any active events.
   *
   * The only difference is that we do the baseline check directly in thread U != W, as opposed to
   * post()ing it onto thread W. It is entirely thread-safe to check each event from thread U, as
   * all the event checking functions (is Readable? is Writable? is Acceptable?) explicitly can
   * run from any thread. Additionally, the argument made (in the async_wait() comment) against
   * performing the baseline check in thread U != W does not apply: we're not interested in
   * performing any blocking, so we can't "miss" any events as long as we simply check immediately.
   * Of course, the result is faster and simpler code as well (compared to post()ing it). */

  // Lock everything, as we'll be reading/changing much state.
  Lock_guard lock(m_mutex);

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us. Mark error in *err_code and log.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  if (m_state == State::S_WAITING)
  {
    // Fairly similar situation to trying to async_wait() while WAITING.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_DOUBLE_WAIT_OR_POLL);
    return false;
  }
  // else
  assert(m_state == State::S_INACTIVE);

  /* Perform baseline check (that is to say, simply check each event in m_want and if it holds,
   * add it to m_can). */

  // This is a formality, as we'll just go back to INACTIVE in a moment, but event_set_check_baseline() expects it.
  m_state = State::S_WAITING;

  // As in a regular async_wait():
  clear_ev_type_to_socks_map(&m_can);
  m_baseline_check_pending = true;

#ifndef NDEBUG
  const bool baseline_check_ok =
#endif
    m_node->event_set_check_baseline(shared_from_this());
  assert(baseline_check_ok); // There is NO adequate reason for it to fail; we've set m_baseline_check_pending = true.

  // As in a regular async_wait_finish() (but no need to mess with m_on_event, as we didn't set it above):
  m_state = State::S_INACTIVE;

  // Done. m_can sets are what they are.
  err_code->clear();
  return true;
} // Event_set::poll()

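/* Usage sketch (illustrative only; hypothetical `es`/`err_code` as in the async_wait() sketch above):
 *
 *   if (es->poll(&err_code) && es->events_detected(&err_code))
 *   {
 *     // 1+ desired events held at poll() time; harvest them via emit_result_sockets().
 *   }
 */
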
bool Event_set::sync_wait_impl(const Fine_duration& max_wait, Error_code* err_code)
{
  using boost::promise;
  using boost::unique_future;
  using boost::future_status;
  using boost::chrono::milliseconds;
  using boost::chrono::round;
  using boost::algorithm::one_of;

  assert(max_wait.count() > 0);

  // We are in user thread U != W.

  /* This is actually simple. They want us to block until 1+ of the desired events loaded into
   * m_want has occurred. Therefore we use async_wait(), and have thread W signal us when this
   * has occurred; then in this thread we wait for that signal. So how to signal? We can use
   * Boost future (internally condition_variable probably).
   *
   * Additional optimization: perform a single poll() right away, in case events are ready now.
   * This lets us avoid any thread W work and return immediately. */

  {
    // Lock so that we can safely check result of the poll() without another thread messing with it.
    Lock_guard lock(m_mutex); // OK because m_mutex is recursive (poll() will also momentarily lock).

    if (!poll(err_code)) // May throw.
    {
      return false; // *err_code set (if not null)/logged.
    }
    // else see if at least one active event already exists for at least one of the event types.
    if (one_of(m_can, boost::not1(ev_type_to_socks_map_entry_is_empty)))
    {
      // Boo-ya, immediately found some active events.
      return true;
    }
    // else need to wait.
  }

  /* We will perform the wait via the future/promise pair. The result of the successful operation is either true or
   * false: interrupted (true) or condition(s) now true (false). */
  promise<bool> intr_promise; // This will be touched by thread W when events ready.
  unique_future<bool> intr_future = intr_promise.get_future(); // We'll use this to wait for that.

  /* The on-event callback is therefore: intr_promise->set_value(interrupted),
   * where interrupted is boolean value async_wait() will pass in, indicating whether the wait was finished
   * due to an explicit interruption (true) vs. the waited-on condition(s) actually becoming true (false).
   * Capture intr_promise by reference, as we will wait for completion, during which time it'll keep existing. */
  if (!async_wait([&intr_promise](bool interrupted) { intr_promise.set_value(interrupted); },
                  err_code)) // May throw.
  {
    // Was not able to start the asynchronous wait; *err_code set (if not null)/logged.
    return false;
  }
  // else *err_code is success (or untouched if null).

  bool timed_out = false;

  // Block until intr_promise is touched.
  if (max_wait == Fine_duration::max())
  {
    // Infinite wait.
    intr_future.wait();
  }
  else
  {
    // Finite wait.

    if (intr_future.wait_for(max_wait) != future_status::ready)
    {
      /* Timed out. That's OK: we treat it the same way as if hadn't timed out; we'll simply
       * return 0 active events, not an error. Therefore we need only finish the wait by going to
       * INACTIVE state using async_wait_finish(). Then if/when Node decides to actually add an
       * active event to *this, *this will already be INACTIVE, so Node will ignore it (which is
       * right).
       *
       * Note that in the tiny period of time between wait_for() exiting with false and us
       * calling async_wait_finish(), Node in thread W may actually be able to detect an event,
       * add it to m_can, and go to INACTIVE. This would mean that in async_wait_finish() we'll
       * try to go from INACTIVE to INACTIVE. That is OK; that method specifically supports
       * that. In that case it won't do anything, still return success, and we'll happily return 1
       * or more active events in m_can, as if there was no timeout after all. */

      FLOW_LOG_INFO("Synchronous wait on Event_set [" << this << "] timed out; "
                    "timeout = [" << round<milliseconds>(max_wait) << "].");

      if (!async_wait_finish(err_code)) // May throw.
      {
        // Extraordinary situation -- *this was close()d or something. *err_code set (if not null)/logged.
        return false;
      }
      // else *err_code is still success (or untouched if null).

      timed_out = true;
    } // if (intr_future timed out)
  } // else if (needed to wait until a deadline)

  if (!timed_out)
  {
    /* The wait didn't time out (if that was even possible); but the result may have been that the (successful, from
     * async_wait()'s point of view) wait finished due to an interruption as opposed to the waited-on condition(s)
     * becoming true. In this case, as advertised, we return a specific error. This is conceptually similar to
     * POSIX's errno=EINTR semantics. */

    if (intr_future.get())
    {
      FLOW_LOG_INFO("Synchronous wait on Event_set [" << this << "] was interrupted.");

      if (err_code)
      {
        *err_code = error::Code::S_WAIT_INTERRUPTED;
      }

      return false;
    }
    // else if (condition(s) are actually true)
  }

  // Caller can now examine m_can for fired events (if any).
  assert((!err_code) || (!*err_code)); // Either null or success.
  return true;
} // Event_set::sync_wait_impl()

bool Event_set::sync_wait(Error_code* err_code)
{
  using boost::chrono::microseconds;
  return sync_wait(microseconds(microseconds::max()), err_code); // Wait indefinitely. May throw.
}

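/* Usage sketch (illustrative only; hypothetical `es`/`err_code` as above). Per sync_wait_impl()'s
 * contract: `true` means 1+ events fired; `false` may mean interruption (EINTR-like), timeout
 * (reported via a ..._TIMEOUT err_code), or another failure:
 *
 *   if (es->sync_wait(boost::chrono::milliseconds(500), &err_code))
 *   {
 *     // Examine the result sets (e.g., via emit_result_sockets()).
 *   }
 *   else
 *   {
 *     // err_code distinguishes interruption (see sync_wait_impl()) from timeout/other failures.
 *   }
 */
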
void Event_set::close(Error_code* err_code)
{
  if (flow::error::exec_void_and_throw_on_error([this](Error_code* actual_err_code) { close(actual_err_code); },
                                                err_code, FLOW_UTIL_WHERE_AM_I_STR()))
  {
    return;
  }
  // else

  // We are in user thread U != W.

  Lock_guard lock(m_mutex); // Lock m_node/m_state; also it's a pre-condition for Node::event_set_close().

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us.
    assert(!m_node);

    // Mark error in *err_code and log.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return;
  }

  // Forward to Node, as is the general pattern for Event_set method implementations involving Node state.
  lock.release(); // Let go of the mutex (mutex is still LOCKED). event_set_close() is now in charge of unlocking it.
  m_node->event_set_close(shared_from_this(), // Get a Ptr that shares ownership of this.
                          err_code);
} // Event_set::close()

bool Event_set::swap_wanted_sockets(Sockets* target_set, Event_type ev_type, Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::swap_wanted_sockets,
                                     target_set, ev_type, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  /* @todo There are about 4 methods that have a pattern similar to below, differing mostly by log message
   * contents and what happens just before `return true`. Perhaps code reuse? */

  // We are in thread U != W.

  assert(target_set);

  // Accessing m_state, socket sets, etc. which may be written by other threads at any time. Must lock.
  Lock_guard lock(m_mutex);

  Sockets& want_set = m_want[ev_type];

  FLOW_LOG_TRACE("Wanted set for event type [" << ev_type << "] swapped in Event_set [" << this << "]; pre-swap sizes: "
                 "Event_set [" << want_set.size() << "]; user [" << target_set->size() << "].");

  if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify want_set in this state.
  {
    // *err_code is set.
    return false; // Closed; do not set *target_set (BTW want_set is blank anyway).
  }
  // else *err_code is success.

  // This is an O(1) operation.
  target_set->swap(want_set);

  return true;
} // Event_set::swap_wanted_sockets()

bool Event_set::clear_wanted_sockets(Event_type ev_type, Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::clear_wanted_sockets, ev_type, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  // Accessing m_state, the sets, etc. which may be written by other threads at any time. Must lock.
  Lock_guard lock(m_mutex);

  Sockets& want_set = m_want[ev_type];

  FLOW_LOG_TRACE("Wanted set for event type [" << ev_type << "] cleared in Event_set [" << this << "]; "
                 "size [" << want_set.size() << "].");

  if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify want_set in this state.
  {
    // *err_code is set.
    return false; // Closed; do not clear (it should be cleared anyway, BTW).
  }
  // else *err_code is success.

  want_set.clear();
  return true;
}

bool Event_set::events_wanted(Error_code* err_code) const
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::events_wanted, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  using boost::algorithm::one_of;

  // We are in thread U != W.

  // Lock everything, as we'll be reading state and other things.
  Lock_guard lock(m_mutex);

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us. Mark error in *err_code and log.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  return one_of(m_want, boost::not1(ev_type_to_socks_map_entry_is_empty));
} // Event_set::events_wanted()

bool Event_set::events_detected(Error_code* err_code) const
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::events_detected, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  using boost::algorithm::one_of;

  // We are in thread U != W.

  // Lock everything, as we'll be reading state.
  Lock_guard lock(m_mutex);

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us. Mark error in *err_code and log.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  if (m_state == State::S_WAITING)
  {
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_RESULT_CHECK_WHEN_WAITING);
    return false;
  }
  // else
  assert(m_state == State::S_INACTIVE);

  // Coolio.
  err_code->clear();

  FLOW_LOG_TRACE("Wait result set checked for activity in Event_set [" << this << "]; "
                 "results set sizes = [" << ev_type_to_socks_map_sizes_to_str(m_can) << "].");

  return one_of(m_can, boost::not1(ev_type_to_socks_map_entry_is_empty));
} // Event_set::events_detected()

bool Event_set::emit_result_sockets(Sockets* target_set, Event_type ev_type, Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::emit_result_sockets,
                                     target_set, ev_type, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  assert(target_set);

  // Accessing m_state, the sets, etc. which may be written by other threads at any time. Must lock.
  Lock_guard lock(m_mutex);

  Sockets& can_set = m_can[ev_type];

  FLOW_LOG_TRACE("Wait result set for event type [" << ev_type << "] emitted in Event_set [" << this << "]; "
                 "size [" << can_set.size() << "].");

  if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify can_set in this state.
  {
    // *err_code is set.
    return false; // Closed; do not set *target_set (can_set is blank anyway).
  }
  // else *err_code is success.

  // Efficiently give them the result set (swap() avoids copy).

  // This is an O(1) operation.
  can_set.swap(*target_set);

  /* Get rid of whatever garbage they had in their target set before calling us.
   * This is O(n), where n is # of elements in *target_set before the swap. Typically n == 0. */
  can_set.clear();

  return true;
} // Event_set::emit_result_sockets()

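/* Usage sketch (illustrative only; hypothetical names as above) -- typical post-wait harvesting:
 *
 *   Event_set::Sockets readable; // Empty target set; swapped with the result set in O(1).
 *   es->emit_result_sockets(&readable, Event_set::Event_type::S_PEER_SOCKET_READABLE, &err_code);
 *   for (const boost::any& sock_as_any : readable)
 *   {
 *     auto sock = boost::any_cast<Peer_socket::Ptr>(sock_as_any);
 *     // sock was Readable as of the wait; e.g., now read from it.
 *   }
 */
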
bool Event_set::clear_result_sockets(Event_type ev_type, Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::clear_result_sockets, ev_type, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  // Accessing m_state, the sets, etc. which may be written by other threads at any time. Must lock.
  Lock_guard lock(m_mutex);

  Sockets& can_set = m_can[ev_type];

  FLOW_LOG_TRACE("Wait result set for event type [" << ev_type << "] cleared in Event_set [" << this << "]; "
                 "size [" << can_set.size() << "].");

  if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify can_set in this state.
  {
    // *err_code is set.
    return false; // Closed; do not clear (it should be cleared anyway).
  }
  // else *err_code is success.

  can_set.clear();
  return true;
}

bool Event_set::ok_to_mod_socket_set(Error_code* err_code)
{
  // m_mutex should be locked.

  if (m_state == State::S_CLOSED)
  {
    // Already CLOSED Event_set -- Node has disowned us.
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
    return false;
  }
  // else

  if (m_state == State::S_WAITING)
  {
    /* Event_set is being waited on; in this phase we do not allow changing events for which to
     * wait. We could probably allow it, but the logic involved would become needlessly complex.
     * Also, consider a BSD socket select() or a Linux epoll_wait(); once select() or epoll_wait()
     * is entered, it's probably impossible to add events to the set (or at least it's
     * undocumented). So let's just not. Similarly, don't allow retrieving results until the
     * wait is finished. */
    FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_IMMUTABLE_WHEN_WAITING);
    return false;
  }
  // else
  assert(m_state == State::S_INACTIVE);

  err_code->clear();
  return true;
} // Event_set::ok_to_mod_socket_set()

bool Event_set::clear(Error_code* err_code)
{
  /* Note this can be (and was) written FAR more concisely in terms of other clear_*() methods; but the cost
   * was significant: repeated redundant state checks, recursive mutex locks, logging.... The following
   * is superior. */

  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::clear, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  // Accessing m_state, the sets, etc. which may be written by other threads at any time. Must lock.
  Lock_guard lock(m_mutex);

  FLOW_LOG_TRACE("Clearing sets in Event_set [" << this << "]; pre-clear set sizes: "
                 "wanted [" << ev_type_to_socks_map_sizes_to_str(m_want) << "], "
                 "results [" << ev_type_to_socks_map_sizes_to_str(m_can) << "].");

  if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify the sets in this state.
  {
    // *err_code is set.
    return false;
  }
  // else *err_code is success.

  clear_ev_type_to_socks_map(&m_want);
  clear_ev_type_to_socks_map(&m_can);

  return true;
}

Event_set::Ev_type_to_socks_map Event_set::empty_ev_type_to_socks_map() // Static.
{
  return Ev_type_to_socks_map
    ({
       // Linked_hash_map order is significant. Iteration will occur in this canonical order in logs, etc.
       { Event_type::S_PEER_SOCKET_READABLE, Sockets() },
       { Event_type::S_PEER_SOCKET_WRITABLE, Sockets() },
       { Event_type::S_SERVER_SOCKET_ACCEPTABLE, Sockets() }
    });
}

void Event_set::clear_ev_type_to_socks_map(Ev_type_to_socks_map* ev_type_to_socks_map) // Static.
{
  assert(ev_type_to_socks_map);
  for (auto& ev_type_and_socks : *ev_type_to_socks_map)
  {
    ev_type_and_socks.second.clear();
  }
}

bool Event_set::ev_type_to_socks_map_entry_is_empty(const Ev_type_to_socks_map::Value& ev_type_and_socks) // Static.
{
  return ev_type_and_socks.second.empty();
}

std::string Event_set::ev_type_to_socks_map_sizes_to_str(const Ev_type_to_socks_map& ev_type_to_socks_map) // Static.
{
  using std::string;
  string out;

  size_t n_left = ev_type_to_socks_map.size();
  for (const auto& ev_type_and_socks : ev_type_to_socks_map)
  {
    util::ostream_op_to_string(&out,
                               ev_type_and_socks.first, ": ", ev_type_and_socks.second.size(),
                               (((--n_left) == 0) ? "" : ", "));
  }

  return out;
}

std::string Event_set::sock_as_any_to_str(const boost::any& sock_as_any) // Static.
{
  using boost::any_cast;
  using std::string;

  string out;

  /* A boost::any can contain an object of almost any type or be empty. Our boost::any's can only be those
   * from what we load into Sockets sets; currently the following types. So just find the correct type
   * and then cast to it. */
  const auto& type_id = sock_as_any.type();
  if (type_id == typeid(Peer_socket::Ptr))
  {
    util::ostream_op_to_string(&out, any_cast<Peer_socket::Ptr>(sock_as_any));
  }
  else if (type_id == typeid(Server_socket::Ptr))
  {
    util::ostream_op_to_string(&out, any_cast<Server_socket::Ptr>(sock_as_any));
  }
  else
  {
    assert(sock_as_any.empty());
    out = "none";
  }

  return out;
}

// Event_set::Socket_as_any* implementations.

// Method implementations.

size_t Event_set::Socket_as_any_hash::operator()(const boost::any& sock_as_any) const
{
  using boost::any_cast;
  using boost::hash;

  /* A boost::any can contain an object of almost any type or be empty. Our boost::any's can only be those
   * from what we load into Sockets sets; currently the following types. So just find the correct type
   * and then cast to it. Then just apply the default hash operation to that, as would be done if we
   * were simply storing a set or map with keys of that type.
   *
   * Collisions are allowed anyway -- though best avoided for hash lookup performance -- but in this case
   * hash<> will just use the pointer's raw value anyway, and in practice that should ensure completely
   * disjoint sets of hash values for each of the possible stored types. */
  const auto& type_id = sock_as_any.type();
  if (type_id == typeid(Peer_socket::Ptr))
  {
    return hash<Peer_socket::Ptr>()(any_cast<Peer_socket::Ptr>(sock_as_any));
  }
  // else
  if (type_id == typeid(Server_socket::Ptr))
  {
    return hash<Server_socket::Ptr>()(any_cast<Server_socket::Ptr>(sock_as_any));
  }
  // else

  assert(sock_as_any.empty());
  return 0;
}

bool Event_set::Socket_as_any_equals::operator()(const boost::any& sock_as_any1,
                                                 const boost::any& sock_as_any2) const
{
  using boost::any_cast;

  /* A boost::any can contain an object of almost any type or be empty. Our boost::any's can only be those
   * from what we load into Sockets sets; currently the following types. Firstly, if the two anys' types
   * are different, they are obviously different objects. (Note, also, that in practice we'll be storing
   * only objects of the same type in each Sockets structure; thus that case should rarely, if ever,
   * come into play.)
   *
   * Assuming they are of the same type, just find the correct type and then cast both to it. Now that type's
   * operator==() can do the job, as it would if we were simply storing a set or map with keys of that type. */

  if (sock_as_any1.type() != sock_as_any2.type())
  {
    return false;
  }
  // else

  const auto& type_id = sock_as_any1.type();

  if (type_id == typeid(Peer_socket::Ptr))
  {
    return any_cast<Peer_socket::Ptr>(sock_as_any1) == any_cast<Peer_socket::Ptr>(sock_as_any2);
  }
  // else
  if (type_id == typeid(Server_socket::Ptr))
  {
    return any_cast<Server_socket::Ptr>(sock_as_any1) == any_cast<Server_socket::Ptr>(sock_as_any2);
  }
  // else

  assert(sock_as_any1.empty()); // Note: type_id == typeid(void), in this case.
  assert(sock_as_any2.empty());

  return true;
}

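/* Sanity-check sketch (illustrative only): with the above two functors, a Sockets set keyed on
 * boost::any behaves as if keyed on the wrapped smart pointer itself (hypothetical `sock`):
 *
 *   Event_set::Sockets set;
 *   set.insert(boost::any(sock));
 *   assert(set.find(boost::any(sock)) != set.end()); // Equal Ptr => equal hash; operator==() via any_cast.
 */
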
// Node implementations (dealing with individual Event_sets but also Node state).

// Method implementations.

Event_set::Ptr Node::event_set_create(Error_code* err_code)
{
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(Event_set::Ptr, Node::event_set_create, _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  if (!running())
  {
    FLOW_ERROR_EMIT_ERROR(error::Code::S_NODE_NOT_RUNNING);
    return Event_set::Ptr();
  }
  // else

  /* Put the rest of the work into thread W. For justification, see big comment in listen().
   * Addendum regarding performance: event_set_create() should be pretty rare. */

  // Load this body onto thread W boost.asio work queue. event_set_promise captured by reference, as we will wait.
  Event_set::Ptr event_set(new Event_set(get_logger()));
  event_set->m_state = Event_set::State::S_INACTIVE;
  event_set->m_node = this;
  event_set->m_baseline_check_pending = false;
  asio_exec_ctx_post(get_logger(), &m_task_engine, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION, [&]()
  {
    // We are in thread W.
    m_event_sets.insert(event_set);
  });
  // If got here, the task has completed in thread W and signaled us to that effect.

  err_code->clear();
  return event_set;
} // Node::event_set_create()

bool Node::event_set_async_wait(Event_set::Ptr event_set, const Event_set::Event_handler& on_event,
                                Error_code* err_code)
{
  using boost::asio::post;
  using boost::any;

  // We are in thread U != W.

  if (!running())
  {
    FLOW_ERROR_EMIT_ERROR(error::Code::S_NODE_NOT_RUNNING);
    return false;
  }
  // else

  // See giant comment in Event_set::async_wait(), which just called us, before proceeding.

  // event_set is locked (and will be unlocked after we exit).

  // Check explicitly documented pre-condition.
  assert(event_set->m_state == Event_set::State::S_INACTIVE);

  FLOW_LOG_TRACE("Event_set [" << event_set << "] wait requested; moving from "
                 "[" << Event_set::State::S_INACTIVE << "] to [" << Event_set::State::S_WAITING << "].");

  // Cock the gun....
  event_set->m_state = Event_set::State::S_WAITING;
  event_set->m_on_event = on_event;

  // Clear result sets, so when we get back to S_INACTIVE (events fire), only the events that happened are there.
  Event_set::clear_ev_type_to_socks_map(&event_set->m_can);

  /* As described in Event_set::async_wait() giant comment, thread W must check all events (sockets
   * + desired states) of interest (in event_set->m_want) as soon as possible, in case those
   * conditions already hold. If any hold, it should fire m_on_event() right then. For example,
   * Peer_socket P in m_want[S_PEER_SOCKET_READABLE] may already be Readable, so this would be immediately detected;
   * if we don't do that, the Readable status would only be detected later (if ever) if, say, more
   * DATA packets arrive. */

  post(m_task_engine, [this, event_set]() { event_set_check_baseline_assuming_state(event_set); });

  /* Thread W will invoke this->event_set_check_baseline_assuming_state(event_set) the next time it
   * gets a free moment. (Might be right NOW, though it won't be able to lock and will wait until
   * we unlock just below.) */

  /* Corner case: Let P be in m_want[PEER_SOCKET_READABLE]. Assume thread W is executing a handler right now which
   * gets a DATA packet on socket P. Having saved it into P's Receive buffer, it will check each
   * Event_set in *this Node. We will unlock shortly, at which point W will note that event_set is
   * S_WAITING, and P is in m_want[P_S_R]. Therefore it will fire event_set->m_on_event() and make
   * event_set S_INACTIVE again. This will happen before event_set_check_baseline() runs. When
   * that runs, it will be in INACTIVE state again and thus not do anything.
   *
   * Is this bad? Not really. It just so happens that that sole socket P wins the race and
   * triggers m_on_event(). This doesn't violate the goal: to report the moment 1 or more events
   * hold; 1 is indeed "1 or more." However, let's try to avoid this situation anyway (be "greedy" for
   * more ready events at a time). Set event_set->m_baseline_check_pending = true. This will be a
   * sign to thread W that, when it next examines event_set for a "delta" check (such as upon
   * receiving the DATA packet in this example), it should instead perform the full baseline check
   * of all events. Then the queued up event_set_check_baseline() can do nothing if that bool is
   * false by then; or do the work and set it to false if it wins the race and performs the full check,
   * in which case the aforementioned "delta" check would just do the "delta" check after all. */
  event_set->m_baseline_check_pending = true;

  /* That's it. event_set_check_baseline() will execute soon. If it detects no events, thread W
   * will keep checking for relevant events as states become Readable/Writable/Acceptable/etc. on various
   * sockets. Eventually it will fire and change m_state from S_WAITING to S_INACTIVE. */

  err_code->clear();
  return true;
  // Unlock happens right after this returns.
} // Node::event_set_async_wait()

void Node::event_set_check_baseline_assuming_state(Event_set::Ptr event_set)
{
  // We are in thread W.

  // Imperative to lock all of event_set. Much access possible from user threads.
  Event_set::Lock_guard lock(event_set->m_mutex);

  /* event_set_async_wait() placed us onto thread W. When it did so, event_set->m_state ==
   * S_WAITING (waiting for 1+ events to hold, so we can inform user). However that may have
   * changed by the time boost.asio was able to get to us. E.g., socket was closed due to error, or
   * an event was detected before we executed, so we're S_INACTIVE again. So check for that. */

  if (event_set->m_state != Event_set::State::S_WAITING)
  {
    // Unlikely but legitimate and kind of interesting.
    FLOW_LOG_TRACE("Event_set [" << event_set << "] baseline check ran, but state is no "
                   "longer [" << Event_set::State::S_WAITING << "] but [" << event_set->m_state << "] instead.");
    return;
  }
  // else state is WAITING: should actually perform the baseline check.

  if (event_set_check_baseline(event_set)) // Assumes m_mutex is already locked.
  {
    // Some fired events may now be recorded inside event_set. Inform user and go to INACTIVE again if so.
    event_set_fire_if_got_events(event_set); // This method assumes m_mutex is already locked.
  }
} // Node::event_set_check_baseline_assuming_state()

bool Node::event_set_check_baseline(Event_set::Ptr event_set)
{
  using boost::any;
  using boost::any_cast;

  // We are in thread W *or* thread U != W. CAUTION!!! Ensure all operations below can be done from thread U!

  // event_set is already locked (pre-condition).

  // Check explicitly documented pre-condition.
  assert(event_set->m_state == Event_set::State::S_WAITING);

  /* Our mission is to simply run through every condition in the event_set->m_want sets and
   * check whether it holds. For each one that does, save it in the appropriate event_set->m_can
   * set. Then if any did apply, run event_set->m_on_event() to inform the user, and go back to
   * INACTIVE state. The background for this is in Event_set::async_wait() and friends.
   *
   * This is a more thorough (and slow) check than that invoked when a condition of interest
   * (e.g., new DATA packet added to Receive buffer) arises. */

  if (!event_set->m_baseline_check_pending)
  {
    /* Check already performed once since event_set became WAITING. Therefore all subsequent checks
     * are the faster "delta" checks, invoked when individual conditions of interest become true. */
    FLOW_LOG_TRACE("Event_set [" << event_set << "] baseline check ran, but skipping because same check already "
                   "ran since the last time we entered [" << Event_set::State::S_WAITING << "].");
    return false;
  }
  // else (event_set->m_baseline_check_pending)

  FLOW_LOG_TRACE("Event_set [" << event_set << "] baseline check started.");
  event_set->m_baseline_check_pending = false;

  /* For each type of event, run through each event we want to know about if it's true, and if
   * it's true, add it to the active events list for that event type. */

  static_assert(util::Container_traits<Event_set::Sockets>::S_CONSTANT_TIME_INSERT,
                "Expecting amortized constant time insertion sockets container."); // Ensure fastness.

  /* To get the set of Event_type, use S_EV_TYPE_TO_IS_ACTIVE_NODE_MTD which has them as the key set;
   * it also stores the functions of the form `bool Node::does_event_hold(const any& sock_as_any)`, where
   * sock_as_any is a boost::any wrapping a socket appropriate to the given Event_type, and the function will
   * return whether the condition of that type holds for a given socket of interest. */
  for (const auto& ev_type_and_is_active_mtd : Event_set::S_EV_TYPE_TO_IS_ACTIVE_NODE_MTD)
  {
    const Event_set::Event_type ev_type = ev_type_and_is_active_mtd.first;
    const auto& does_event_hold = ev_type_and_is_active_mtd.second;
    const Event_set::Sockets& want_set = event_set->m_want[ev_type];
    Event_set::Sockets& can_set = event_set->m_can[ev_type];

    assert(can_set.empty()); // We will now fill it up.
    for (const any& sock_as_any : want_set)
    {
      if (does_event_hold(this, sock_as_any))
      {
        FLOW_LOG_TRACE("Event of type [" << ev_type << "] for "
                       "object [" << Event_set::sock_as_any_to_str(sock_as_any) << "] detected in "
                       "Event_set [" << event_set << "].");

        can_set.insert(sock_as_any);
      }
    }
  }

  return true; // This only means the check ran, not that it found anything.
} // Node::event_set_check_baseline()

void Node::event_set_fire_if_got_events(Event_set::Ptr event_set)
{
  using boost::algorithm::all_of;

  // We are in thread W.

  // event_set state involved should already be locked.

  // Explicitly documented pre-condition.
  assert(event_set->m_state == Event_set::State::S_WAITING);

  if (all_of(event_set->m_can, Event_set::ev_type_to_socks_map_entry_is_empty))
  {
    // Nope, no events ready. Carry on in WAITING state.
    return;
  }
  // else

  // If editing this, check interrupt_all_waits_worker() also. That's another path to firing m_on_event().

  // Yes! At least one event is ready, so inform via the m_on_event() callback.
  FLOW_LOG_TRACE("Event_set [" << event_set << "] has ready events; firing and moving from "
                 "[" << Event_set::State::S_WAITING << "] to [" << Event_set::State::S_INACTIVE << "] again.");

  event_set->m_on_event(false); // false means: firing because waited-on condition(s) hold, not due to interruption.

  // Forget the callback. They set the callback each time async_wait() is called (as an argument to that method).
  event_set->m_on_event.clear();

  // Gun has fired and is thus no longer cocked.
  event_set->m_state = Event_set::State::S_INACTIVE;

  /* It's important to understand what happens now. m_on_event() just did whatever the user wants
   * (presumably signal the user thread(s) (not W) that event_set is back to INACTIVE state and
   * currently holds the active events in event_set->m_can). For example it might set a future
   * that the user thread is waiting on; or it might send a Unix domain socket message or local TCP
   * message, which the user thread is select()ing on and thus wake it up. The point is, the user
   * thread U may already be awake RIGHT NOW and trying to check event_set->m_can for activity.
   *
   * However, we have not unlocked event_set yet. We soon will; until then U will sit there waiting
   * for the unlock (any access to event_set must lock event_set mutex; this is enforced by all
   * public Event_set methods). Once acquired, it can freely check it and perform actions like
   * receive() or accept(). */
} // Node::event_set_fire_if_got_events()

void Node::event_set_all_check_delta(bool defer_delta_check)
{
  // We are in thread W.

  using boost::any;
  using boost::algorithm::all_of;

  /* Short-circuit (avoid logging, unneeded locking, etc.) if no delta events have been noted so
   * far. (If baseline check has not run yet, it is queued by boost.asio as
   * event_set_check_baseline_assuming_state(), so we need not worry about it here.) */
  if (all_of(m_sock_events, Event_set::ev_type_to_socks_map_entry_is_empty))
  {
    return;
  }

  FLOW_LOG_TRACE("Possible event of interest occurred; "
                 "defer_delta_check = [" << defer_delta_check << "]; "
                 "results set sizes = [" << Event_set::ev_type_to_socks_map_sizes_to_str(m_sock_events) << "].");

  if (defer_delta_check)
  {
    /* Caller believes there's an event_set_all_check_delta(false) call within a "non-blocking"
     * amount of time, so we should let more possible events accumulate before possibly signaling
     * user. Okay. */
    return;
  }
  // else

  /* Delta-check not deferred. We should perform it... but there's a possibility that we haven't
   * performed the baseline check yet. As explained in giant comment inside
   * Event_set::async_wait(), a delta check must be performed after a baseline check (and assuming
   * the latter fired no events); otherwise its results would be incomplete. How we can get to a delta check
   * before a baseline check is explained in the comment in Node::event_set_async_wait().
   *
   * So, if we haven't, perform the baseline check first. Basically it is an exhaustive check of
   * all desired events, regardless of what's in the delta-check variables Node::m_sock_events. See that
   * below. For purposes of discussion, though, assume we do indeed perform only delta checks.
   * Then:
   *
   * Now we check all WAITING Event_sets. For each one, find any waited-on conditions that have
   * indeed occurred since the last time we did this; these are accumulated in this->m_sock_events. Any
   * that did occur indicate that at least one event of interest has occurred, and thus we should mark all
   * of them down in the appropriate Event_set(s) and signal those waiting on these Event_sets.
   *
   * Note that we check ALL registered Event_sets. This seems a little crude/inefficient. How to
   * avoid it? Basically we'd need to map each (Peer/Server_socket, event type) pair to any
   * containing, waiting Event_set(s), so that we can quickly go from the pair to the Event_set.
   * The problem is synchronization (since sockets are added to Event_sets, and
   * Event_set::async_wait() is called, by user threads, not W). I thought about this pretty hard
   * and was unable to come up with anything elegant that would avoid possible deadlocks w/r/t each
   * Event_set's individual m_mutex as well. I thought and thought and thought and eventually
   * realized that simply iterating over all Event_sets and handling one at a time sidesteps all
   * such complexities. Is this slow? Firstly there's the iteration over all Event_sets. How many
   * are there? Realistically, there's probably one per user thread, of which there would be about
   * as many as CPU cores. That is not a lot. So take one Event_set. For each active socket in
   * this->m_sock_events[...], we just look it up in the Event_set's appropriate m_want[...]; since we store the
   * latter in a hash table, that's a constant time search. If we did have the socket-to-Event_set
   * map (and figured out a working synchronization scheme), that would also be just one
   * constant-time search. So without the nice mapping being available, the performance is only
   * slowed down by having to look at each Event_set, and there are not many in practice, and an
   * extra constant-time lookup beyond that. So it's fine. */

  for (Event_set::Ptr event_set : m_event_sets)
  {
    // As explained above, work on one Event_set at a time. Lock it.
    Event_set::Lock_guard lock(event_set->m_mutex);

    if (event_set->m_state != Event_set::State::S_WAITING)
    {
      continue;
    }
    // else

    // As explained above, perform the baseline check if necessary.
    if (!event_set_check_baseline(event_set)) // Method assumes m_mutex already locked.
    {
      // Wasn't necessary (mainstream case). Perform delta check.

      FLOW_LOG_TRACE("Event_set [" << event_set << "] delta check started.");

      /* For each type of event, run through each event we want to know about if it's true, and if
       * it's true (is in Node::m_sock_events), add it to the active events list for that event type.
       *
       * Note that if some event is desired in two Event_sets, and it has occurred, both Event_sets
       * will fire, as advertised. It also means the user is a crazy mother, but that's not our
       * problem.... */

      for (auto& ev_type_and_socks_from_node : m_sock_events)
      {
        const Event_set::Event_type ev_type = ev_type_and_socks_from_node.first;
        const Event_set::Sockets& all_can_set = ev_type_and_socks_from_node.second;

        Event_set::Sockets& can_set = event_set->m_can[ev_type];
        Event_set::Sockets& want_set = event_set->m_want[ev_type];

        assert(can_set.empty());

        /* all_can_set is the set of all sockets for which the event has occurred, since the last baseline
         * or delta check on event_set. Our goal is to load can_set (which is a field in
         * event_set) with all sockets for which the event has occurred AND are in want_set (a field in
         * event_set containing the sockets for which the user is interested in this event). Therefore,
         * what we want is the set intersection of want_set and all_can_set; and we want to put the result
         * into can_set.
         *
         * Both want_set and all_can_set have O(1) search. Therefore the fastest way to compute the
         * intersection is to pick the smaller set; iterate through each element; and save the given
         * element into can_set if it is present in the other set. */

        // Ensure fastness just below.
        static_assert(util::Container_traits<Event_set::Sockets>::S_CONSTANT_TIME_SEARCH,
                      "Expecting amortized constant time search sockets container.");
        static_assert(util::Container_traits<Event_set::Sockets>::S_CONSTANT_TIME_INSERT,
                      "Expecting amortized constant time insertion sockets container.");

        const bool want_set_smaller = want_set.size() < all_can_set.size();
        const Event_set::Sockets& small_set = want_set_smaller ? want_set : all_can_set;
        const Event_set::Sockets& large_set = want_set_smaller ? all_can_set : want_set;

        for (const any& sock_as_any : small_set)
        {
          if (util::key_exists(large_set, sock_as_any))
          {
            FLOW_LOG_TRACE("Event of type [" << ev_type << "] for "
                           "socket object [" << Event_set::sock_as_any_to_str(sock_as_any) << "] detected in "
                           "Event_set [" << event_set << "].");

            can_set.insert(sock_as_any);
          }
        }
      } // for ((ev_type, all_can_set) : m_sock_events)
    } // if (!event_set_check_baseline(event_set))

    /* Was necessary to do baseline check. Thus skip delta check. Shouldn't we perform the delta check? No.
     * The baseline check is a superset of the delta check: it simply checks each desired event's
     * socket for the desired condition. Thus performing a delta check would be useless. */

    // If either check detected active events, fire this event_set (and thus move it to INACTIVE).
    event_set_fire_if_got_events(event_set); // Method assumes m_mutex already locked.
  } // for (all Event_sets in m_event_sets)

  // Each Event_set has been handled. So begin a clean slate for the next delta check.
  Event_set::clear_ev_type_to_socks_map(&m_sock_events);
} // Node::event_set_all_check_delta()

void Node::event_set_close(Event_set::Ptr event_set, Error_code* err_code)
{
  using boost::adopt_lock;

  // We are in user thread U != W.

  if (!running())
  {
    FLOW_ERROR_EMIT_ERROR(error::Code::S_NODE_NOT_RUNNING);
    return;
  }
  // else

  // Pre-condition is that m_mutex is locked already.

  {
    /* WARNING!!! event_set->m_mutex is locked, but WE must unlock it before returning! Can't
     * leave that to the caller, because we must unlock at a specific point below, right before
     * post()ing event_set_close_worker() onto thread W. Use a Lock_guard that adopts an
     * already-locked mutex. */
    Event_set::Lock_guard lock(event_set->m_mutex, adopt_lock);

    /* Put the rest of the work into thread W. For justification, see big comment in listen().
     * Addendum regarding performance: close_abruptly() is probably called more frequently than
     * listen(), but I doubt the performance impact is serious even so. send() and receive() might be
     * a different story. */

    // We're done -- must unlock so that thread W can do what it wants to with event_set.
  } // lock

  // Load this bad boy onto thread W boost.asio work queue. Safe to capture by &reference, as we will wait.
  asio_exec_ctx_post(get_logger(), &m_task_engine, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION, [&]()
  {
    // We are in thread W. event_set_close() is waiting for us to set close_promise in thread U.

    // Something like async_wait_finish() may be setting event_set->m_state or other things... must lock.
    Event_set::Lock_guard lock(event_set->m_mutex);

    /* Since we were placed onto thread W, another handler may have been executed before boost.asio
     * got to us. Therefore we may already be S_CLOSED. Detect this. */

    if (event_set->m_state == Event_set::State::S_CLOSED) // No need to lock: only W can write to this.
    {
      // Yep, already closed. Done.
      FLOW_ERROR_EMIT_ERROR(error::Code::S_EVENT_SET_CLOSED);
      return;
    }
    // else actually do it.

    // m_mutex locked (pre-condition).
    event_set_close_worker(event_set);
    err_code->clear(); // Success.
  }); // asio_exec_ctx_post()
  // If got here, the task has completed in thread W and signaled us to that effect.

} // Node::event_set_close()

1333{
1334 // We are in thread W.
1335
1336 // Everything already locked (explicit pre-condition).
1337
1338 // Check explicitly documented pre-condition.
1339 assert((event_set->m_state == Event_set::State::S_INACTIVE)
1340 || (event_set->m_state == Event_set::State::S_WAITING));
1341
1342 /* If INACTIVE:
1343 * event_set is not currently being waited on; it's in the phase where the set of desired events
1344 * may be being changed, and/or the last set of fired events may be being examined; no on-event
1345 * behavior could be triggered in any case. Therefore we simply go to CLOSED state and remove
1346 * event_set from the master list with no potential for controversy.
1347 *
1348 * If WAITING:
1349 * We advertised in the doc comment, that we will NOT execute any on-event behavior in close().
1350 * Is this OK? There is a chance of controversy. There is a wait happening in some thread U
1351 * != W. We are currently (chronologically speaking) in the middle of some thread U' != W
1352 * calling event_set->close(). Let U = U' (the typical case). Then they (in thread U) must
1353 * have called event_set->async_wait() and then event_set->close(). In that case, unless
1354 * they're dummies, they should know that, since they're closing the thing, the wait() is to be
1355 * abandoned and will never fire on-event behavior. Therefore, as above, we just go straight to
1356 * CLOSED state and not invoke any on-event behavior. Now let U != U' (atypical in general;
1357 * normally one would use a given Event_set in only one thread). Well, one thread executed
1358 * async_wait(), while another executed async_close() afterwards. Could that screw the former?
1359 * If the user is unaware that he might do that to himself, yes... but that is not our problem.
1360 * Therefore, OK to not perform on-event behavior and just close in this case also. */
1361
1362 // First, set various state in *event_set.
1363
1364 // This can be quite frequent if program uses many sync_*() methods. Use TRACE.
1365 FLOW_LOG_TRACE("Closing Event_set [" << event_set << "]: changing state "
1366 "from [" << event_set->m_state << "] to [" << Event_set::State::S_CLOSED << "].");
1367
1368 assert(event_set->m_state != Event_set::State::S_CLOSED);
1369 event_set->m_state = Event_set::State::S_CLOSED;
1370 event_set->m_node = 0; // Maintain invariant.
1371
1372 // Free resources. In particular, after Event_set close, user won't be able to see last set of fired events.
1373 Event_set::clear_ev_type_to_socks_map(&event_set->m_want);
1374 Event_set::clear_ev_type_to_socks_map(&event_set->m_can);
1375
1376 /* Forget the handler callback. What if there's an outstanding async_wait()? Well,
1377 * m_on_event() will NOT be called unless state is WAITING, and it won't be WAITING; it will be
1378 * CLOSED once this critical section is over. So that outstanding async_wait() will never
1379 * "finish." However, this is explicitly documented on the public close() method. Node shutdown
1380 * will avoid this problem by first closing each socket, thus waking up any waits, before
1381 * performing Event_set::close(). */
1382 event_set->m_on_event.clear();
1383
1384 // Then remove it from the master list.
1385#ifndef NDEBUG
1386 const bool erased = 1 ==
1387#endif
1388 m_event_sets.erase(event_set);
1389 assert(erased); // If it was not CLOSED yet was not in m_event_sets, that is a serious bug somewhere.
1390} // Node::event_set_close_worker()
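/* [Editor's illustration -- not part of the original event_set.cpp.] The #ifndef NDEBUG construct at the
 * end of event_set_close_worker() is a small idiom worth noting: the erase() call executes in all build
 * modes, but its return value is captured only when asserts are compiled in, so release builds get no
 * unused-variable warning. A generic stand-alone sketch of the same idiom (erase_or_die() and its types
 * are hypothetical): */
#if 0
#include <cassert>
#include <set>

void erase_or_die(std::set<int>* the_set, int key)
{
#ifndef NDEBUG
  const bool erased = 1 ==
#endif
    the_set->erase(key); // The erase() itself runs in debug and release builds alike.
  assert(erased); // With NDEBUG defined, assert() expands to nothing, so `erased` is never referenced.
}
#endif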
1391
1392void Node::interrupt_all_waits(Error_code* err_code)
1393{
1394 using boost::asio::post;
1395
1395
1396 if (flow::error::exec_void_and_throw_on_error
1397 ([this](Error_code* actual_err_code) { interrupt_all_waits(actual_err_code); },
1398 err_code, FLOW_UTIL_WHERE_AM_I_STR()))
1399 {
1400 return;
1401 }
1402 // else
1403
1404 // We are in thread U != W.
1405
1406 if (!running())
1407 {
1408 FLOW_ERROR_EMIT_ERROR(error::Code::S_NODE_NOT_RUNNING);
1409 return;
1410 }
1411 // else
1412
1413 /* Put the rest of the work into thread W. For justification, see big comment in listen().
1414 * Addendum regarding performance: interrupt_all_waits() should be pretty rare. */
1415
1416 post(m_task_engine, [this]() { interrupt_all_waits_worker(); });
1417
1418 err_code->clear();
1419} // Node::interrupt_all_waits()
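/* [Editor's illustration -- not part of the original event_set.cpp.] The top of interrupt_all_waits()
 * shows Flow's recurring dual error-reporting API: a null Error_code* means "throw on failure," which is
 * implemented by re-invoking the same method with a real Error_code and converting any emitted code to an
 * exception (flow::error::exec_void_and_throw_on_error() centralizes that boilerplate). A hand-rolled
 * sketch of the idea; do_stuff() is hypothetical: */
#if 0
#include <boost/system/error_code.hpp>
#include <boost/system/system_error.hpp>

void do_stuff(boost::system::error_code* err_code)
{
  if (!err_code)
  {
    boost::system::error_code our_err_code;
    do_stuff(&our_err_code); // Call self in error-code mode...
    if (our_err_code)
    {
      throw boost::system::system_error(our_err_code); // ...and convert any failure to an exception.
    }
    return;
  }
  // else: non-null err_code => report failure/success by setting it. (Here: success.)
  err_code->clear();
}
#endif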
1420
1421void Node::interrupt_all_waits_worker()
1422{
1423 // We are in thread W.
1424
1425 FLOW_LOG_INFO("Executing request to interrupt all waiting Event_sets.");
1426
1427 for (Event_set::Ptr event_set : m_event_sets)
1428 {
1429 // Work on one Event_set at a time. Lock it.
1430 Event_set::Lock_guard lock(event_set->m_mutex);
1431
1432 if (event_set->m_state == Event_set::State::S_WAITING)
1433 {
1434 /* The code here is based on event_set_fire_if_got_events(). This is another type of event being "ready";
1435 * it's just that it's ready because the wait was interrupted rather than because the actual waited-on
1436 * condition became true. Keeping comments light to avoid redundancy/maintenance issues.
1437 *
1438 * If editing this, check event_set_fire_if_got_events() also. */
1439
1440 FLOW_LOG_INFO("Event_set [" << event_set << "] is being interrupted; firing and moving from "
1441 "[" << Event_set::State::S_WAITING << "] to [" << Event_set::State::S_INACTIVE << "] again.");
1442
1443 /* As promised, the interrupted case means these (the m_can result sets) will be empty. The actual call may not
1444 * be necessary: perhaps adding to m_can always immediately (within the same m_mutex locking) leads to S_INACTIVE,
1445 * meaning the present code would not be executing. However, no need to rely on that; quite possibly we could
1446 * have code that loads stuff into event_set->m_can and only goes to S_WAITING in another callback. */
1447 Event_set::clear_ev_type_to_socks_map(&event_set->m_can);
1448
1449 event_set->m_on_event(true); // true means firing due to interruption, not due to waited-on condition being true.
1450 event_set->m_on_event.clear();
1451 event_set->m_state = Event_set::State::S_INACTIVE;
1452 }
1453 } // for (all Event_sets in m_event_sets)
1454} // Node::interrupt_all_waits_worker()
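/* [Editor's illustration -- not part of the original event_set.cpp.] The `true` passed to m_on_event()
 * above reaches the user's wait handler, whose single bool argument distinguishes "interrupted" from
 * "a waited-on condition holds" (Event_set::Event_handler is a void-returning functor taking one bool).
 * A usage fragment honoring that contract; the handler body is illustrative only, and `event_set` is
 * assumed to be an Event_set::Ptr from Node::event_set_create(): */
#if 0
event_set->async_wait([](bool interrupted)
{
  if (interrupted)
  {
    // Wait was cut short (Node::interrupt_all_waits(), SIGINT/SIGTERM, etc.); result sets are empty.
  }
  else
  {
    // At least one waited-on condition holds; retrieve the ready sockets via emit_result_sockets().
  }
});
#endif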
1455
1456void Node::interrupt_all_waits_internal_sig_handler(const Error_code& sys_err_code, int sig_number)
1457{
1458 // We are in thread W. (Yes. This is a contract of this function.)
1459
1460 if (sys_err_code == boost::asio::error::operation_aborted)
1461 {
1462 return; // Stuff is shutting down; just get out.
1463 }
1464 // else
1465
1466 FLOW_LOG_INFO("Internal interrupt signal handler executed with signal number [" << sig_number << "].");
1467
1468 if (sys_err_code)
1469 {
1470 // This is odd, but there's no need to freak out about anything else. Just log and get out.
1471 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1472 FLOW_LOG_WARNING("Internal signal handler executed with unexpected error indicator. Strange! "
1473 "Ignoring and continuing other operation.");
1474 }
1475 else
1476 {
1477 /* To the user, this default signal handler is just supposed to call Node::interrupt_all_waits().
1478 * It's a convenience, in case they don't want to set up their own signal handler to do that -- and also don't
1479 * need any custom signal handling of their own that may have nothing to do with net_flow at all.
1480 * Actually they can even have the latter, as long as it also uses signal_set.
1481 * (If they do need some other handling, they MUST disable this feature via Node_options. Then we wouldn't be
1482 * called.) */
1483
1484 // This must be run in thread W, and indeed we are in thread W by contract. It'll log further, so just call it.
1485 interrupt_all_waits_worker();
1486 }
1487
1488 // Wait for it again, or else this will work only the first time a signal is sent.
1489 m_signal_set.async_wait([this](const Error_code& sys_err_code, int sig_num)
1490 {
1491 interrupt_all_waits_internal_sig_handler(sys_err_code, sig_num);
1492 });
1493} // Node::interrupt_all_waits_internal_sig_handler()
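/* [Editor's illustration -- not part of the original event_set.cpp.] The trailing
 * m_signal_set.async_wait() call above reflects a boost.asio rule: signal_set waits are one-shot, so a
 * persistent handler must re-arm itself after each delivery. A stand-alone sketch of the same re-arm
 * loop (arm() and the rest are hypothetical): */
#if 0
#include <boost/asio.hpp>
#include <csignal>
#include <iostream>

void arm(boost::asio::signal_set* sigs)
{
  sigs->async_wait([sigs](const boost::system::error_code& err, int sig_num)
  {
    if (err == boost::asio::error::operation_aborted)
    {
      return; // Shutting down; do not re-arm.
    }
    std::cout << "Caught signal " << sig_num << ".\n";
    arm(sigs); // Without this, only the first signal would ever be observed.
  });
}

int main()
{
  boost::asio::io_context ctx;
  boost::asio::signal_set sigs(ctx, SIGINT, SIGTERM);
  arm(&sigs);
  ctx.run(); // Runs handlers until the signal_set is canceled or the process exits.
}
#endif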
1494
1495// Free implementations.
1496
1497/// @cond
1498/* -^- Doxygen, please ignore the following. (Don't want docs generated for temp macro; this is more maintainable
1499 * than specifying the macro name to omit it, in Doxygen-config EXCLUDE_SYMBOLS.) */
1500
1501// That's right, I did this. Wanna fight about it?
1502#define STATE_TO_CASE_STATEMENT(ARG_state) \
1503 case Event_set::State::S_##ARG_state: \
1504 return os << #ARG_state
1505
1506// -v- Doxygen, please stop ignoring.
1507/// @endcond
1508
1509std::ostream& operator<<(std::ostream& os, Event_set::State state)
1510{
1511 switch (state)
1512 {
1513 STATE_TO_CASE_STATEMENT(INACTIVE);
1514 STATE_TO_CASE_STATEMENT(WAITING);
1515 STATE_TO_CASE_STATEMENT(CLOSED);
1516 }
1517 return os;
1518#undef STATE_TO_CASE_STATEMENT
1519}
1520
1521/// @cond
1522// -^- Doxygen, please ignore the following. (Same deal as just above.)
1523
1524// That's right, I did this. Wanna fight about it?
1525#define TYPE_TO_CASE_STATEMENT(ARG_type) \
1526 case Event_set::Event_type::S_##ARG_type: \
1527 return os << #ARG_type
1528
1529// -v- Doxygen, please stop ignoring.
1530/// @endcond
1531
1532std::ostream& operator<<(std::ostream& os, Event_set::Event_type ev_type)
1533{
1534
1535 switch (ev_type)
1536 {
1537 TYPE_TO_CASE_STATEMENT(PEER_SOCKET_READABLE);
1538 TYPE_TO_CASE_STATEMENT(PEER_SOCKET_WRITABLE);
1539 TYPE_TO_CASE_STATEMENT(SERVER_SOCKET_ACCEPTABLE);
1540 }
1541 return os;
1542#undef TYPE_TO_CASE_STATEMENT
1543}
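/* [Editor's illustration -- not part of the original event_set.cpp.] The two *_TO_CASE_STATEMENT macros
 * above are a compact variant of the "X macro" trick for streaming enums as text: each expansion is a
 * complete `case` that returns immediately (hence no `break`s), and #undef keeps the macro local to its
 * function. A generic stand-alone sketch of the same technique (Color and its values are hypothetical): */
#if 0
#include <ostream>

enum class Color { S_RED, S_GREEN, S_BLUE };

#define COLOR_TO_CASE_STATEMENT(ARG_color) \
  case Color::S_##ARG_color: \
    return os << #ARG_color

std::ostream& operator<<(std::ostream& os, Color color)
{
  switch (color)
  {
    COLOR_TO_CASE_STATEMENT(RED);
    COLOR_TO_CASE_STATEMENT(GREEN);
    COLOR_TO_CASE_STATEMENT(BLUE);
  }
  return os; // Unreached for valid enumerators; placates compilers that warn otherwise.
#undef COLOR_TO_CASE_STATEMENT
}
#endif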
1544
1545} // namespace flow::net_flow