Flow 1.0.0
Flow project: Full implementation reference.
event_set.hpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
23#include "flow/util/traits.hpp"
25#include <boost/shared_ptr.hpp>
26#include <boost/enable_shared_from_this.hpp>
27#include <boost/any.hpp>
28
29namespace flow::net_flow
30{
31// Types.
32
33/**
34 * A user-set collection of sockets and desired conditions on those sockets (such as: "socket has data
35 * to read"), with the ability to wait for those conditions to become true and signal the user when so.
36 *
37 * @note Important: With the somewhat more recent availability of the asio::Node hierarchy, which slickly adds
38 * boost.asio-event-loop-integration to the Node hierarchy, public use of this class Event_set should be avoided
39 * for those already using a boost.asio event loop. It is both significantly simpler -- and a bit faster --
40 * to use Peer_socket::async_send(), Server_socket::async_accept(), etc. -- as well as a bit faster.
41 * IN PARTICULAR, the complex discussion in Event_set doc header regarding async_wait() with a glue socket
42 * is something one should skip over if one could just integrate directly into a boost.asio loop.
43 * @note Advice: Even if you don't already use a boost.asio event loop, consider doing so before giving up and going for
44 * an Event_set glue pattern as described in this class doc header. Your application may be sigificantly simpler
45 * and more maintainable as a result.
46 * @note Advice: In general, please read the bird's eye view of the entire set of I/O ops in `net_flow`; this is found
47 * in asio::Node doc header. This should help hugely in choosing the right type of operating mode in the context
48 * of a non-trivial application, possibly featuring other types of I/O in addition to the Flow protocol.
49 * @note All that said, Event_set is a very important facility and is, at the very least, built-upon internally for all
50 * but the basic non-blocking ops. Advanced users should understand it (even if they don't use it directly).
51 *
52 * This fulfills the same role as BSD sockets' `select()` (and its more powerful non-portable
53 * equivalents like epoll, kqueue, etc.). In addition to the feature set provided by functions such
54 * as `select()`, Event_set provides a way to work together with the OS's built-in `select()`
55 * equivalent; that is to say it provides a way to add Flow-protocol sockets into an event loop written in
56 * terms of `select()` or spritiually similar facilities (epoll, kqueue, etc.).
57 *
58 * The simplest way to use Event_set is to not use it directly at all. Instead, use the blocking
59 * methods such as Peer_socket::sync_receive(), Peer_socket::sync_send(), Node::sync_connect(), and
60 * Server_socket::sync_accept(). Such methods will internally construct an Event_set with the
61 * target socket and perform work (namely some Event_set::sync_wait() calls) on it, hidden from the caller.
62 * This mode of operation is analogous to BSD sockets' blocking functions (e.g., `recv()` in blocking
63 * mode -- POSIX `O_NONBLOCK` bit set / WinSock `FIONBIO` mode is enabled).
64 *
65 * This is often insufficient, as in event loops of any complexity. The next simplest way to use
66 * Event_set is to use a synchronous wait. To do so, construct an Event_set, add the desired
67 * <em>(socket, desired event on that socket)</em> pairs to it (see the swap_wanted_sockets(), add_wanted_socket(),
68 * remove_wanted_socket() methods), then perform sync_wait() (with an optional timeout). Once that method
69 * returns (which happens once either 1 or more events are active, there is a timeout, or wait is
70 * interrupted via Node::interrupt_all_waits()), examine which events (if any) were found to be active, then perform
71 * the appropriate operation (e.g., `serv->accept()` if Acceptable, `sock->send()` if Writable) on each active
72 * socket/event pair. This mode of operation is analogous to `FD_SET(); select(); FD_ISSET()` or
73 * `epoll_ctl(); epoll_wait()`. The key feature is that the sync_wait() (like `select()` or `epoll_wait()`) call blocks
74 * the calling thread until there are events or a timeout or a global interrupt (the latter similar to POSIX `EINTR`
75 * caused by a signal). Additionally, the Event_set::poll() method is analogous to a `select()` call with a 0 timeout.
76 *
77 * The above is sufficient when writing an event loop that works *only* with Flow-protocol sockets. However,
78 * many practical applications will use an event loop that works with Flow-protocol sockets and other
79 * resources (e.g., TCP sockets, pipes) simultaneously. Then what?
80 * - If you have a boost.asio event loop, then (again) the best way to use Event_set is by not using it directly;
81 * instead use asio::Node hierarchy to use Flow-protocol sockets as 1st-class-citizen boost.asio-style I/O objects.
82 * Internally, again, an Event_set will be indirectly used; but one would not use it directly.
83 * - Otherwise -- if there is no boost.asio event loop, and it's impractical to convert to using one (generally a
84 * a good idea but may not be practical under time constraints), then direct use the core Event_set feature
85 * (namely Event_set::async_wait()) is just the ticket.
86 *
87 * We now discuss Event_set::async_wait(). The basic deficiency of sync_wait() is simply that it will only wake up
88 * if there's a Flow-protocol event and knows nothing about anything
89 * else. Similarly, your `select()`-or-equivalent-of-choice
90 * will know nothing of Flow-protocol events, since it can only work with standard file descriptors
91 * (or whatever), not Flow sockets. For this case Event_set provides asynchronous waiting. As
92 * before, you'll construct an Event_set and add some desired sockets/events. Then, call
93 * `async_wait(F)`, where `F()` is a function. async_wait() will IMMEDIATELY return. However, another,
94 * unspecified, thread will wait for the desired condition(s) to become true. Once that occurs,
95 * that unspecified thread will call `F()`. `F()` can do whatever you want (but see restrictions
96 * documented on async_wait()); however in the use case described in this paragraph it will probably
97 * want to do something that will wake up the `select()` (or `epoll_wait()`, or ...) of your main event
98 * loop. For example, you can set up a Unix domain socket or pipe and add it into your `select()` (or
99 * equivalent's) event set; and then have `F()` write 1 byte to it. Once this occurs, that `select()`
100 * (etc.) will wake up; you can call `async_wait_finish()` and then check the active Flow-protocol events using
101 * the same techniques as shown in the preceding paragraph, after having called sync_wait().
102 *
103 * The following illustrates the preceding paragraph. In this example, we are interested in a single TCP
104 * socket from which to read; and a single Flow socket from which to read. This can be extended to more
105 * native and Flow socket types, more sockets, and more sophisticated native mechanisms than `select()`;
106 * but basically this is it. (However, in case of `select()` and its peculiar `FD_SET()` semantics, please
107 * note that this examples combines `sel_read_set` to handle both the special local "linking" socket `comm_sock`
108 * and the TCP-readable socket `tcp_sock`. If you want to check for another type of event than being "readable" --
109 * e.g., "writable" -- then with `select()` you'd need to make a separate event set, perhaps named `sel_write_set`,
110 * and it would not be handling `comm_sock`, in whose case we only care about readability. Note also that
111 * the example event loop iteration shown below models situation where `tcp_sock` and `flow_sock` happen to
112 * both become readable just about at the same time, causing `select()` to return with 2 sockets in readable state
113 * simultaneously for illustration purposes. Realistically probably one would become readable; then in another
114 * loop iteration the other would... but this would be annoyingly lengthy to show; and though this is less
115 * probable, it's still possible.
116 *
117 * ~~~
118 * // NOTE: ASCII art arrows |--------> (etc.) indicate thread execution. "U" is the user thread.
119 * // "K" is unspecified kernel "thread." "W" is unspecified `net_flow` working thread.
120 * // T (cont.) just means thread T was already running up to the point we are illustrating.
121 *
122 * U (cont.)
123 * | Set up local socket (e.g., Unix domain socket or localhost TCP socket) comm_sock.
124 * | Set up select() native socket set sel_read_set, for native "readable" events.
125 * | Set up net_flow::Event_set flow_set.
126 * | FD_SET(comm_sock, sel_read_set);
127 * | Set up native TCP peer socket tcp_sock.
128 * | Set up Flow peer socket flow_sock.
129 * | FD_SET(tcp_sock, sel_read_set);
130 * | flow_set.add_wanted_socket(flow_sock, Event_type::S_PEER_SOCKET_READABLE);
131 * | Event loop begins here. <--------------------------------------+ K (cont.) W (cont.)
132 * | flow_set.async_wait(handle_flow_ev); | | ... | ...
133 * | Inform W. -> | | ... | flow_set -> WAITING.
134 * | select(2, sel_read_set, NULL, NULL, ...positive-timeout...); | | ... | ...
135 * | ...select() blocking... | | ... | ...
136 * | ... | | ... | ...
137 * | ... | | tcp_sock readable! | ...
138 * | ... | | <- Tell select(). |
139 * | K says tcp_sock readable! Get ready to return! | | ... | flow_sock readable!
140 * | | | | handle_flow_ev():
141 * | | | | Write 1 -> comm_sock.
142 * | | | comm_sock readable! |
143 * | | | <- Tell select(). |
144 * | K says comm_sock readable, and that's it. Return! | | ... | ...
145 * | flow_set.async_wait_finish(); | v | ...
146 * | Inform W. -> | | ...
147 * | if (FD_ISSET(tcp_sock, sel_read_set)) | | flow_set -> INACTIVE.
148 * | Non-blocking recv(tcp_sock) until exhausted! | v
149 * | if (FD_ISSET(comm_sock, sel_read_set)) |
150 * | In this example we've only 1 Flow socket and event, so: \ |
151 * | flow_sock.async_receive() until exhausted! |
152 * | Event loop ends here. -----------------------------------------+
153 * v
154 * ~~~
155 *
156 * The above are informal suggestions for use. Here is the more formal description of operation of
157 * an Event_set. Event_set is a simple state machine with three states: INACTIVE, WAITING, CLOSED.
158 * After construction (which is done by Node in a factory fashion), Event_set is in INACTIVE state.
159 * CLOSED means the Event_set has been Event_set::close()d, or the originating Node has been destroyed. It is
160 * not possible to perform any operations on a CLOSED Event_set, nor is it possible to exit the CLOSED
161 * state. A CLOSED Event_set stores no resources and will be deleted via `shared_ptr<>` mechanics
162 * once all user references to it are gone.
163 *
164 * The rest of the time, Event_set switches back and forth between INACTIVE and WAITING. In
165 * INACTIVE, you may set and examine the desired sockets/events. The following are supported (see also
166 * Event_set::Event_type `enum` values)
167 *
168 * - <em>Peer_socket Readable</em>: true if and only if `sock->receive()` with unlimited target buffer
169 * space would return either a non-zero number of bytes or indicate an error. (Therefore "not Readable"
170 * means it would return 0 but no error.)
171 * - <em>Peer_socket Writable</em>: same but with `sock->send()`. Note, however, that typically this is
172 * more likely to be immediately true on a given socket; if there's space in the Send buffer for a certain
173 * small amount of data, then the socket owning it is Writable; whereas data have to actually have arrived
174 * from the other side for it to be Readable. Nevertheless, if network conditions are such that the
175 * Send buffer cannot be purged via sending its contents to the other side, then eventually it won't be
176 * Writable either (so don't count on writability without checking).
177 * - <em>Server_socket Acceptable</em>: true if and only if `serv->accept()` would return either a non-null
178 * Server_socket::Ptr or indicate an error. (Therefore "not Acceptable" means it would return
179 * null but no error.)
180 *
181 * The desired events can be specified with swap_wanted_sockets(), add_wanted_socket(),
182 * and remove_wanted_socket(). Note that all of these methods are efficient (no copying
183 * of socket sets involved). Also note that all the socket objects passed to an Event_set must come
184 * from the same Node as that Event_set; else behavior is undefined.
185 *
186 * Also in INACTIVE, you may examine the results of the last wait, if any. Do this using
187 * emit_result_sockets(), etc. Note that these methods are also efficient (no copying) and can
188 * only be used once for each wait (all subsequent uses yield empty socket sets).
189 *
190 * In WAITING state, a wait for the specified events has started. async_wait(), sync_wait(), and
191 * poll() change state from INACTIVE to WAITING. In WAITING state, all of the above methods return
192 * errors. The following pieces of code change state back to INACTIVE: async_wait_finish(),
193 * sync_wait() (once an event is true, or timeout), poll() (immediately), and an unspecified
194 * non-user thread if async_wait() was used, and an event has been asynchronously detected.
195 *
196 * In particular, if `Event_set e` is used by 1 thread, and that thread performs only sync_wait()s for
197 * waiting, user code will never be able to observe the WAITING state, as sync_wait() will go
198 * INACTIVE->WAITING->INACTIVE internally.
199 *
200 * ### Relationship between 2 different Event_set objects ###
201 * They are entirely independent. In particular, you
202 * may put the same socket+event combo into 2 different Event_set objects and wait on both Event_sets
203 * simultaneously. If that socket/event holds for both Event_set objects at the same time, both will
204 * be signalled. I informally recommend against using this. For example, if the event is "sock is
205 * Readable," and you will `receive()` as a result, which of the two `receive()` calls gets what data?
206 * However there may be a reasonable use case for the Acceptable event.
207 *
208 * ### Signals and interruption ###
209 * Any POSIX blocking function, once it has started waiting, will exit with `errno == EINTR`
210 * if a signal is delivered. (Sometimes this is just annoying, but sometimes it's useful: if SIGTERM
211 * handler sets some global terminate flag, then the signal delivery will immediately break out of
212 * the blocking call, so the flag can be immediately checked by the main code (and then the program
213 * can cleanly exit, for example).) Event_set::sync_wait() and Event_set::async_wait() support
214 * similar functionality. Call Node::interrupt_all_waits() to interrupt any Event_set object(s) within
215 * that Node currently in WAITING state. They enter INACTIVE state, and it is indicated to the
216 * user of each sync_wait() or async_wait() that the reason for the wait's finish was an interruption
217 * (see those functions for details on how this is communicated). Conceptually this is similar to
218 * POSIX blocking `select()` or blocking `recv()` returning -1/`EINTR`.
219 *
220 * To actually cause signals to trigger Node::interrupt_all_waits() (as occurs by default,
221 * conceptually, in POSIX programs w/r/t `EINTR`), the user has two basic options. They can either
222 * register signal handlers that'll explicitly invoke that method; or they can let Node do so
223 * automatically for SIGINT and SIGTERM. This is controlled by the Node option
224 * Node_options::m_st_capture_interrupt_signals_internally.
225 *
226 * ### Thread safety ###
227 * Same as for Peer_socket. (Briefly: all operations safe for simultaneous execution
228 * on separate or the same object.) An informal recommendation, however, is to only use a given
229 * Event_set in one thread. Otherwise things will get confusing, quickly, with no practical benefit of which
230 * I, at least, am aware.
231 *
232 * @internal
233 *
234 * ### Implementation notes ###
235 * The crux of the implementation is in async_wait(), as all other wait types are built on it.
236 * Therefore there is a giant comment inside that method that is required reading for understanding
237 * this class's innards (which, IMO, are some of the trickiest logic in all of the library).
238 *
239 * As with Peer_socket and Server_socket, much of the implementation (as opposed to interface) of
240 * Event_set functionality resides in class Node. In particular anything to do with thread W is in
241 * Node. The actual code is in event_set.cpp, but since the logic involves Node state, and
242 * Event_set is subservient to the Node, in terms of objects it makes more sense to keep it in
243 * class Node. However, when no Node state is involved, Event_set logic is actually typically
244 * coded in Event_set methods, to a greater extent than in Peer_socket or Server_socket, which are
245 * mere data stores in comparison.
246 */
248 // Endow us with shared_ptr<>s ::Ptr and ::Const_ptr (syntactic sugar).
249 public util::Shared_ptr_alias_holder<boost::shared_ptr<Event_set>>,
250 // Allow access to Ptr(this) from inside Event_set methods. Just call shared_from_this().
251 public boost::enable_shared_from_this<Event_set>,
252 public log::Log_context,
253 private boost::noncopyable
254{
255public:
256 // Types.
257
258 /// A state of an Event_set.
259 enum class State
260 {
261 /**
262 * Default state; valid Event_set that is not currently waiting on events. All user
263 * operations are valid in this state.
264 */
265 S_INACTIVE,
266 /**
267 * Waiting state: valid Event_set that is currently waiting on previously described events. In
268 * this state only async_wait_finish() may be called without resulting in an error.
269 */
270 S_WAITING,
271 /// Node has disowned the Peer_socket; all further operations will result in error.
272 S_CLOSED
273 }; // enum class State
274
275 /**
276 * Type of event or condition of interest supported by class Event_set. When specifying interest in some socket
277 * reaching a certain condition, or when requesting the results of that interest, the user essentially specifies
278 * a pair of data: an enumeration value of this type and a socket of a certain type (that type is specified in
279 * the very name of the #Event_type). For example, I may be interested in Event_type::S_PEER_SOCKET_READABLE
280 * becoming true for Peer_socket::Ptr `sock`. The precise meaning is documented for each enumeration value.
281 *
282 * @internal
283 *
284 * ### Implementation notes ###
285 * You will note a few different structures, such as Event_set::m_want and Event_set::m_can, are keyed by this
286 * type. Doing it that way, instead of simply having a simpler Event_set::m_want-like structure (etc.) exist 3x,
287 * has its plusses and minuses. Historically, this was actually implemented that other way first, but I wanted
288 * to play with `boost::any` to see if it makes the code more elegant. It has certainly resulted in significantly
289 * fewer methods and less code; and in code that is more generic (and, in principle, extensible if we somehow
290 * come up with another event type of interest); and less bug-prone. The code is also arguably more opaque and
291 * harder to grok right off the bat, as it's more abstract. Finally, from the user's point of view, it is slightly
292 * harder to use in that when emitting results of an `Event_set::[a]sync_wait()`,
293 * one must `any_cast<>` to the proper socket type -- easy but requires a small learning curve. I'll
294 * say overall it is better this way than the other way.
295 *
296 * In terms of extending this `enum`, which seems unlikely, the following would be involved. Add a value for
297 * the `enum`; then add clauses for it in Socket_as_any_hash::operator(), Socket_as_any_equals::operator(),
298 * Event_set::sock_as_any_to_str(). Extend the structures in Event_set::empty_ev_type_to_socks_map() and
299 * S_EV_TYPE_TO_IS_ACTIVE_NODE_MTD. The latter will need a new method written in Node that checks for
300 * whether the condition of this type currently holds for the given socket -- whose type, by the way, you will
301 * need to decide and document in the doc header for the new `enum` value. Finally, you'll need to find all
302 * situations throughout the code where the condition may change from not holding to holding and possibly
303 * save it into Node::m_sock_events when detected (for example, see when `m_sock_events[S_PEER_SOCKET_READABLE]`
304 * is `insert()`ed into).
305 */
306 enum class Event_type
307 {
308 /**
309 * Event type specifying the condition of interest wherein a target `Peer_socket sock` is such that
310 * calling `sock->receive()` would yield either non-zero ("successfully dequeued
311 * received data") or zero and an error (but not zero and NO error). In other words, specifies
312 * the condition where a Peer_socket is Readable.
313 *
314 * In Event_set::Sockets structures associated with this Event_set::Event_type, `boost::any` elements
315 * wrap the type: Peer_socket::Ptr. `boost::any_cast<>` to that type to obtain the socket.
316 */
317 S_PEER_SOCKET_READABLE,
318
319 /**
320 * Event type specifying the condition of interest wherein a target `Peer_socket sock` is such that
321 * calling `sock->send()` with a non-empty buffer would yield either non-zero ("successfully enqueued
322 * data to be sent") or zero and an error (but not zero and NO error). In other words, specifies
323 * the condition where a Peer_socket is Writable.
324 *
325 * In Event_set::Sockets structures associated with this Event_set::Event_type, `boost::any` elements
326 * wrap the type: Peer_socket::Ptr. `boost::any_cast<>` to that type to obtain the socket.
327 */
328 S_PEER_SOCKET_WRITABLE,
329
330 /**
331 * Event type specifying the condition of interest wherein a target `Server_socket serv` is such that
332 * calling `serv->accept()` would yield either non-null ("successfully accepted a ready conneection")
333 * or null and an error (but not null and NO error). In other words, specifies
334 * the condition where a Server_socket is Acceptable.
335 *
336 * In Event_set::Sockets structures associated with this Event_set::Event_type, `boost::any` elements
337 * wrap the type: Server_socket::Ptr. `boost::any_cast<>` to that type to obtain the socket.
338 */
339 S_SERVER_SOCKET_ACCEPTABLE
340 }; // enum class Event_type
341
342 class Socket_as_any_hash;
343 class Socket_as_any_equals;
344
345 /**
346 * A set of sockets of one type, used to communicate sets of desired and resulting events in various Event_set
347 * APIs. As a rule, a given #Sockets object will store sockets of one underlying type; meaning
348 * the `boost::any`s stored inside one such set can ALL be `boost::any_cast<>` to the *same* type. Which type that
349 * is is usually determined by the associated #Event_type value, typically supplied alongside a #Sockets set
350 * in another argument. For example, if `ev_type == Event_type::S_PEER_SOCKET_READABLE`, then every `boost::any`
351 * in the set can be decoded as follows: `Peer_socket::Ptr sock = any_cast<Peer_socket::Ptr>(sock_as_any)`, where
352 * `sock_as_any` is an element in a #Sockets.
353 *
354 * As of this writing, the type is chronologically ordered; meaning sockets will be stored in order from latest to
355 * oldest to be inserted into the structure. E.g., emit_result_sockets() will produce a set containing of
356 * readable sockets, in reverse chronological order in which they were detected as being readable. This may or may
357 * not be useful in debugging.
358 *
359 * @internal
360 *
361 * This type is also used internally extensively.
362 *
363 * @todo Is it necessary to have Event_set::Sockets be aliased to util::Linked_hash_set? `unordered_set` would also
364 * work and take somewhat less memory and computational overhead. It would become unordered, instead of ordered
365 * chronologically, but that seems like a price possibly worth paying.
366 *
367 * Note, also, the to-do in emit_result_sockets() doc header, regarding using `variant` instead of `any`.
368 * Not only would that be faster and arguably safer, being compile-time in nature; but we'd get
369 * hashing/equality for free -- hence the code would be much pithier (no need for Socket_as_any_hash or
370 * Socket_as_any_equals).
371 */
373
374 /**
375 * The type for custom handler passed to async_wait(), which is executed when one or more active events detected, or
376 * interrupted as if by signal.
377 *
378 * @internal
379 *
380 * @todo Perhaps async_wait() and other APIs of Event_set taking handlers should be templated on handler type
381 * for syntactic sugar. Though, currently, this would be no faster (it is internally stored as this type anyway and
382 * must be so), nor would it actually improve call code which needs no explicit cast (an object of this type will
383 * implicitly be substituted as a conversion from whatever compatible-with-this-signature construction they used).
384 * So on balance, this currently appears superior. After all writing non-template bodies is easier/nicer.
385 */
386 using Event_handler = Function<void (bool)>;
387
388 // Constructors/destructor.
389
390 /// Boring destructor. Note that deletion is to be handled exclusively via `shared_ptr`, never explicitly.
391 ~Event_set();
392
393 // Methods.
394
395 /**
396 * Current State of the Event_set. Note that this may change the moment the method returns.
397 *
398 * @return Ditto.
399 */
400 State state() const;
401
402 /**
403 * Node that produced this Event_set. Note that this may change the moment the method returns
404 * (but only to null).
405 *
406 * @return Pointer to (guaranteed valid) Node; null if state() is S_CLOSED.
407 */
408 Node* node() const;
409
410 /**
411 * Clears all stored resources (any desired events, result events, and any handler saved by
412 * async_wait()) and moves state to State::S_CLOSED. In particular Node will have disowned this object
413 * by the time close() returns.
414 *
415 * You might call close() while state is State::S_WAITING, but if a timeout-less sync_wait() is executing
416 * in another thread, it will NEVER return. Similarly, if state is currently WAITING due to
417 * async_wait(), the handler saved by async_wait() will NEVER be called. So don't do that.
418 * However, first closing one or more sockets being waited on by those calls and THEN calling
419 * `this->close()` is perfectly safe, in that sync_wait() will exit, or the handler will be called.
420 * (In fact when Node shuts down it does just that.)
421 *
422 * @param err_code
423 * See flow::Error_code docs for error reporting semantics. Generated codes:
424 * error::Code::S_EVENT_SET_CLOSED.
425 */
426 void close(Error_code* err_code = 0);
427
428 /**
429 * Adds the given socket to the set of sockets we want to know are "ready" by the definition of
430 * the given event type. See individual Event_type enumeration members' doc comments for exact
431 * definition of readiness for each #Event_type. For example, Event_type::S_PEER_SOCKET_READABLE
432 * means we want to know when `sock->receive()` would yield either some data or an error, but not
433 * no data and no error.
434 *
435 * @tparam Socket type to which `ev_type` applies. E.g., Peer_socket for PEER_SOCKET_READABLE.
436 * @param sock
437 * Socket to add. Must be from the same Node as the one originating this Event_set.
438 * @param ev_type
439 * The condition we are interested in `sock` reaching.
440 * @param err_code
441 * See flow::Error_code docs for error reporting semantics. Generated codes:
442 * error::Code::S_EVENT_SET_ALREADY_EXISTS, error::Code::S_EVENT_SET_CLOSED,
443 * error::Code::S_EVENT_SET_IMMUTABLE_WHEN_WAITING.
444 * @return `true` if and only if no error occurred (`*err_code` is success).
445 */
446 template<typename Socket>
447 bool add_wanted_socket(typename Socket::Ptr sock, Event_type ev_type, Error_code* err_code = 0);
448
449 /**
450 * Opposite of add_wanted_socket().
451 *
452 * @tparam See add_wanted_socket().
453 * @param sock
454 * Socket to remove.
455 * @param ev_type
456 * See add_wanted_socket().
457 * @param err_code
458 * See flow::Error_code docs for error reporting semantics. Generated codes:
459 * error::Code::S_EVENT_SET_EVENT_DOES_NOT_EXIST, error::Code::S_EVENT_SET_CLOSED,
460 * error::Code::S_EVENT_SET_IMMUTABLE_WHEN_WAITING.
461 * @return `true` if and only if no error occurred (`*err_code` is success).
462 */
463 template<typename Socket>
464 bool remove_wanted_socket(typename Socket::Ptr sock, Event_type ev_type, Error_code* err_code = 0);
465
466 /**
467 * Efficiently exchanges the current set of sockets we want to know are "ready" by the definiton of
468 * the given event type. See individual Event_type enumeration members' doc comments for exact
469 * definition of readiness for each #Event_type. For example, Event_type::S_PEER_SOCKET_READABLE
470 * means we want to know when `sock->receive()` would yield either some data or an error, but not
471 * no data and no error. Use this to perform arbitrarily complex operations on the internal set
472 * storing sockets of interest for the given event type `ev_type`, when the add_wanted_socket() and
473 * remove_wanted_socket() methods are insufficient. For example:
474 *
475 * ~~~
476 * Event_set::Sockets socks;
477 * // Exchange of our empty `socks` with what's in `es`.
478 * es->swap_wanted_sockets(&socks, Event_type::S_PEER_SOCKET_READBLE);
479 * // ...Remove every 3rd socket from `socks`, and log `socks.size()`....
480 * // Now put the modified socket set back into `es`.
481 * es->swap_wanted_sockets(&socks, Event_type::S_PEER_SOCKET_READBLE);
482 * ~~~
483 *
484 * ### Rationale ###
485 * The swap paradigm (precursor to the "move" paradigm added in C++11) allows
486 * arbitrarily complex operations without sacrificing performance or thread safety.
487 *
488 * @param target_set
489 * Pointer to set of sockets to which to load the current internal set. Currently
490 * contained sockets must be from the same Node that originated this Event_set.
491 * @param ev_type
492 * See add_wanted_socket().
493 * @param err_code
494 * See flow::Error_code docs for error reporting semantics. Generated codes:
495 * error::Code::S_EVENT_SET_CLOSED, error::Code::S_EVENT_SET_IMMUTABLE_WHEN_WAITING.
496 * @return `true` if and only if no error occurred (`*err_code` is success).
497 */
498 bool swap_wanted_sockets(Sockets* target_set, Event_type ev_type, Error_code* err_code);
499
500 /**
501 * Identical to `swap_wanted_sockets(&sockets, ev_type, err_code)`, where originally `sockets` is empty and
502 * is afterwards cleared; but more efficient.
503 *
504 * @param ev_type
505 * See swap_wanted_sockets().
506 * @param err_code
507 * See flow::Error_code docs for error reporting semantics. Generated codes:
508 * error::Code::S_EVENT_SET_CLOSED, error::Code::S_EVENT_SET_IMMUTABLE_WHEN_WAITING.
509 * @return `true` if and only if no error occurred (`*err_code` is success).
510 */
511 bool clear_wanted_sockets(Event_type ev_type, Error_code* err_code = 0);
512
513 /**
514 * Returns `true` if and only if at least one wanted event for at least one socket is registered
515 * (via add_wanted_socket(), swap_wanted_sockets(), etc.).
516 *
517 * @param err_code
518 * See flow::Error_code docs for error reporting semantics. Generated codes:
519 * error::Code::S_EVENT_SET_CLOSED.
520 * @return `true` if there are wanted events; `false` if there are no wanted events (then `*err_code` is
521 * success) or there was an error (`*err_code` is failure; i.e., `bool(*err_code) == true`).
522 */
523 bool events_wanted(Error_code* err_code = 0) const;
524
525 /**
526 * Checks for all previously described events that currently hold, saves them for retrieval via
527 * emit_result_sockets(), etc., and returns. This is akin to a non-blocking sync_wait() (which does
528 * not exist; poll() does), a/k/a a `select()` with a timeout of zero.
529 *
530 * In a non-error invocation, state() will be State::S_INACTIVE before and after the call. In an error
531 * invocation, state() will not change.
532 *
533 * @param err_code
534 * See flow::Error_code docs for error reporting semantics. Generated codes:
535 * error::Code::S_EVENT_SET_CLOSED, error::Code::S_EVENT_SET_DOUBLE_WAIT_OR_POLL.
536 * @return `true` if and only if no error occurred (`*err_code` is success).
537 */
538 bool poll(Error_code* err_code = 0);
539
540 /**
541 * Blocks indefinitely until one or more of the previously described events hold -- or the wait
542 * is interrupted; saves them for retrieval via emit_result_sockets(), etc.; and returns.
543 * This is akin to a `select()` call with no (i.e., infinite) timeout.
544 *
545 * The special case of Node::interrupt_all_waits() interrupting this wait -- which is conceptually similar
546 * to `EINTR` in POSIX -- manifests itself as error::Code::S_WAIT_INTERRUPTED. In this case,
547 * emit_result_sockets() (etc.) should not be used, as one should assume no events are active due to the
548 * interruption.
549 *
550 * In a non-error invocation, state() will be State::S_INACTIVE before and after the call, unless
551 * underlying Node is destroyed, in which case the final state may be State::S_CLOSED. In an error
552 * invocation, state() will not change, and the method will return immediately. Additionally, the
553 * method will NEVER return, if another thread calls `this->close()` or `this->async_wait_finish()`
554 * during this sync_wait() (so don't do that).
555 *
556 * @param err_code
557 * See flow::Error_code docs for error reporting semantics. Generated codes:
558 * error::Code::S_WAIT_INTERRUPTED, error::Code::S_EVENT_SET_NO_EVENTS,
559 * error::Code::S_EVENT_SET_DOUBLE_WAIT_OR_POLL, error::Code::S_EVENT_SET_CLOSED.
560 * @return `true` if and only if no error occurred (`*err_code` is success).
561 */
562 bool sync_wait(Error_code* err_code = 0);
563
564 /**
565 * Same as the other sync_wait() but will stop waiting if the timeout given as argument
566 * expires. If the timeout expires, it is the error code error::Code::S_WAIT_USER_TIMEOUT. This is akin
567 * to a `select()` call with a finite timeout.
568 *
569 * An additional error situation is possible in addition to that described in the 2nd/3rd paragraphs of
570 * the other sync_wait()'s doc header: if this is `close()`d during the wait, and the wait
571 * times out, error::Code::S_EVENT_SET_CLOSED will be emitted when this timeout is detected. On the
572 * positive side, that means `sync_wait(timeout)` will eventually exit no matter what.
573 *
574 * Tip: Typical types you might use for `max_wait`: `boost::chrono::milliseconds`,
575 * `boost::chrono::seconds`, `boost::chrono::high_resolution_clock::duration`.
576 *
577 * No guarantees are made as to the accuracy of the timeout timer, although you can optimistically
578 * provide arbitrarily precise values for `max_wait`.
579 *
580 * @tparam Rep
581 * See `boost::chrono::duration` documentation (and see above tip).
582 * @tparam Period
583 * See `boost::chrono::duration` documentation (and see above tip).
584 * @param max_wait
585 * The maximum amount of time from now to wait before giving up on the wait and returning.
586 * `"duration<Rep, Period>::max()"` will eliminate the time limit and cause indefinite wait.
587 * @param err_code
588 * See flow::Error_code docs for error reporting semantics. Generated codes:
589 * Same as the other sync_wait() plus: error::Code::S_WAIT_USER_TIMEOUT.
590 * @return `true` if and only if no error occurred (`*err_code` is success). Timeout expiring IS
591 * an error, in particular.
592 */
593 template<typename Rep, typename Period>
594 bool sync_wait(const boost::chrono::duration<Rep, Period>& max_wait, Error_code* err_code = 0);
595
596 /**
597 * Moves object to State::S_WAITING state, saves the given handler to be executed later (in a different,
598 * unspecified thread), when one or more of the previously described events hold; and immediately
599 * returns. State will go back to State::S_INACTIVE when the handler fires; or when async_wait_finish() is
600 * called by the user (whichever happens first). State may also change to State::S_CLOSED if `this->close()` is
601 * called, or the Node is destroyed. The saved handler will be forgotten at that time. Once
602 * INACTIVE is entered, emit_result_sockets(), etc., are to be used to access the detected
603 * events. on_event() must take one bool argument. If this argument is `false`, use emit_result_sockets(),
604 * etc., to access the detected events. If this argument is `true`, then Node::interrupt_all_waits() was
605 * invoked and has interrupted this wait (conceptually similar to `EINTR` in POSIX). In the latter case,
606 * do no use emit_result_sockets(), etc., as no events are active due to the interruption.
607 *
608 * In a non-error invocation, state() will be INACTIVE before the call and WAITING after it.
609 * In an error invocation, state() will not change, and the method will return immediately.
610 * Additionally, on_event() will NEVER be called, if another thread calls `this->close()` before
611 * async_wait_finish() is called, and before the Node is destroyed. So don't do that.
612 *
613 * Restrictions on what `on_event()` is allowed to do: It is allowed to do anything except make any
614 * `net_flow` call related to the net_flow::Node originating the current Event_set; doing so results in
615 * undefined behavior. Informally, it also must not block; spending significant time in
616 * `on_event()` will disrupt the functionality of the Node. Even more informally, the goal of
617 * `on_event()` should be to quickly signal the user's thread(s) that the events hold (using the
618 * technique of the user's choice) and then return -- beyond that, the handling of the ready events
619 * should be in the user's thread(s).
620 *
621 * Tip: Call async_wait_finish() before checking the saved results, using emit_result_sockets(). Doing so BEFORE
622 * any events are detected will finish this asynchronous wait. Doing so AFTER any events are
623 * detected will be a harmless NOOP.
624 *
625 * Tip: Use lambdas (or `bind()`) to make async_wait() asynchronously call any arbitrary function
626 * or method with any arbitrary arguments -- NOT just a free `void` function with 1 argument. Outside the scope
627 * of discussion here, but if this doesn't ring a bell, please look into lambdas (`bind()` as a backup).
628 *
629 * Rationale for no timeout argument: Since the caller of async_wait() retains flow control
630 * without blocking, the user code can enforce its own timeout logic, if necessary, and simply
631 * call async_wait_finish() when desired. In fact that is just what sync_wait() (with timeout argument) does
632 * internally.
633 *
634 * @param on_event
635 * The function to call as soon as as one or more events previously described hold, AND
636 * `this->state()` is still State::S_WAITING. (Also see above tip.)
637 * `on_event(bool was_interrupted)` will be called.
638 * @param err_code
639 * See flow::Error_code docs for error reporting semantics. Generated codes:
640 * error::Code::S_EVENT_SET_DOUBLE_WAIT_OR_POLL, error::Code::S_EVENT_SET_CLOSED.
641 * @return `true` if and only if no error occurred (`*err_code` is success).
642 */
643 bool async_wait(const Event_handler& on_event, Error_code* err_code = 0);
644
645 /**
646 * Moves object from State::S_WAITING to State::S_INACTIVE, and forgets any handler saved by async_wait(), or does
647 * nothing if state is already INACTIVE. Use this to cut short an asynchronous wait started by
648 * async_wait(). After return, emit_result_sockets(), etc., can be used to check the active
649 * events (if any) detected during the last wait.
650 *
651 * In a non-error invocation, state() will be State::S_INACTIVE or State::S_WAITING before the call and
652 * State::S_INACTIVE after it. In an error invocation, state() will not change.
653 *
654 * You might call async_wait_finish() while another thread is executing a timeout-less sync_wait()
655 * (which is also invoked by blocking methods like Peer_socket::sync_receive()). However that
656 * will cause that sync_wait() to NEVER return. So don't do that.
657 *
658 * @param err_code
659 * See flow::Error_code docs for error reporting semantics. Generated codes:
660 * error::Code::S_EVENT_SET_CLOSED.
661 * @return `true` if and only if no error occurred (`*err_code` is success). In particular, state()
662 * being State::S_INACTIVE when the method starts is not an error.
663 */
664 bool async_wait_finish(Error_code* err_code = 0);
665
666 /**
667 * Returns `true` if and only if the last wait, if any, detected at least one event. In other
668 * words, returns `true` if and only if emit_result_sockets() would currently emit at least one
669 * socket, if tried with all possible Event_type.
670 *
671 * One can still use `emit_result_sockets()` to get the specific events after calling this.
672 *
673 * @note Conceptually, this is a bit like when `select()` returns 0 or higher; and one uses the check
674 * of whether its return value is 0 or non-zero. Non-zero is actually some complex index thing,
675 * but often that detail is not necessary (much like `emit_result_sockets()` is unnecessary, analogously),
676 * as the mere presence or absence of 1+ events is enough information. For example, if only one event
677 * for one socket is being waited on, one can check this and confidently perform the appropriate I/O
678 * operation for that one socket, if and only if this returns `true` -- or `select()` would return
679 * non-zero. Slightly more wastefully, but still not bad at all, is when (say) 2 event types are being
680 * waited on, but for only 1 socket. In that case `true` return => just perform both I/O operations;
681 * one OR both of them should yield something (and the one that doesn't hardly uses any resources).
682 * Similarly, even if you're waiting on a few sockets, if it's a limited number (like, say, 2-3), then
683 * indiscriminately trying all possible I/O on all 2-3 sockets is only slightly wasteful: and the code
684 * is quite a bit shorter than carefully checking `emit_result_sockets()` (or doing `FD_ISSET()`, etc., in
685 * the analogous `select()` code).
686 * @param err_code
687 * See flow::Error_code docs for error reporting semantics. Generated codes:
688 * error::Code::S_EVENT_SET_CLOSED, error::Code::S_EVENT_SET_RESULT_CHECK_WHEN_WAITING.
689 * @return `true` if there are active events; `false` if there are no active events (then `*err_code` is
690 * success) or there was an error (`*err_code` is failure; i.e., `bool(*err_code) == true`).
691 */
692 bool events_detected(Error_code* err_code = 0) const;
693
694 /**
695 * Gets the sockets that satisfy the condition of the given Event_type detected during the last wait.
696 * More precisely, moves all sockets satisfying that condition detected during the last wait (if any)
697 * into the set provided by the user. Because it is a move (for efficiency among other reasons), the
698 * subsequent calls to the same method will yield empty sets (until the next wait operation).
699 * Calling before any wait will also yield an empty set.
700 *
701 * Note that the accumulated sockets are NOT preserved across waits. That is, if you
702 * start a wait, the preceding wait's results are wiped out.
703 *
704 * ### Rationale ###
705 * Making the method a one-off that returns nothing after the first invocation (per
706 * wait) allows for thread safety without sacrificing efficiency by adding set copy.
707 *
708 * ### How to use ###
709 * First, prepare a (usually empty) target socket set structure. Second, call emit_result_sockets() to transfer
710 * the results to it. Third, for each socket of interest, `any_cast<>` it from `boost::any` to the
711 * socket pointer of the appropriate type. (Recall that `boost::any` stores an object of any type
712 * inside it, so to get access to that you must know to what to cast it; but this is easy, since by
713 * specifying `ev_type` you are implying a certain socket type.) Example:
714 *
715 * ~~~
716 * Event_set::Sockets readable_socks; // Note this contains boost::any's as elements, regardless of `ev_type`.
717 * event_set->emit_result_sockets(&readable_socks, Event_set::Event_type::S_PEER_SOCKET_READABLE);
718 * for (const auto& readable_sock_as_any : readable_socks)
719 * {
720 * // Since we asked for S_PEER_SOCKET_READABLE, we know results are all Peer_sockets. Cast to that type:
721 * const Peer_socket::Ptr readable_sock = boost::any_cast<Peer_socket::Ptr>(readable_sock_as_any);
722 * // PEER_SOCKET_READABLE indicates readiness to receive() to yield data or an error. We can now do so!
723 * readable_sock->receive(...); // Details of this call left to reader.
724 * }
725 * ~~~
726 *
727 * @param target_set
728 * Pointer to set of sockets to which to load the current internal set of result sockets
729 * of type `ev_type` found during the last wait. Any elements here at call time will be removed.
730 * @param ev_type
731 * The condition we are interested in which sockets have reached during the last wait.
732 * @param err_code
733 * See flow::Error_code docs for error reporting semantics. Generated codes:
734 * error::Code::S_EVENT_SET_CLOSED, error::Code::S_EVENT_SET_IMMUTABLE_WHEN_WAITING.
735 * @return `true` if and only if no error occurred (`*err_code` is success).
736 *
737 * @todo Event_set::emit_result_sockets() sets a #Sockets structure which stores `boost:any`s each of which
738 * stores either a `Peer_socket::Ptr` or a `Server_socket::Ptr`; #Sockets should be changed to store
739 * C++17 `std::variant`s. Performance, both internally and externally, would improve by using this
740 * type-safe compile-time mechanism (which is akin to `union`s but much more pleasant to use).
741 * At the time this feature was written, Flow was in C++11, so `variant`s were not available, and
742 * the author wanted to play around with `any`s instead of haxoring old-school `union`s.
743 * `variant` is much nicer, however, and the dynamic nature of `any` is entirely unnecessary here.
744 */
745 bool emit_result_sockets(Sockets* target_set, Event_type ev_type, Error_code* err_code = 0);
746
747 /**
748 * Identical to `emit_result_sockets(&sockets, ev_type, err_code)`, where originally `sockets` is empty and
749 * is afterwards cleared; but more efficient.
750 *
751 * @param ev_type
752 * See emit_result_sockets().
753 * @param err_code
754 * Same.
755 * @return Same.
756 */
757 bool clear_result_sockets(Event_type ev_type, Error_code* err_code = 0);
758
759 /**
760 * Forgets all sockets stored in this object in any fashion.
761 *
762 * @param err_code
763 * See flow::Error_code docs for error reporting semantics. Generated codes:
764 * error::Code::S_EVENT_SET_CLOSED, error::Code::S_EVENT_SET_IMMUTABLE_WHEN_WAITING.
765 * @return true if and only if no error occurred (*err_code is success).
766 */
767 bool clear(Error_code* err_code = 0);
768
769private:
770 // Friends.
771
772 /**
773 * See rationale for `friend`ing Node in class Event_set documentation header.
774 * @see Node.
775 */
776 friend class Node;
777
778 // Types.
779
780 /**
781 * Short-hand for reentrant mutex type. We explicitly rely on reentrant behavior, so this isn't "just in case."
782 * (One shouldn't use reentrant mutexes "just in case"; it should be entirely conscious and on-purpose; one should
783 * use non-reentrant mutexes, all else being equal.)
784 *
785 * @todo This doc header for Event_set::Mutex should specify what specific behavior requires mutex reentrance, so that
786 * for example one could reevaluate whether there's a sleeker code pattern that would avoid it.
787 */
789
790 /// Short-hand for RAII lock guard of #Mutex. Use instead of `boost::lock_guard` for `release()` at least.
792
793 /**
794 * Short-hand for type storing a set of socket sets -- one per possible #Event_type `enum` value.
795 * In practice, the key set for a value of this type is all Event_type members; use
796 * empty_ev_type_to_socks_map() to create a maximally empty such structure.
797 */
799
800 // Constructors.
801
802 /**
803 * Constructs object; initializes all values to well-defined but possibly meaningless values (0,
804 * empty, etc.).
805 *
806 * @param logger_ptr
807 * The Logger implementation to use subsequently.
808 */
809 explicit Event_set(log::Logger* logger_ptr);
810
811 // Methods.
812
813 /**
814 * Helper that ensures the state of `*this` is such that one may modify the #m_can and #m_want
815 * socket sets. Pre-condition: #m_mutex is locked.
816 *
817 * @param err_code
818 * If `false` returned, sets this to the reason why socket sets are not OK to modify.
819 * Otherwise sets it to success. Possible errors:
820 * error::Code::S_EVENT_SET_CLOSED, error::Code::S_EVENT_SET_IMMUTABLE_WHEN_WAITING.
821 * @return `true` if OK to modify; `false` if not OK to modify.
822 */
823 bool ok_to_mod_socket_set(Error_code* err_code) const;
824
825 /**
826 * Same as the public `sync_wait(max_wait)` but uses a `Fine_clock`-based `Fine_duration` non-template
827 * type for implementation convenience and to avoid code bloat in specifying timeout.
828 *
829 * @param max_wait
830 * See the public sync_wait() *with* a `max_wait` argument. `"duration<Rep, Period>::max()"` maps to the value
831 * `Fine_duration::max()` for this argument.
832 * @param err_code
833 * See sync_wait().
834 * @return See sync_wait().
835 */
836 bool sync_wait_impl(const Fine_duration& max_wait, Error_code* err_code);
837
838 /**
839 * Creates a maximally empty #Ev_type_to_socks_map: it will have all possible Event_type as keys
840 * but only empty #Sockets sets as values.
841 *
842 * @return Ditto.
843 */
845
846 /**
847 * Helper that clears each #Sockets set inside an #Ev_type_to_socks_map. Thus,
848 * it is equivalent to `*ev_type_to_socks_map = empty_ev_type_to_socks_map()`, but perhaps faster.
849 *
850 * @param ev_type_to_socks_map
851 * Target map.
852 */
853 static void clear_ev_type_to_socks_map(Ev_type_to_socks_map* ev_type_to_socks_map);
854
855 /**
856 * Functional helper that checks whether a given `pair` in an #Ev_type_to_socks_map contains an empty
857 * set of #Sockets or not.
858 *
859 * @param ev_type_and_socks
860 * A value (key and mapped, not just mapped) from an #Ev_type_to_socks_map.
861 * @return `true` if and only if the socket set is empty.
862 */
863 static bool ev_type_to_socks_map_entry_is_empty(const Ev_type_to_socks_map::Value& ev_type_and_socks);
864
865 /**
866 * Helper that returns a loggable string summarizing the sizes of the socket sets, by type, stored in
867 * an #Ev_type_to_socks_map.
868 *
869 * @param ev_type_to_socks_map
870 * A valid such map.
871 * @return Loggable string.
872 */
873 static std::string ev_type_to_socks_map_sizes_to_str(const Ev_type_to_socks_map& ev_type_to_socks_map);
874
875 /**
876 * Helper that returns a loggable string representing the socket stored in the given `boost::any` that
877 * stores a value allowed by the members of #Event_type `enum`.
878 *
879 * @param sock_as_any
880 * See above. `sock_as_any.empty()` is also allowed.
881 * @return Loggable string.
882 */
883 static std::string sock_as_any_to_str(const boost::any& sock_as_any);
884
885 // Constants.
886
887 /**
888 * Mapping from each possible #Event_type to the Node method that determines whether the condition defined by
889 * that #Event_type is currently true for the socket wrapped in the `boost::any` argument passed to the method.
890 *
891 * E.g., `bool Node::sock_is_readable(const boost::any sock_as_any) const` is to return true if and only if
892 * `sock = boost::any_cast<Peer_socket::Ptr>(sock_as_any)` -- which is a Peer_socket::Ptr -- would yield non-zero
893 * or zero and error if one were to execute `sock->receive()`. Note that the latter condition is defined by the
894 * doc comment on Event_type::S_PEER_SOCKET_READABLE.
895 */
896 static const boost::unordered_map<Event_type, Function<bool (const Node*, const boost::any&)>>
898 // Data.
899
900 /// See state(). Should be set before user gets access to `*this`. Must not be modified by non-W threads after that.
902
903 /// See node(). Should be set before user gets access to `*this`. Must not be modified by non-W threads after that.
905
906 /**
907 * The sockets, categorized by #Event_type of interest, to check for "ready" status (as defined in the doc header
908 * for each #Event_type), in the next wait (State::S_WAITING state).
909 */
911
912 /**
913 * The sockets, categorized by #Event_type of interest, that were found to be "ready" (as defined in the doc header
914 * for each #Event_type), during the last wait (State::S_WAITING state). For each WAITING period, each set in
915 * #m_can following that period must be a subset of the corresponding #m_want set when entering that period.
916 *
917 * E.g., `m_can[S_PEER_SOCKET_READABLE]` after WAITING is a subset of `m_want[S_PEER_SOCKET_READABLE]` when just
918 * starting WAITING.
919 */
921
922 /**
923 * During State::S_WAITING, stores the handler (a `void` function with 1 `bool` argument) that will be
924 * called once one or more events are found to hold. `m_on_event.empty()` at all other times.
925 */
927
928 /**
929 * While in State::S_WAITING, if this is `true`, an exhaustive check of all desired events is yet to
930 * be performed (in thread W); if `false`, it has alredy been performed (in thread W), and only
931 * "delta" checks need to be checked from now on during this wait. See details in giant comment
932 * block inside async_wait().
933 */
935
936 /// Mutex protecting ALL data in this object.
937 mutable Mutex m_mutex;
938}; // class Event_set
939
940/**
941 * Hasher class used in storing various sockets of types wrapped as `boost::any`s in the #Sockets type.
942 * We do not simply define `hash_value(any)`, because our implementation specifically works for `any` objects
943 * that store various sockets and won't work for all other possibilities; and we don't want to force this
944 * on every user of `hash<any>`.
945 */
947{
948public:
949 /**
950 * Returns hash value of the given object which must be stored in a #Sockets object.
951 *
952 * @param sock_as_any
953 * Object wrapping one the socket objects permitted by `enum` members of Event_type.
954 * @return Hash.
955 */
956 size_t operator()(const boost::any& sock_as_any) const;
957};
958
959/**
960 * Equality predicate class used in storing various sockets of types wrapped as `boost::any`s in the #Sockets type.
961 * We do not simply define `operator==(any, any)`, because our implementation specifically works for `any` objects
962 * that store various sockets and won't work for all other possibilities; and we don't want to force this
963 * on every user of `hash<any>`.
964 */
966{
967public:
968 /**
969 * Returns whether the two objects, which must be stored in #Sockets objects, are equal by value.
970 *
971 * @param sock_as_any1
972 * Object wrapping one the socket objects permitted by `enum` members of Event_type.
973 * @param sock_as_any2
974 * Object wrapping one the socket objects permitted by `enum` members of Event_type.
975 * @return `true` if and only if the two are equal by value.
976 */
977 bool operator()(const boost::any& sock_as_any1, const boost::any& sock_as_any2) const;
978};
979
980// Free functions: in *_fwd.hpp.
981
982// However the following refer to inner type(s) and hence must be declared here and not _fwd.hpp.
983
984/**
985 * Prints string representation of given Event_set state to given standard `ostream` and returns the
986 * latter.
987 *
988 * @relatesalso Event_set
989 *
990 * @param os
991 * Stream to print to.
992 * @param state
993 * Value to serialize.
994 * @return `os`.
995 */
996std::ostream& operator<<(std::ostream& os, Event_set::State state);
997
998/**
999 * Prints string representation of given event type to given standard `ostream` and returns the
1000 * latter.
1001 *
1002 * @relatesalso Event_set
1003 *
1004 * @param os
1005 * Stream to print to.
1006 * @param ev_type
1007 * Value to serialize.
1008 * @return `os`.
1009 */
1010std::ostream& operator<<(std::ostream& os, Event_set::Event_type ev_type);
1011
1012// Template implementations.
1013
1014template<typename Socket>
1015bool Event_set::add_wanted_socket(typename Socket::Ptr sock, Event_type ev_type, Error_code* err_code)
1016{
1017 using boost::any;
1018
1019 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::add_wanted_socket<Socket>, sock, ev_type, _1);
1020 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
1021
1022 // We are in thread U != W.
1023
1024 // Accessing m_state, socket sets, etc. which may be written by other threads at any time. Must lock.
1025 Lock_guard lock(m_mutex);
1026
1027 FLOW_LOG_TRACE("Object [" << sock << "] wanted for event type [" << ev_type << "] in Event_set [" << this << "].");
1028
1029 if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify want_set in this state.
1030 {
1031 // *err_code is set.
1032 return false; // Closed; do not set *target_set (*want_set is blank anyway).
1033 }
1034 // else *err_code is success.
1035 assert(!*err_code);
1036
1037 /* Note that this method is a template solely because of the following `any(sock)`. We could've made the user
1038 * perform this conversion themselves and taken a `boost::any` into a non-template method. We give them a little
1039 * sugar though. */
1041 "Expecting amortized constant time insertion sockets container."); // Ensure fastness.
1042 if (!m_want[ev_type].insert(any(sock)).second)
1043 {
1045 }
1046 // else { *err_code is still success. }
1047
1048 return !*err_code;
1049}
1050
1051template<typename Socket>
1052bool Event_set::remove_wanted_socket(typename Socket::Ptr sock, Event_type ev_type, Error_code* err_code)
1053{
1054 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Event_set::remove_wanted_socket<Socket>, sock, ev_type, _1);
1055 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
1056
1057 using boost::any;
1058
1059 // We are in thread U != W.
1060
1061 // Accessing m_state, the sets, etc. which may be written by other threads at any time. Must lock.
1062 Lock_guard lock(m_mutex);
1063
1064 FLOW_LOG_TRACE("Object [" << sock << "] no longer wanted for event type [" << ev_type << "] in "
1065 "Event_set [" << this << "].");
1066
1067 if (!ok_to_mod_socket_set(err_code)) // Ensure we can modify want_set in this state.
1068 {
1069 // *err_code is set.
1070 return false; // Closed; do not set *target_set (*want_set is blank anyway).
1071 }
1072 // else *err_code is success.
1073
1074 Sockets& want_set = m_want[ev_type];
1075 /* Note that this method is a template solely because of the following assignment. We could've made the user
1076 * perform this conversion themselves and taken a `boost::any` into an non-template method. We give them a little
1077 * sugar though. */
1078 const any sock_as_any = sock;
1079
1080 const bool did_erase = want_set.erase(sock_as_any) == 1;
1081 if (!did_erase)
1082 {
1084 return false;
1085 }
1086 // else *err_code is success.
1087
1088 return true;
1089} // Event_set::remove_wanted_socket()
1090
1091template<typename Rep, typename Period>
1092bool Event_set::sync_wait(const boost::chrono::duration<Rep, Period>& max_wait, Error_code* err_code)
1093{
1094 assert(max_wait.count() > 0);
1095 return sync_wait_impl(util::chrono_duration_to_fine_duration(max_wait), err_code);
1096}
1097
1098} // namespace flow::net_flow
Convenience class that simply stores a Logger and/or Component passed into a constructor; and returns...
Definition: log.hpp:1619
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1291
Equality predicate class used in storing various sockets of types wrapped as boost::anys in the Socke...
Definition: event_set.hpp:966
bool operator()(const boost::any &sock_as_any1, const boost::any &sock_as_any2) const
Returns whether the two objects, which must be stored in Sockets objects, are equal by value.
Definition: event_set.cpp:838
Hasher class used in storing various sockets of types wrapped as boost::anys in the Sockets type.
Definition: event_set.hpp:947
size_t operator()(const boost::any &sock_as_any) const
Returns hash value of the given object which must be stored in a Sockets object.
Definition: event_set.cpp:809
A user-set collection of sockets and desired conditions on those sockets (such as: "socket has data t...
Definition: event_set.hpp:254
Mutex m_mutex
Mutex protecting ALL data in this object.
Definition: event_set.hpp:937
static void clear_ev_type_to_socks_map(Ev_type_to_socks_map *ev_type_to_socks_map)
Helper that clears each Sockets set inside an Ev_type_to_socks_map.
Definition: event_set.cpp:747
bool sync_wait(Error_code *err_code=0)
Blocks indefinitely until one or more of the previously described events hold – or the wait is interr...
Definition: event_set.cpp:458
bool sync_wait_impl(const Fine_duration &max_wait, Error_code *err_code)
Same as the public sync_wait(max_wait) but uses a Fine_clock-based Fine_duration non-template type fo...
Definition: event_set.cpp:336
Event_type
Type of event or condition of interest supported by class Event_set.
Definition: event_set.hpp:307
static std::string ev_type_to_socks_map_sizes_to_str(const Ev_type_to_socks_map &ev_type_to_socks_map)
Helper that returns a loggable string summarizing the sizes of the socket sets, by type,...
Definition: event_set.cpp:761
State
A state of an Event_set.
Definition: event_set.hpp:260
bool async_wait_finish(Error_code *err_code=0)
Moves object from State::S_WAITING to State::S_INACTIVE, and forgets any handler saved by async_wait(...
Definition: event_set.cpp:215
bool events_detected(Error_code *err_code=0) const
Returns true if and only if the last wait, if any, detected at least one event.
Definition: event_set.cpp:576
bool poll(Error_code *err_code=0)
Checks for all previously described events that currently hold, saves them for retrieval via emit_res...
Definition: event_set.cpp:266
bool async_wait(const Event_handler &on_event, Error_code *err_code=0)
Moves object to State::S_WAITING state, saves the given handler to be executed later (in a different,...
Definition: event_set.cpp:72
util::Mutex_recursive Mutex
Short-hand for reentrant mutex type.
Definition: event_set.hpp:788
bool add_wanted_socket(typename Socket::Ptr sock, Event_type ev_type, Error_code *err_code=0)
Adds the given socket to the set of sockets we want to know are "ready" by the definition of the give...
Definition: event_set.hpp:1015
State m_state
See state(). Should be set before user gets access to *this. Must not be modified by non-W threads af...
Definition: event_set.hpp:901
Event_handler m_on_event
During State::S_WAITING, stores the handler (a void function with 1 bool argument) that will be calle...
Definition: event_set.hpp:926
bool swap_wanted_sockets(Sockets *target_set, Event_type ev_type, Error_code *err_code)
Efficiently exchanges the current set of sockets we want to know are "ready" by the definiton of the ...
Definition: event_set.cpp:493
bool emit_result_sockets(Sockets *target_set, Event_type ev_type, Error_code *err_code=0)
Gets the sockets that satisfy the condition of the given Event_type detected during the last wait.
Definition: event_set.cpp:613
static Ev_type_to_socks_map empty_ev_type_to_socks_map()
Creates a maximally empty Ev_type_to_socks_map: it will have all possible Event_type as keys but only...
Definition: event_set.cpp:736
Event_set(log::Logger *logger_ptr)
Constructs object; initializes all values to well-defined but possibly meaningless values (0,...
Definition: event_set.cpp:42
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex. Use instead of boost::lock_guard for release() at least.
Definition: event_set.hpp:791
State state() const
Current State of the Event_set.
Definition: event_set.cpp:60
bool clear_wanted_sockets(Event_type ev_type, Error_code *err_code=0)
Identical to swap_wanted_sockets(&sockets, ev_type, err_code), where originally sockets is empty and ...
Definition: event_set.cpp:527
bool ok_to_mod_socket_set(Error_code *err_code) const
Helper that ensures the state of *this is such that one may modify the m_can and m_want socket sets.
Definition: event_set.cpp:675
bool events_wanted(Error_code *err_code=0) const
Returns true if and only if at least one wanted event for at least one socket is registered (via add_...
Definition: event_set.cpp:553
void close(Error_code *err_code=0)
Clears all stored resources (any desired events, result events, and any handler saved by async_wait()...
Definition: event_set.cpp:464
static const boost::unordered_map< Event_type, Function< bool(const Node *, const boost::any &)> > S_EV_TYPE_TO_IS_ACTIVE_NODE_MTD
Mapping from each possible Event_type to the Node method that determines whether the condition define...
Definition: event_set.hpp:897
Node * node() const
Node that produced this Event_set.
Definition: event_set.cpp:66
bool clear(Error_code *err_code=0)
Forgets all sockets stored in this object in any fashion.
Definition: event_set.cpp:705
Ev_type_to_socks_map m_can
The sockets, categorized by Event_type of interest, that were found to be "ready" (as defined in the ...
Definition: event_set.hpp:920
bool remove_wanted_socket(typename Socket::Ptr sock, Event_type ev_type, Error_code *err_code=0)
Opposite of add_wanted_socket().
Definition: event_set.hpp:1052
~Event_set()
Boring destructor. Note that deletion is to be handled exclusively via shared_ptr,...
Definition: event_set.cpp:54
bool m_baseline_check_pending
While in State::S_WAITING, if this is true, an exhaustive check of all desired events is yet to be pe...
Definition: event_set.hpp:934
static std::string sock_as_any_to_str(const boost::any &sock_as_any)
Helper that returns a loggable string representing the socket stored in the given boost::any that sto...
Definition: event_set.cpp:777
static bool ev_type_to_socks_map_entry_is_empty(const Ev_type_to_socks_map::Value &ev_type_and_socks)
Functional helper that checks whether a given pair in an Ev_type_to_socks_map contains an empty set o...
Definition: event_set.cpp:756
Ev_type_to_socks_map m_want
The sockets, categorized by Event_type of interest, to check for "ready" status (as defined in the do...
Definition: event_set.hpp:910
Node * m_node
See node(). Should be set before user gets access to *this. Must not be modified by non-W threads aft...
Definition: event_set.hpp:904
bool clear_result_sockets(Event_type ev_type, Error_code *err_code=0)
Identical to emit_result_sockets(&sockets, ev_type, err_code), where originally sockets is empty and ...
Definition: event_set.cpp:649
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
Definition: node.hpp:937
Properties of various container types.
Definition: traits.hpp:43
std::pair< Event_type const, Sockets > Value
Short-hand for key/mapped-value pairs stored in the structure.
An object of this class is a set that combines the lookup speed of an unordered_set<> and ordering an...
Iterator erase(Const_iterator const &it)
Erases the element pointed to by the given valid iterator.
Convenience class template that endows the given subclass T with nested aliases Ptr and Const_ptr ali...
#define FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(ARG_ret_type, ARG_method_name,...)
Narrow-use macro that implements the error code/exception semantics expected of most public-facing Fl...
Definition: error.hpp:357
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
Definition: error.hpp:202
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
@ S_EVENT_SET_EVENT_ALREADY_EXISTS
Attempted to add an event into an event set, but that event already exists.
@ S_EVENT_SET_EVENT_DOES_NOT_EXIST
Attempted to work with an event that does not exist in the event set.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Definition: node.cpp:25
std::ostream & operator<<(std::ostream &os, const Congestion_control_selector::Strategy_choice &strategy_choice)
Serializes a Peer_socket_options::Congestion_control_strategy_choice enum to a standard ostream – the...
Definition: cong_ctl.cpp:146
boost::unique_lock< Mutex > Lock_guard
Short-hand for advanced-capability RAII lock guard for any mutex, ensuring exclusive ownership of tha...
Definition: util_fwd.hpp:265
boost::recursive_mutex Mutex_recursive
Short-hand for reentrant, exclusive mutex.
Definition: util_fwd.hpp:218
Fine_duration chrono_duration_to_fine_duration(const boost::chrono::duration< Rep, Period > &dur)
Helper that takes a non-negative duration of arbitrary precision/period and converts it to Fine_durat...
Definition: util.hpp:31
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:502
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Definition: common.hpp:410