Flow 1.0.1
Flow project: Full implementation reference.
node.hpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
22#include "flow/log/log.hpp"
31#include "flow/util/util.hpp"
32#include <boost/unordered_map.hpp>
33
34/**
35 * Flow module containing the API and implementation of the *Flow network protocol*, a TCP-inspired stream protocol
36 * that uses UDP as underlying transport. See the large doc header on class net_flow::Node for the "root" of all
37 * documentation w/r/t `net_flow`, beyond the present brief sentences.
38 *
39 * ### Historical note ###
40 * Historically, the Flow project only existed in the first place to deliver the functionality now in this
41 * `namespace` flow::net_flow. However, since then, `net_flow` has become merely one of several Flow modules, each
42 * providing functionality independent of the others'. In the past, all/most `net_flow{}`
43 * contents resided directly in `namespace` ::flow, but now it has been segregated into its own namespace.
44 *
45 * `net_flow` may still be, by volume, the largest module (hence also perhaps the largest user of general-use modules
46 * like flow::log and flow::util). Nevertheless, it is no longer "special."
47 *
48 * @see Main class net_flow::Node.
49 */
50namespace flow::net_flow
51{
52// Types.
53
54/**
55 * An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a distinct IP
56 * address and UDP port; and (2) it speaks the Flow protocol over a UDP transport layer. Here we summarize class Node
57 * and its entire containing Flow module flow::net_flow.
58 *
59 * See also flow::asio::Node, a subclass that allows for full use of our API (its superclass) and turns our sockets
60 * into boost.asio I/O objects, able to participate with ease in all boost.asio event loops. If you're already very
61 * familiar with `boost::asio::ip::tcp`, you can skip to the asio::Node doc header. If not, recommend becoming
62 * comfortable with the asio-less API, then read the forementioned asio::Node doc header.
63 *
64 * The flow::asio::Node class doc header (as of this writing) includes a compact summary of all network operations
65 * supported by the entire hierarchy and hence deserves a look for your convenience.
66 *
67 * Using flow::net_flow, starting with the present class Node
68 * ----------------------------------------------------------
69 *
70 * Node is an important and central class of the `netflow` Flow module and thus deserves some semi-philosophical
71 * discussion, namely what makes a Node a Node -- why the name? Let's delve into the 2 aforementioned properties of a
72 * Node.
73 *
74 * ### A Node has a distinct IP address and UDP port: util::Udp_endpoint ###
75 * A Node binds to an IP address and UDP port, both of which are given (with the usual ephemeral port and
76 * IP address<->interface(s) nomenclature) as an argument at Node::Node() construction and can never change over the
77 * lifetime of the object. The IP and port together are a util::Udp_endpoint, which is a `using`-alias of boost.asio's
78 * `boost::asio::ip::udp::endpoint` . In the same network (e.g., the Internet) no two Node
79 * objects (even in separate processes; even on different machines) may be alive (as defined by
80 * `Node::running() == true`) with constructor-provided util::Udp_endpoint objects `R1` and `R2` such that `R1 == R2`.
81 * In particular, if `Node n1` exists, with `n1.running()` and `n1.local_low_lvl_endpoint() == R1`, and on the same
82 * machine one attempts to construct `Node n2(R2)`, such that `R1 == R2` (their IPs and ports are equal), then `n2`
83 * will fail to properly construct, hence `n2.running() == false` will be the case, probably due to port-already-bound
84 * OS error. (There are counter-examples with NAT'ed IP addresses and special values 0.0.0.0 and port 0, but please
85 * just ignore those and other pedantic objections and take the spirit of what I am saying. Ultimately, the point
86 * is:
87 *
88 * <em>A successfully constructed (`running() == true`) Node occupies the same IP-and-UDP "real estate" as would a
89 * a mere successfully bound UDP socket.</em>
90 *
91 * So all that was a long, overbearing way to emphasize that a Node binds to an IP address and UDP port, and a single
92 * such combo may have at most one Node on it (unless it has `!running()`).
93 * *That's why it is called a Node*: it's a node on the network, especially on Internet.
94 *
95 * ### A Node speaks the *Flow network protocol* to other, remote Nodes ###
96 * If `Node n1` is successfully constructed, and `Node n2` is as well, the two can communicate via a new protocol
97 * implemented by this Flow module. This protocol is capable of working with stream (TCP-like) sockets
98 * implemented on top of UDP in a manner analogous to how an OS's net-stack implements
99 * TCP over IP. So one could call this Flow/UDP. One can talk Flow/UDP to another
100 * Flow/UDP endpoint (a/k/a Node) only; no compatibility with any other protocol is supported.
101 * (This isn't, for example, an improvement to one side of TCP that is still compatible with legacy TCPs on
102 * the other end; though that is a fertile area for research in its own right.) The socket can also operate in
103 * unreliable, message boundary-preserving mode, controllable via a Flow-protocol-native socket option; in which case
104 * reliability is the responsibility of the `net_flow` user. By default, though, it's like TCP: message bounds are not
105 * preserved; reliability is guaranteed inside the protocol. `n1` and `n2` can be local in the same process, or local
106 * in the same machine, or remote in the same overall network -- as long as one is routable to the other, they can talk.
107 *
108 * For practical purposes, it's important to have idea of a single running() Node's "weight." Is it light-weight like
109 * a UDP or TCP socket? Is it heavy-weight like an Apache server instance? The answer is that it's MUCH close to
110 * the former: it is fairly light-weight. As of this writing, internally, it stores a table of peer and server sockets
111 * (of which there could be a handful or tons, depending on the user's own API calls prior); and uses at least one
112 * dedicated worker thread (essentially not visible to the user but conceptually similar to a UDP or TCP stack user's
113 * view of the kernel: it does stuff for one in the background -- for example it can wait for incoming connections,
114 * if asked). So, a Node is an intricate but fairly light-weight object that stores socket tables (proportional in
115 * size to the sockets currently required by the Node's user) and roughly a single worker thread performing low-level
116 * I/O and other minimally CPU-intensive tasks. A Node can get busy if a high-bandwidth network is sending or
117 * receiving intense traffic, as is the case for any TCP or UDP net-stack. In fact, a Node can be seen as a little
118 * Flow-protocol stack implemented on top of UDP transport. (Historical note: `class Node` used to be `class Stack`,
119 * but this implied a heavy weight and misleadingly discouraged multiple constructions in the same program; all that
120 * ultimately caused the renaming to Node.)
121 *
122 * ### Essential properties of Flow network protocol (Flow ports, mux/demuxing) ###
123 * A single Node supports 0 or more (an arbitrary # of) peer-to-peer connections to other `Node`s.
124 * Moreover, given two `Node`s `n1` and `n2`, there can similarly be 0 or more peer-to-peer connections
125 * running between the two. In order to allow this, a port (and therefore multiplexing/demultiplexing) system is
126 * a feature of Flow protocol. (Whether this features is necessary or even desirable is slightly controversial and
127 * not a settled matter -- a to-do on this topic can be found below.)
128 *
129 * More specifically, think of a *given* `Node n1` as analogous (in terms of is multiplexing capabilities) to
130 * one TCP stack running on a one-interface machine. To recap the TCP port-addressing scheme (assuming only 1
131 * interface): The TCP stack has approximately 2^16 (~65k) ports available. One may create and "bind" a server "socket"
132 * to (more or less, for our discussion) any 1 of these ports. Let's say a server socket is bound to port P1.
133 * If a remote TCP stack successfully connects to such a server-bound port, this results in a passively-connected
134 * client "socket," which -- also -- is bound to P1 (bear with me as to how this is possible). Finally, the TCP
135 * stack's user may bind an *actively* connecting client "socket" to another port P2 (P2 =/= P1; as P1 is reserved
136 * to that server and passively connected clients from that server). Recall that we're contriving a situation where
137 * there is only one other remote stack, so suppose there is the remote, 1-interface TCP stack.
138 * Now, let's say a packet arrives along an established connection from this stack.
139 * How does our local TCP stack determine to which connection this belongs? This is
140 * "demultiplexing." If the packet contains the info "destination port: P2," then that clearly belongs to the
141 * actively-connected client we mentioned earlier... but what if it's "dest. port: P1"? This could belong to any
142 * number of connections originally passive-connected by incoming server connection requests to port P1.
143 * Answer: the packet also contains a "source TCP port" field. So the *connection ID*
144 * (a/k/a *socket ID*) consists of BOTH pieces of data: (1) destination (local) port; (2) source (remote) port.
145 * (Recall that, symmetrically, the remote TCP stack had to have a client bind to some port,
146 * and that means no more stuff can bind to that port; so it is unique and can't clash with anything else -- inside that
147 * remote stack.) So this tuple uniquely identifies the connection in this scenario of a single-interface local TCP
148 * that can have both active client sockets and passive-client-socket-spawning server sockets; and talk to other stacks
149 * like it. Of course, there can be more than one remote TCP stack. So the 2-tuple (pair) becomes a 3-tuple (triplet)
150 * in our slightly simplified version of reality: (1) destination (local) TCP port; (2) source (remote) IP address;
151 * and (3) source (remote) TCP port. (In reality, the local TCP stack can bind
152 * to different interfaces, so it becomes a 4-tuple by adding in destination (local) IP address... but that's TCP and
153 * is of no interest to our analogy to Flow protocol.)
154 *
155 * What about Flow protocol? GIVEN `n1` and `n2`, it works just the same. We have a special, TCP-like, Flow port space
156 * WITHIN `n1` and similarly within `n2`. So if only `n1` and `n2` are involved, an `n1` Server_socket (class) object
157 * can listen() (<-- actual method) on a net_flow::flow_port_t (<-- alias to 2-byte unsigned as of this writing)
158 * port P1; Server_socket::accept() (another method) incoming connections, each still bound to port P1; and `n1` can
159 * also actively connect() (another method) to `n2` at some port over there. Then an incoming UDP packet's
160 * intended established connection is demuxed to by a 2-tuple: (1) destination (local) `flow_port_t`;
161 * (2) source (remote) `flow_port_t`.
162 *
163 * In reality, other remote `Node`s can of course be involved: `n3`, `n4`, whatever. As we've established, each Node
164 * lives at a UDP endpoint: util::Udp_endpoint (again, IP address + UDP port). Therefore, from the stand-point of
165 * a given local `Node n1`, each established peer-to-peer connection is identified fully by the 5-tuple (marked here
166 * with roman numerals):
167 * 1. Local `flow_port_t` within `n1`'s port-space (not dissimilar to TCP's port space in size and behavior). (I)
168 * 2. Remote endpoint identifying the remote Node: Remote_endpoint.
169 * 1. util::Udp_endpoint.
170 * 1. IP address. (II)
171 * 2. UDP port. (III)
172 * 3. Remote net_flow::flow_port_t. (IV)
173 *
174 * So, that is how it works. Of course, if this complexity is not really necessary for some application, then
175 * only really (II) and (III) are truly necessary. (I) and (IV) can just be chosen to be some agreed-upon
176 * constant port number. Only one connection can ever exist in this situation, and one would need to create
177 * more `Node`s one side or the other or both to achieve more connections between the same pair of IP addresses,
178 * but that's totally reasonable: it's no different from simply binding to more UDP ports. My point here is that
179 * the Flow-protocol-invented construct of "Flow ports" (given as `flow_port_t` values) can be used to conserve UDP
180 * ports; but they can also be NOT used, and one can just use more UDP ports, as a "regular" UDP-using pair of
181 * applications would, if more than one flow of information is necessary between those two apps. It is up to you.
182 * (Again, some arguments can be made for getting rid of (I) and (IV), after all. This possibility is discussed in
183 * a below to-do.)
184 *
185 * (Do note that, while we've emulated TCP's port scheme, there is no equivalent of IP's "interfaces." Each Node
186 * just has a bunch of ports; there is no port table belonging to each of N interfaces or any such thing.)
187 *
188 * ### flow::net_flow API overview ###
189 * This is a summary (and some of this is very briefly mentioned above); all the classes and APIs are much more
190 * deeply documented in their own right. Also, see above pointer to asio::Node whose doc header may be immediately
191 * helpful to experienced users. Meanwhile, to summarize:
192 *
193 * The Node hands out sockets as Peer_socket objects; it acts as a factory for them (directly) via its connect() and
194 * (indirectly) Server_socket::accept() families of methods. It is not possible to construct a Peer_socket
195 * independently of a Node, due to tight coordination between the Node and each Peer_socket. Moreover each Peer_socket
196 * is handed out via `boost::shared_ptr` smart pointer. While not strictly necessary, this is a situation where both
197 * the user and a central registry (Node) can own the Peer_socket at a given time, which is an ideal application for
198 * `shared_ptr<>` that can greatly simplify questions of object ownership and providing auto-`delete` to boot.
199 *
200 * Thus: `Node::listen(flow_port_t P)` yields a Server_socket::Ptr, which will listen for incoming connections on `P`.
201 * Server_socket::accept() (and similar) yields a Peer_socket::Ptr, one side of a peer-to-peer connection.
202 * On the other side, `Node::connect(Remote_endpoint R)` (where `R` contains `Udp_endpoint U`, where
203 * value equal to `U` had been earlier passed to constructor of the `listen()`ing `Node`; and `R` also contains
204 * `flow_port_t P`, passed to `Node::listen()`). connect(), too, yields a Peer_socket::Ptr. And thus, if all went
205 * well, each side now has a Peer_socket::Ptr `S1` and `S2`, which -- while originating quite differently --
206 * are now completely equal in capabilities: they are indeed *peer* sockets. They have methods like Peer_socket::send()
207 * and Peer_socket::receive().
208 *
209 * Further nuances can be explored in the various other doc headers, but I'll mention that both non-blocking behavior
210 * (meaning the call always returns immediately, even if unable to immediately perform the desired task such as
211 * accept a connection or receive 1 or more bytes) and blocking behavior as supported, as in (for example) a BSD
212 * sockets API. However, there is no "blocking" or "non-blocking" mode as in BSD or WinSock (personally I, Yuri, see it
213 * as an annoying anachronism). Instead you simply call a method named according to whether it will never block or
214 * (possibly -- if appropriate) block. The nomenclature convention is as follows: if the action is `X` (e.g.,
215 * `X` is `receive` or `accept`), then `->X()` is the non-blocking version; and `->sync_X()` is the blocking one.
216 * A non-blocking version always exists for any possible action; and a blocking version exists if it makes sense for it
217 * to exist. (Exception: Event_set::async_wait() explicitly includes `async_` prefix contrary to this convention.
218 * Partially it's because just calling it `wait()` -- convention or not -- makes it sound like it's going to block,
219 * whereas it emphatically will never do so. ALSO it's because it's a "special" method with unique properties
220 * including letting user execute their own code in a Node's internal worker thread. So rules go out the window a
221 * little bit for that method; hence the slight naming exception.)
222 *
223 * ### Nomenclature: "low-level" instead of "UDP" ###
224 * Side note: You will sometimes see the phrase `low_lvl` in various identifiers among `net_flow` APIs.
225 * `low_lvl` (low-level) really means "UDP" -- but theoretically some other packet-based transport could be used
226 * instead in the future; or it could even be an option to chooose between possible underlying protocols.
227 * For example, if `net_flow` moved to kernel-space, the transport could become IP, as it is for TCP.
228 * So this nomenclature is a hedge; and also it argubly is nicer/more generic: the fact it's UDP is immaterial; that
229 * it's the low-level (from our perspective) protocol is the salient fact. However, util::Udp_endpoint is thus named
230 * because it is very specifically a gosh-darned UDP port (plus IP address), so hiding from that by naming it
231 * `Low_Lvl_endpoint` (or something) seemed absurd.
232 *
233 * ### Event, readability, writability, etc. ###
234 * Any experienced use of BSD sockets, WinSock, or similar is probably wondering by now, "That sounds reasonable, but
235 * how does the API allow me to wait until I can connect/accept/read/write, letting me do other stuff in the meantime?"
236 * Again, one can use a blocking version of basically every operation; but then the wait for
237 * readability/writability/etc. may block the thread. One can work around this by creating multiple threads, but
238 * multi-threaded coding introduced various difficulties. So, the experienced socketeer will want to use non-blocking
239 * operations + an event loop + something that allow one to wait of various states (again, readability, writability,
240 * etc.) with various modes of operation (blocking, asynchronous, with or without a timeout, etc.).
241 * The most advanced and best way to get these capabilities is to use boost.asio integration (see asio::Node).
242 * As explained elsewhere (see Event_set doc header) this is sometimes not usable in practice. In that case:
243 * These capabilities are supplied in the class Event_set. See that class's doc header for further information.
244 * Event_set is the `select()` of this socket API. However it is significantly more convenient AND indeed supports
245 * a model that will allow one to use Flow-protocol sockets in a `select()`- or equivalent-based event loop, making
246 * `net_flow` module usable in a true server, such as a web server. That is, you don't just have to write a separate
247 * Flow event loop operating independently of your other sockets, file handles, etc. This is an important property in
248 * practice. (Again: Ideally you wouldn't need Event_set for this; asio::Node/etc. might be better to use.)
249 *
250 * ### Error reporting ###
251 * Like all Flow modules, `net_flow` uses error reporting conventions/semantics introduced in `namespace` ::flow
252 * doc header Error Reporting section.
253 *
254 * In particular, this module does add its own error code set. See `namespace` net_flow::error doc header which
255 * should point you to error::Code `enum`. All error-emitting `net_flow` APIs emit `Error_code`s assigned from
256 * error::Code `enum` values.
257 *
258 * ### Configurability, statistics, logging ###
259 * Great care is taken to provide visibility into the "black box" that is Flow-protocol. That is, while the API follows
260 * good practices wherein implementation is shielded away from the user, at the same time the *human* user has powerful
261 * tools to both examine the insides of the library/protocol's performance AND to tweak the parameters of its
262 * behavior. Picture a modern automobile: while you're driving at least, it's not going to let you look at or mess with
263 * its engine or transmission -- nor do you need to understand how they work; BUT, the onboard monitor
264 * will feature screens that tell you about its fuel economy performance, the engine's inner workings, and perhaps a
265 * few knobs to control the transmission's performance (for example). Same principles are followed here.
266 *
267 * More specifically:
268 * - *Configuration* Socket options are supported via Node_options and Peer_socket_options. These control many
269 * aspects of the library's behavior, for example which congestion control algorithm to use.
270 * These options can be set programmatically, through a config file, or through command line options.
271 * Particular care was taken to make the latter two features seamlessly available by
272 * leveraging boost.program_options.
273 * - *Statistics* Very detailed stats are kept in Peer_socket_receive_stats and Peer_socket_send_stats, combined
274 * with more data in Peer_socket_info. These can be accessed programmatically; their individual stats can also
275 * be accessed programmatically; or they can be logged to any `ostream`. Plus, the logging system periodically logs
276 * them (assuming this logging level is enabled).
277 * - *Logging* Like all Flow modules, `net_flow` uses logging conventions/semantics introduced in `namespace` ::flow
278 * doc header Logging section.
279 *
280 * ### Multiple Node objects ###
281 * As mentioned already many times, multiple Node objects can exist and function simultaneously (as long as they
282 * are not bound to the same conceptual util::Udp_endpoint, or to the same UDP port of at least one IP interface).
283 * However, it is worth emphasizing that -- practically speaking -- class Node is implemented in such a way as to make
284 * a given Node 100% independent of any other Node in the same process. They don't share working thread(s), data
285 * (except `static` data, probably just constants), any namespaces, port spaces, address spaces, anything. Each Node
286 * is independent both API-wise and in terms of internal implementation.
287 *
288 * ### Thread safety ###
289 * All operations safe for simultaneous execution on 2+ separate Node objects *or on the same Node*,
290 * or on any objects (e.g., Peer_socket) returned by Node. (Please note the *emphasized* phrase.)
291 * "Operations" are any Node or Node-returned-object method calls after construction and before destruction of the
292 * Node. (In particular, for example, one thread can listen() while another connect()s.) The same guarantee may or
293 * may not apply to other classes; see their documentation headers for thread safety information.
294 *
295 * ### Thread safety of destructor ###
296 * Generally it is not safe to destruct a Node (i.e., let Node::~Node() get called) while a Node operation is in
297 * progress on that Node (obviously, in another thread). There is one exception to this: if a blocking operation
298 * (any operation with name starting with `sync_`) has entered its blocking (sleep) phase, it is safe to delete the
299 * underlying Node. In practice this means simply that, while you need not lock a given Node with an external
300 * mutex while calling its various methods from different threads (if you really must use multiple threads this way),
301 * you should take care to probably join the various threads before letting a Node go away.
302 *
303 * Historical note re. FastTCP, Google BBR
304 * ---------------------------------------
305 *
306 * ### Historical note re. FastTCP ###
307 * One notable change in this `net_flow` vs. the original libgiga is this
308 * one lacks the FastTCP congestion control strategy. I omit the historical reasons for this for now
309 * (see to-do regarding re-introducing licensing/history/location/author info, in common.hpp).
310 *
311 * Addendum to the topic of congestion control: I am not that interested in FastTCP, as I don't see it as cutting-edge
312 * any longer. I am interested in Google BBR. It is a goal to implement Google BBR in `net_flow`, as that congestion
313 * control algorithm is seen by many as simply the best one available; a bold conclusion given how much research
314 * and given-and-take and pros-and-cons discussions have tramspired ever since the original Reno TCP became ubiquitous.
315 * Google BBR is (modulo whatever proprietary improvements Google chooses to implement in their closed-source software)
316 * publicly documented in research paper(s) and, I believe, available as Google open source.
317 *
318 * @todo flow::net_flow should use flow::cfg for its socket-options mechanism. It is well suited for that purpose, and
319 * it uses some ideas from those socket-options in the first place but is generic and much more advanced. Currently
320 * `net_flow` socket-options are custom-coded from long before flow::cfg existed.
321 *
322 * @todo `ostream` output operators for Node and asio::Node should exist. Also scour through all types; possibly
323 * some others could use the same. (I have been pretty good at already implementing these as-needed for logging; but
324 * I may have "missed a spot.")
325 *
326 * @todo Some of the `ostream<<` operators we have take `X*` instead of `const X&`; this should be changed to the latter
327 * for various minor reasons and for consistency.
328 *
329 * @todo Actively support IPv6 and IPv4, particularly in dual-stack mode (wherein net_flow::Server_socket would bind to
330 * an IPv6 endpoint but accept incoming V4 and V6 connections alike). It already supports it nominally, in that one
331 * can indeed listen on either type of address and connect to either as well, but how well it works is untested, and
332 * from some outside experience it might involve some subtle provisions internally.
333 *
334 * @todo Based on some outside experience, there maybe be problems -- particularly considering the to-do regarding
335 * dual-stack IPv6/v4 support -- in servers listening in multiple-IP situations; make sure we support these seamlessly.
336 * For example consider a machine with a WAN IP address and a LAN (10.x.x.x) IP address (and possibly IPv6 versions
337 * of each also) that (as is typical) binds on all of them at ANY:P (where P is some port; and ANY is the IPv6 version,
338 * with dual-stack mode ensuring V4 datagrams are also received). If a client connects to a LAN IP, while in our
339 * return datagrams we set the source IP to the default, does it work? Outside experience shows it might not,
340 * depending, plus even if in our protocol it does, it might be defeated by some firewall... the point is it requires
341 * investigation (e.g., mimic TCP itself; or look into what IETF or Google QUIC does; and so on).
342 *
343 * @internal
344 *
345 * Implementation notes
346 * --------------------
347 *
348 * In this section, and within implementation, I may simply say "Flow" instead of "Flow [network] protocol," for
349 * brevity. This is not meant to represent all of the containing Flow project nor refer to any module other
350 * that `net_flow`.
351 *
352 * ### Note on general design philosophy ###
353 * The protocol is TCP-like. However, it is not TCP. Moreover, it is not
354 * TCP even if you remove the fact that it sends UDP datagrams instead of IP packets. It follows the basic
355 * goals of TCP (stream of bytes, reliability, congestion control, etc.), and it borrows many of its internal
356 * techniques (congestion window, receive window ACKs, drop timeout, etc.), but it specifically will not
357 * inherit those limitations of TCP that are essentially historic and/or a result of having to be backwards-
358 * compatible with the other side which may be behind. For example, all ACKs in Flow are selective; whereas in
359 * TCP ACKs are generally cumulative, while selective ACKs (SACKs) are an advanced option that the other side
360 * may or may not support. Making certain modern decisions in Flow that TCP implementations simply cannot
361 * make means (1) a simpler protocol and implementation; and (2) potentially higher performance before we can
362 * try any advanced stuff like new congestion control strategies or FEC (forward error correction).
363 *
364 * Moreover, I tried to take care in not copying various classic TCP RFCs mindlessly. Instead, the idea was
365 * to evaluate the spirit, or intent, of each technical detail of a given RFC -- and then translate it into
366 * the (hopefully) more elegant and less historical baggage-encumbered world of Flow. This is particularly
367 * something I felt when translating the terse congestion control-related TCP RFCs into `net_flow` C++ code. (For
368 * a more specific explanation of what I mean, check out the "general design note" in the class doc header of
369 * the class Congestion_control_strategy.) Hopefully this means terse, concern-conflating TCP RFCs (and at
370 * times Linux kernel's somewhat less concern-conflating implementations of these RFCs) become clean, concern-
371 * separated Flow code. This means, also, a rather extremely high ratio of comments to code in many areas
372 * of this project. I wanted to clearly explain every non-trivial decision, even if it meant many long-winded
373 * English explanations.
374 *
375 * ### Basic implementation ###
376 * Constructor creates new thread, henceforth called thread W, which houses the Node's
377 * main loop and performs all blocking work (and generally most work) for the class; this exists until the
378 * destructor executes. In general, for simplicity (in a way) and consistency, even when a given call (e.g.,
379 * non-blocking connect()) could perform some preliminary work (e.g., argument checking and ephemeral port
380 * reservation) directly in the caller's thread and then place a callback (e.g., connect_worker()) on W for
381 * the real work (e.g., filling out packet, sending SYN, etc.), we instead choose to put all the work (even
382 * the aforementioned preliminary kind) onto W and (non-blockingly!) wait for that callback to finish, via a
383 * boost.thread `future`. While a bit slower, I find this simplifies the code by breaking it up less and
384 * keeping it in one place in the code base for a given operation (in this example, connect()). It can also
385 * reduce mutex usage in our code. Also it enables a slightly better user experience, as errors are reported
386 * earlier in the user code and state changes are not asynchronous, when they don't need to be.
387 *
388 * The above can apply to such things as non-blocking connect(), listen(). It probably wouldn't apply
389 * to Peer_socket::send(), Peer_socket::receive(), Server_socket::accept() for performance reasons.
390 *
391 * ### Note on threads W and U ###
392 * In classic Berkeley sockets, one often thinks of "the kernel" performing certain
393 * work in "the background," the results of which the user level code might access. For example, the kernel
394 * might receive IP packets on some port and deserialize them into a socket's receive buffer -- while the
395 * user's program is busy doing something completely unrelated like showing graphics in a video game -- then
396 * the `recv()` call would later transfer the deserialized stream into the user's own user-level buffer (e.g., a
397 * `char[]` array). In our terminology, "thread W" is the equivalent of "the kernel" doing stuff "in the
398 * background"; while "thread U" refers to the user's own thread where they do other stuff and at times access
399 * the results of the "kernel background" stuff. However, Flow is a user-level tool, so thread W is not
400 * actually running in the kernel... but conceptually it is like it.
401 *
402 * ### boost.asio ###
403 * The implementation heavily uses the boost.asio library, as recommended by ::flow doc
404 * header boost.asio section. This implements the main loop's flow
405 * (with a callback-based model), all UDP traffic, timers.
406 * Why use boost.asio and not home-made networking and event loop code? There are a few good reasons. For
407 * UDP networking, boost.asio provides a pain-free, portable, object-oriented wrapper around BSD
408 * sockets/WinSock (that nevertheless exposes the native resources like sockets/FDs *if* needed). For a high-
409 * performance event loop, boost.asio gives a flexible *proactor pattern* implementation, supporting arbitrary
410 * callbacks with lambdas or `bind()`, which is much easier and prettier than C-style function pointers with `void*`
411 * arguments a-la old-school HTTP servers written in C. For a complex event-driven loop like this, we'd have
412 * to implement some callback system anyway, and it would likely be far worse than boost.asio's, which is
413 * full-featured and proven through the years.
414 *
415 * To-dos and future features
416 * --------------------------
417 *
418 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow):
419 * The OS UDP net-stack buffers arriving datagrams until they're `recv()`d by the application
420 * layer. This buffer is limited; on my Linux test machine the default appears to buffer ~80 1k datagrams.
421 * With a high sender CWND and high throughput (e.g., over loopback), thread W -- which both reads off UDP
422 * datagrams and handles them, synchronously -- cannot always keep up, and the buffer fills up. This
423 * introduces Flow loss despite the fact that the datagram actually safely got to the destination; and this is
424 * with just ONE sender; in a server situation there could be thousands. In Linux I was able to raise, via
425 * `setsockopt()`, the buffer size to somewhere between 1000 and 2000 1k datagrams. This helps quite a bit.
426 * However, we may still overflow the buffer in busy situations (I have seen it, still with only one
427 * connection). So, the to-do is to solve this problem. See below to-dos for ideas.
428 * WARNING: UDP buffer overflow can be hard to detect and may just look like loss; plus the user not
429 * reading off the Receive buffer quickly enough will also incur similar-looking loss. If there
430 * were a way to detect the total # of bytes or datagrams pending on a socket, that would be cool,
431 * but `sock.available()` (where `sock` is a UDP socket) just gives the size of the first queued datagram.
432 * Congestion control (if effective) should prevent this problem, if it is a steady-state situation (i.e.,
433 * loss or queueing delay resulting from not keeping up with incoming datagrams should decrease CWND).
434 * However, if it happens in bursts due to high CPU use in the environment, then that may not help.
435 * NOTE 1: In practice a Node with many connections is running on a server and thus doesn't
436 * receive that much data but rather sends a lot. This mitigates the UDP-receive-buffer-overflow
437 * problem, as the Node receiving tons of data is more likely to be a client and thus have only one
438 * or two connections. Hence if we can handle a handful of really fast flows without such loss,
439 * we're good. (On other hand, ACKs are also traffic, and server may get a substantial amount of
440 * them. Much testing is needed.) Packet pacing on the sender side may also avoid this loss
441 * problem; on the other hand it may also decrease throughput.
442 * NOTE 2: Queue delay-based congestion control algorithms, such as FastTCP and Vegas, are highly
443 * sensitive to accurate RTT (round trip time) readings. Heavy CPU load can delay the recording of the
444 * "received" time stamp, because we call UDP `recv()` and handle the results all in one thread. Any solution,
445 * such as the dedicated thread proposed below, would _allow_ one to record the time stamp immediately upon receipt
446 * of the packet by the dedicated thread; while W would independently handle everything else. Is that a good
447 * idea though? Maybe not. If the CPU load is such that ACK-receiver-side can't get to the time-stamp-saving,
448 * RTT-measuring step without tricks like doing it immediately upon some low-level datagram receipt hook, then
449 * that CPU-pegged jungle is, in a way, part of the network and should probably be fairly counted as part of
450 * the RTT. So perhaps we should continue to take the RTT time stamp while actually handling the individual
451 * acknowledgments. Instead we should strive to use multi-core resources efficiently, so that the gap between
452 * receipt (on whatever thread) and acknowledgment processing (on whatever thread) is as small as possible.
453 * Then RTT is RTT, but we make it smaller via improved performance. Meanwhile, we hopefully also solve the
454 * original problem (internal kernel buffer overflowing and dropping datagrams).
455 *
456 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow): APPROACH 1 (CO-WINNER!):
457 * One approach is to note that, as of this writing, we call `m_low_lvl_sock.async_receive(null_buffers)`;
458 * the `null_buffers` value for the buffers arg means that the handler is called without any actual UDP
459 * receive is performed by boost.asio; our handler is called once there is at least 1 message TO read;
460 * and then indeed our handler does read it (and any more messages that may also have arrived).
461 * Well, if we pass in an actual buffer instead, then boost.asio will read 1 (and no more, even if there are more)
462 * message into that buffer and have it ready in the handler. Assuming the mainstream case involves only 1
463 * message being ready, and/or assuming that reading at least 1 message each time ASAP would help significantly,
464 * this may be a good step toward relieving the problem, when it exists. The code becomes a tiny bit less
465 * elegant, but that's negligible. This seems like a no-brainer that should be included in any solution, but it
466 * by itself may not be sufficient, since more than 1 datagram may be waiting, and datagrams 2, 3, ... would
467 * still have to be read off by our code in the handler. So other approaches should still be considered.
468 *
469 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow): APPROACH 2:
470 * To eliminate the problem to the maximum extent possible, we can dedicate its own thread --
471 * call it W2 -- to reading #m_low_lvl_sock. We could also write to #m_low_lvl_sock on W2 (which would also
472 * allow us to use a different util::Task_engine from #m_task_engine to exclusively deal with W2 and
473 * the UDP socket #m_low_lvl_sock -- simpler in some ways that the strand-based solution described below).
474 * There are a couple of problems with this. One, it may delay receive ops due to send ops, which compromises
475 * the goals of this project in the first place. Two, send pacing code is in thread W (and moving it to W2
476 * would be complex and unhelpful); and once the decision to REALLY send a given datagram has been made,
477 * this send should occur right then and there -- queuing that task on W2 may delay it, compromising the
478 * quality of send pacing (whose entire nature is about being precise down to the microsecond or even better).
479 * Therefore, we'd like to keep `m_low_lvl_sock.async_send()` in thread W along with all other work (which
480 * allows vast majority of internal state to be accessed without locking, basically just the Send/Receive
481 * buffers excluded); except the chain of [`m_low_lvl_sock.async_receive()` -> post handler of received datagram
482 * onto thread W; and immediately `m_low_lvl_sock.async_receive()` again] would be on W2.
483 * AS WRITTEN, this is actually hard or impossible to do with boost.asio because of its design: #m_low_lvl_sock
484 * must belong to exactly one `Task_engine` (here, #m_task_engine), whereas to directly schedule a specific
485 * task onto a specific thread (as above design requires) would require separate `Task_engine` objects (1 per
486 * thread): boost.asio guarantees a task will run on *a* thread which is currently executing `run()` --
487 * if 2 threads are executing `run()` on the same service, it is unknown which thread a given task will land
488 * upon, which makes the above design (AS WRITTEN) impossible. (Side note: I'm not sure it's possible in plain C
489 * with BSD sockets either. A naive design, at least, might have W `select()` on `m_low_lvl_sock.native()` for
490 * writability as well other stuff like timers, while W2 `select()`s on same for readability; then the two
491 * threads perform UDP `send()` and `recv()`, respectively, when so instructed by `select()`s. Is it allowed
492 * to use `select()` on the same socket concurrently like that? StackOverflow.com answers are not clear cut, and
493 * to me, at least, it seems somewhat dodgy.) However, an equivalent design IS possible (and supported cleanly by
494 * boost.asio): In addition to the 2 threads, set up 2 strands, S and S2. All work except the #m_low_lvl_sock
495 * reads and posting of the handler onto S will be scheduled with a strand S. All work regarding
496 * #m_low_lvl_sock reads and posting of its handler onto S will be scheduled with a strand S2.
497 * Recall that executing tasks in 1 strand, with 2+ threads executing `run()`, guarantees that no 2+ of
498 * those tasks will execute simultaneously -- always in series. This is actually -- in terms of efficiency
499 * and thread safety -- equivalent to the above W/W2 design. Since the S tasks will always execute serially,
500 * no locking is necessary to prevent concurrent execution; thus what we know today as thread W tasks (which
501 * need no locking against each other) will be equally thread safe; and same holds for the new S2 tasks
502 * (which are considerably simpler and fewer in number). So that's the thread safety aspect; but why is
503 * efficiency guaranteed? The answer becomes clear if one pictures the original thread W/W2 design;
504 * basically little task blocks serially pepper thread W timeline; and same for W2. By doing it with strands
505 * S and S2 (running on top of threads W and W2), the only thing that changes is that the little blocks
506 * might at random be swapped between threads. So the series of tasks T1 -> T2 -> T3 -> T4 meant for
507 * for S might actually jump between W and W2 randomly; but whenever thread W2 is chosen instead of
508 * thread W, that leaves an empty "space" in thread W, which will be used by the S2 task queue if it
509 * needs to do work at the same time. So tasks may go on either thread, but the strands ensure
510 * that both are used with maximum efficiency while following the expected concurrency constraints
511 * (that each strand's tasks are to be executed in series). Note, however, that #m_low_lvl_sock (the
512 * socket object) is not itself safe for concurrent access, so we WILL need a lock to protect the tiny/short
513 * calls `m_low_lvl_sock.async_receive()` and `m_low_lvl_sock.async_send()`: we specifically allow that
514 * a read and write may be scheduled to happen simultaneously, since the two are independent of each
515 * other and supposed to occur as soon as humanly possible (once the desire to perform either one
516 * is expressed by the code -- either in the pacing module in strand S or the read handler in S2).
517 * In terms of nomenclature, if we do this, it'd be more fair to call the threads W1 and W2 (as they
518 * become completely equal in this design). (In general, any further extensions of this nature (if
519 * we want still more mostly-independent task queues to use available processor cores efficiently),
520 * we would add 1 strand and 1 worker thread per each such new queue. So basically there's a thread pool
521 * of N threads for N mostly-independent queues, and N strands are used to use that pool efficiently
522 * without needing to lock any data that are accessed exclusively by at most 1 queue's tasks only.
523 * Resources accessed by 2 or more task queues concurrently would need explicit locking (e.g.,
524 * #m_low_lvl_sock in this design).) So then where we start thread W today, we'd start the thread
525 * pool of 2 threads W1, W2, with each executing `m_task_engine.run()`. Before the run()s execute,
526 * the initial tasks (each wrapped in strand S or S2, as appropriate) need to be posted onto
527 * #m_task_engine; this can even occur in the user thread U in the constructor, before W1 and W2
528 * are created. The destructor would `m_task_engine.stop()` (like today), ending each thread's
529 * `run()` and trigger the imminent end of that thread; at which point destructor can `W1.join()` and `W2.join()`
530 * (today it's just `W.join()`).
531 *
532 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow): APPROACH 3:
533 * Suppose we take APPROACH 1 (no-brainer) plus APPROACH 2. Certain decisions in the latter were made for
534 * certain stated reasons, but let's examine those more closely. First note that the APPROACH 1 part will
535 * ensure that, given a burst of incoming datagrams, the first UDP `recv()` will occur somewhere inside boost.asio,
536 * so that's definitely a success. Furthermore, strand S will invoke `m_low_lvl_sock.async_send()` as soon as
537 * the pacing module decides to do so; if I recall correctly, boost.asio will invoke the UDP `send()` right then
538 * and there, synchronously (at least I wrote that unequivocally in a Node::async_low_lvl_packet_send_impl() comment).
539 * Again, that's as good as we can possibly want. Finally, for messages 2, 3, ... in that incoming datagram burst,
540 * our handler will (indirectly but synchronously) perform the UDP `recv()`s in strand S2. Here we're somewhat
541 * at boost.asio's mercy, but assuming its strand task scheduling is as efficient as possible, it should occur
542 * on the thread that's free, and either W1 or W2 should be free given the rest of the design. Still, boost.asio
543 * docs even say that different strands' tasks are NOT guaranteed to be invoked concurrently (though common
544 * sense implies they will be when possible... but WHAT IF WE'RE WRONG!!!?). Also, we don't know how much
545 * computational overhead is involved in making strands work so nicely (again, hopefully it's well written...
546 * but WHAT IF!!!?). A negative is the small mutex section around the two #m_low_lvl_sock calls; not complex
547 * and probably hardly a performance concern, but still, it's a small cost. Finally, using strands -- while
548 * not ugly -- does involve a bit more code, and one has to be careful not to forget to wrap each handler with
549 * the appropriate strand (there is no compile- or runtime error if we forget!) So what can we change about
550 * APPROACH 2 to avoid those negatives? As stated in that approach's description, we could have thread W
551 * not deal with #m_low_lvl_sock at all; thread W2 would have a separate `Task_engine` handling only
552 * #m_low_lvl_sock (so no mutex needed). W2 would do both sends and receives on the socket; and absolutely
553 * nothing else (to ensure it's as efficient as possible at getting datagrams off the kernel buffer, solving
554 * the original problem). Yes, the receiving would have to share time with the sends, but assuming nothing
555 * else interferes, this feels like not much of a cost (at least compared with all the heavy lifting thread W
556 * does today anyway). Each receive would read off all available messages into raw buffers and pass those
557 * (sans any copying) on to thread W via `post(W)`. The other negative, also already mentioned, is that
558 * once pacing module (in thread W) decides that a datagram should be sent, the `post(W2)` for the task that
559 * would peform the send introduces a delay between the decision and the actual UDP `send()` done by boost.asio.
560 * Thinking about it now, it is questionable to me how much of a cost that really is. Without CPU contention,
561 * we can measure it; I expect it to be quite cheap, but I coudl be wrong. With CPU contention -- which would
562 * have to come from many datagrams arriving at the same time -- I'm not sure. It wouldn't be overly hard to
563 * test; basically flood with UDP traffic over loopback and log the delay between W deciding to send datagram
564 * and W2 calling `m_low_lvl_sock.async_send_to()` (obviously use #Fine_clock!). All in all, if we name
565 * the dedicated thread approach described here as APPROACH 3, then APPROACH 3 is appealingly simpler than
566 * APPROACH 2; and in most ways appears like it'd be at least as efficient and good at solving the original
567 * problem as APPROACH 2. The only danger that worries me is this business with messing up pacing (and no,
568 * moving pacing into W2 just endangers the receiving efficiency and introduces thread safety problems and
569 * complexity) by having it compete with receiving during incoming-traffic-heavy times. Ultimately, I'd
570 * recommend timing this "danger zone" as described a bit earlier (also time delay introduced by `post(W2)`
571 * without any traffic coming in); and if it looks good, do APPROACH 3. Otherwise spring for APPROACH 2.
572 *
573 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow): APPROACH 4:
574 * It now occurs to me how to solve the main questionable part about APPROACH 3. If we fear that the reads and
575 * writes in thread W2 may compete for CPU, especially the reads delaying timing-sensitive paced writes,
576 * then we can eliminate the problem by taking W2's own util::Task_engine (which would be separate from
577 * #m_task_engine) and have two equal threads W2' and W2'' start up and then each call `Task_engine::run()`.
578 * Without having to use any strand, this will essentially (as documented in boost.asio's doc overview)
579 * establish a thread pool of 2 threads and then perform the receive and send tasks at random on whichever
580 * thread(s) is/are available at a particular time. Since there's no strand(s) to worry about, the underlying
581 * overhead in boost.asio is probably small, so there's nothing to fear about efficiency. In fact, in this
582 * case we've now created 3 separate threads W, W2', W2'', all doing things independently of each other, which
583 * is an excellent feature in terms of using multiple cores. Do note we will now need a mutex and very short
584 * critical sections around the calls to `m_low_lvl_sock::async_receive()` and `m_low_lvl_sock::async_send()`,
585 * but as noted before this seems extremely unlikely to have any real cost due to the shortess of critical
586 * sections in both threads. If this is APPROACH 4, then I'd say just time how much absolute delay is
587 * introduced by a `post(W2')` or `post(W2'')` of the async send call compared to directly making such a
588 * call on thread W, as is done today. I suspect it's small, in which case the action is go for
589 * APPROACH 4... finally.
590 *
591 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow): APPROACH 5 (CO-WINNER!):
592 * Actually, the thing I've been dismissing in approaches 2-4, which was to combine the pacing logic with the
593 * actual `m_low_lvl_sock.async_send()` (like today) but in their own dedicated thread, now seems like the best
594 * way to solve the one questionable aspect of APPROACH 4. So, APPROACH 4, but: Move the pacing stuff into
595 * the task queue associated with threads W2' and W2''. So then these 2 threads/cores will be available for
596 * 2 task queues: one for pacing timers + datagram sending over #m_low_lvl_sock (with mutex); the other for
597 * receiving over #m_low_lvl_sock (with same mutex). Now, the only "delay" is moved to before pacing occurs:
598 * whatever minimal time cost exists of adding a queue from thread W to thread W2' or W2'' occurs just before
599 * the pacing logic, after which chances are the datagram will be placed on a pacing queue anyway and sent
600 * off somewhat later; intuitively this is better than the delay occurring between pacing logic and the
601 * actual UDP send. Note, also, that the timing-sensitive pacing logic now gets its own thread/core and should
602 * thus work better vs. today in situations when thread W is doing a lot of work. This is even more logical
603 * than APPROACH 4 in that sense; the pacing and sending are concern 1 and get their own thread (essentially;
604 * really they get either W2' or W2'' for each given task); the receiving is concern 2 and gets its own thread
605 * (same deal); and all the rest is concern 3 and remains in its own thread W (until we can think of ways to
606 * split that into concerns; but that is another to-do). Only one mutex with 2 very small critical sections,
607 * as in APPROACH 4, is used. The only subtlety regarding concurrent data access is in
608 * Node::mark_data_packet_sent(), which is called just before `m_low_lvl_sock.async_send()`, and which
609 * finalizes certain aspects of Peer_socket::Sent_packet::Sent_when; most notably
610 * Peer_socket::Sent_packet::Sent_when::m_sent_time (used in RTT calculation upon ACK receipt later).
611 * This is stored in Peer_socket::m_snd_flying_pkts_by_sent_when, which today is not protected by mutex due
612 * to only being accessed from thread W; and which is extremely frequently accessed. So either we protect
613 * the latter with a mutex (out of the question: it is too frequently accessed and would quite possibly
614 * reduce performance) or something else. Currently I think Node::mark_data_packet_sent() should just
615 * be placed onto #m_task_engine (thread W) via `post()` but perhaps take all or most of the items to
616 * update Sent_when with as arguments, so that they (especially `Sent_when::m_sent_time`) could be determined
617 * in thread W2' or W2'' but written thread-safely in W. (There is no way some other thread W task would mess
618 * with this area of Peer_socket::m_snd_flying_pkts_by_sent_when before the proposed mark_data_packet_sent()
619 * was able to run; thread W had just decided to send that packet over wire in the first place; so there's no
620 * reason to access it until ACK -- much later -- or some kind of socket-wide catastrophe.) All that put
621 * together I dub APPROACH 5. Thus, APPROACH 1 + APPROACH 5 seems like the best idea of all, distilling all
622 * the trade-offs into the the fastest yet close to simplest approach.
623 *
624 * @todo More uniform diagnostic logging: There is much diagnostic logging in the
625 * implementation (FLOW_ERROR*(), etc.), but some of it lacks useful info like `sock` or `serv` (i.e., the
626 * `ostream` representations of Peer_socket and Server_socket objects, which include the UDP/Flow endpoints
627 * involved in each socket). The messages that do include these have to do so explicitly. Provide some
628 * macros to automatically insert this info, then convert the code to use the macros in most places. Note that
629 * existing logging is rather exhaustive, so this is not the biggest of deals but would be nice for ease of coding
630 * (and detailed logging).
631 *
632 * @todo It may be desirable to use not boost.asio's out-of-the-box UDP receive routines but rather extensions
633 * capable of some advanced features, such as `recvmsg()` -- which can obtain kernel receipt time stamps and
634 * destination IP address via the `cmsg` feature. This would tie into various other to-dos listed around here.
635 * There is, as of this writing, also a to-do in the top-level `flow` namespace doc header about bringing some code
636 * into a new `io` namespace/Flow module; this includes the aforementioned `recvmsg()` wrapper.
637 *
638 * @todo It may be desirable to further use `recvmmsg()` for UDP input; this allows to read multiple UDP datagrams
639 * with one call for performance.
640 *
641 * @todo By the same token, wrapping `sendmsg()` and `sendmmsg()` may allow for futher perf and feature
642 * improvements -- in some ways potentially symmetrically to `recvmsg()` and `recvmmsg()` respectively.
643 * However, as of this writing, I (ygoldfel) see this more of an opportunistic "look into it" thing and not something
644 * of active value; whereas `recv[m]msg()` bring actual features we actively desire for practical reasons.
645 *
646 * @todo Send and Receive buffer max sizes: These are set to some constants right now. That's not
647 * optimal. There are two competing factors: throughput and RAM. If buffer is too small, throughput can
648 * suffer in practice, if the Receiver can't read the data fast enough (there are web pages that show this).
649 * Possibly with today's CPUs it's no longer true, but I don't know. If buffer is too large and with a lot of
650 * users, a LOT of RAM can be eaten up (however note that a server will likely be mostly sending, not
651 * receiving, therefore it may need smaller Receive buffers). Therefore, as in Linux 2.6.17+, the buffer
652 * sizes should be adaptively sized. It may be non-trivial to come up with a good heuristic, but we may be
653 * able to copy Linux. The basic idea would probably be to use some total RAM budget and divide it up among
654 * the # of sockets (itself a constant estimate, or adaptive based on the number of sockets at a given time?).
655 * Also, buffer size should be determined on the Receive side; the Send side should make its buffer to be of
656 * equal size. Until we implement some sensible buffer sizing, it might be a good idea (for demo purposes with
657 * few users) to keep the buffers quite large. However, flow control (receive window) is now implemented and
658 * should cope well with momentary Receive buffer exhaustion.
659 * Related facts found on the web: In Linux, since a long time ago, Send buffer size is determined by other
660 * side's Receive buffer size (probably sent over in the SYN or SYN-ACK as the receive window). Also, in
661 * older Linuxes, Receive buffer defaults to 128k but can be manually set. Supposedly the default can lead to
662 * low throughput in high-speed (gigabit+) situations. Thus Linux 2.6.17+ apparently made the Receive buffer
663 * size adaptive.
664 *
665 * @todo Drop Acknowledgments: DCCP, a somewhat similar UDP-based protocol, uses the concept of
666 * Data-Dropped acknowledgments. If a packet gets to the receiver, but the receiver is forced to drop it (for
667 * example, no Receive buffer space; not sure if there are other reasons in Flow protocol), then the sender will only
668 * find out about this by inferring the drop via Drop Timeout or getting acknowledgments for later data. That
669 * may take a while, and since receiver-side drops can be quite large, it would be more constructive for the
670 * receive to send an un-ACK of sorts: a Data-Dropped packet informing the sender that specific data were
671 * dropped. The sender can then update his congestion state (and retransmit if we enable that). See RFC 4340
672 * and 4341.
673 *
674 * @todo Add extra-thread-safe convention for setting options: We can provide a thread-safe (against other user
675 * threads doing the same thing) macro to set a given option. set_options() has a documented, albeit in
676 * practice not usually truly problematic, thread safety flaw if one calls options(), modifies the result,
677 * then calls set_options(). Since another thread may modify the Node's options between the two calls, the
678 * latter call may unintentionally revert an option's value. Macro would take an option "name" (identifier
679 * for the Node_options member), a Node, and a target value for the option and would work together with a
680 * helper method template to obtain the necessary lock, make the assignment to the internal option, and give
681 * up the lock. The implementation would probably require Node to expose its internal stored Node_options
682 * for the benefit of this macro only. Anyway, this feature is not super-important, as long as the user is
683 * aware that modifying options from multiple threads simultaneously may result in incorrect settings being
684 * applied.
685 *
686 * @todo The preceding to-do regarding Node_options applies to Peer_socket_options stored in Peer_socket in
687 * in an analogous way.
688 *
689 * @todo Consider removing Flow ports and even Server_socket:
690 * As explained above, we add the concept of a large set of available Flow ports within each
691 * Node, and each Node itself has a UDP port all to itself. So, for example, I could bind a Node to UDP
692 * port 1010, and within that listen() on Flow ports 1010 (unrelated to UDP port 1010!) and 1011. In
693 * retrospect, though, is that complexity necessary? We could save quite a few lines of code, particularly in
694 * the implementation (class Port_space, for example) and the protocol (extra bytes for Flow source and target
695 * ports, for example). (They're fun and pretty lines, but the absence of lines is arguably even prettier
696 * albeit less fun. On the other hand, bugs aren't fun, and more code implies a higher probability of bugs,
697 * maintenance errors, etc.) The interface would also be a bit simpler; and not just due to fewer items in
698 * Remote_endpoint (which would in fact reduce to util::Udp_endpoint and cease to exist). Consider Server_socket;
699 * currently listen() takes a #flow_port_t argument and returns a Server_socket which is listening; calling
700 * accept() (etc.) on the latter yields Peer_socket, as the other side connects. Without Flow ports, there is
701 * no argument to listen(); in fact, Server_socket itself is not strictly necessary and could be folded into
702 * Node, with listen() becoming essentially something that turns on the "are we listening?" Boolean state,
703 * while stop_listening() would turn it off (instead of something like `Server_socket::close()`). (Important
704 * note: That was not an endorsement of removing Server_socket. Arguably it is still a nice abstraction.
705 * Removing it would certainly remove some boiler-plate machinery to do with Server_socket's life cycle, on
706 * the other hand. Perhaps it's best to take a two-step appraoch; remove Flow ports first; then after a long
707 * time, assuming it becomes clear that nothing like them is going to come back, remove Server_socket as
708 * well.) A key question is, of course what would we lose? At first glance, Flow port allows multiple
709 * connections on a single UDP-port-taking Flow server, including multiple connections from one client (e.g.,
710 * with differing connection parameters such as reliability levels among the different connections, or
711 * "channels")... but actually that'd still work without Flow ports, assuming the "one client's" multiple
712 * connections can bind to different (presumably ephemeral) UDP ports; since the tuple (source host, source
713 * UDP port) is still enough to distinguish from the 2+ "channels" of the same "client" connecting to the one
714 * Flow Node (vs. today's tuple: source host, source UDP port, source Flow port, destination Flow port; see
715 * `struct` Socket_id). However, without Flow ports, it is not possible for one Node to connect to another
716 * Node twice, as each Node by definition is on one port. Is this important? Maybe, maybe not; for NAT
717 * purposes it can be important to use only 1 port; but that typically applies only to the server, while the
718 * client can send packets from anywhere. However, gaming applications can be very demanding and for the most
719 * restrictive NAT types might desire only a single port used on both sides. So whether to remove Flow ports
720 * is somewhat questionable, now that they exist; but arguably they didn't need to be added in the first
721 * place, until they were truly needed. I'd probably leave them alone, since they do exist.
722 *
723 * @todo Multi-core/multi-threading: The existing implementation already has a nice multi-threaded property,
724 * namely that each Node (object that binds to a single UDP endpoint/port) is entirely independent of any other
725 * such object -- they have entirely separate data, and each one does all its work on a separate thread.
726 * So to make use of multiple cores/processors, one can set up multiple Node objects. (Obviously this only makes
727 * sense for apps where using multiple ports is acceptable or even desired. E.g., a server could listen
728 * on N different UDP ports, where N=# of cores.) However, it would be nice for a single Node to be as
729 * multi-core/processor-friendly as possible. This is partially addressed by the "Dedicated thread to receive
730 * UDP datagrams ASAP" to-do item elsewhere. We could go further. boost.asio lets one easily go from
731 * 1 thread to multiple threads by simply starting more worker threads like W (W1, W2, ...) and executing
732 * `m_task_engine::run()` in each one -- note that #m_task_engine is shared (sans lock). Then subsequent
733 * handlers (timer-fired handlers, ack-received handlers, data-received handlers, and many more) would be
734 * assigned evenly to available threads currently executing run(). However, then all data these handlers
735 * access would need to be protected by a mutex or mutexes, which would be a significant increase in
736 * complexity and maintenance headaches compared to existing code, which features mutexes for data accessed
737 * both by W and the user thread(s) U -- which excludes most Node, Server_socket, Peer_socket state --
738 * essentially the user-visible "state" enums, and the Receive and Send buffers; but hugely complex things
739 * like the scoreboards, etc. etc., needed no mutex protection, but with this change they would need it.
740 * Actually, if the implementation essentially uses one mutex M, and every handler locks it for the entirety
741 * of its execution, then one isn't really making use of multi-cores/etc. anyway. One could make use of
742 * boost.asio "strands" to avoid the need for the mutex -- just wrap every handler in a shared strand S,
743 * and no locking is needed; boost.asio will never execute two handlers simultaneously in different threads.
744 * While this would arguably make the code simpler, but in terms of performance it wouldn't make any
745 * difference anyway, as it is functionally equivalent to the lock-M-around-every-operation solution (in
746 * fact, internally, it might even amount to exactly that anyway). So that's probably not worth it.
747 * We need to have more mutexes or strands, based on some other criterion/criteria. After a datagram is demuxed,
748 * vast majority of work is done on a particular socket independently of all others. Therefore we could
749 * add a mutex (or use an equivalent) into the socket object and then lock on that mutex. Multiple
750 * threads could then concurrently handle multiple sockets. However, for this to be used, one would have to
751 * use a single Node (UDP endpoint) with multiple sockets at the same time. Without any changes at all, one can
752 * get the same concurrency by instead setting up multiple Node objects. Other than a bit of lost syntactic sugar
753 * (arguably) -- multiple Node objects needed, each one having to initialize with its own set of options,
754 * for example -- this is particularly cost-free on the client side, as each Node can just use its own ephemeral
755 * UDP port. On the server side the network architecture has to allow for multiple non-ephemeral ports, and
756 * the client must know to (perhaps randomly) connect to one of N UDP ports/endpoints on the server, which is
757 * more restrictive than on the client. So perhaps there are some reasons to add the per-socket concurrency -- but
758 * I would not put a high priority on it. IMPORTANT UPDATE: Long after the preceding text was written, flow::async
759 * Flow module was created containing flow::async::Concurrent_task_loop interface. That looks at the general
760 * problem of multi-tasking thread pools and what's the user-friendliest-yet-most-powerful way of doing it.
761 * While the preceding discussion in this to-do has been left unchanged, one must first familiarize self with
762 * flow::async; and *then* read the above, both because some of those older ideas might need reevaluation; and because
763 * some of those ideas may have been implemented by flow::async and are now available easily.
764 *
765 * @todo In Node::low_lvl_packet_sent(), the UDP `async_send()` handler, there is an inline to-do about specially
766 * treating the corner case of the `would_block` and `try_again` boost.asio errors being reported (a/k/a POSIX
767 * `EAGAIN`/`EWOULDBLOCK`). Please see that inline comment for details.
768 *
769 * @todo Class Node `private` section is very large. It's so large that the implementations of the methods therein
770 * are split up among different files such as `flow_peer_socket.cpp`, `flow_event_set.cpp`, `flow_low_lvl_io.cpp`, etc.
771 * Consider the following scheme to both better express this separation as well as enforce which of a given method
772 * group's method(s) are meant to be called by outside code vs. being helpers thereof: Introduce `static`-method-only
773 * inner classes (and, conceivably, even classes within those classes) to enforce this grouping (`public` methods
774 * and `private` methods enforcing what is a "public" helper vs. a helper's helper).
775 *
776 * @todo We are now on Boost 1.75; the use of asio's `null_buffers` semantics is deprecated and should be changed to
777 * the replacement mechanism suggested in Boost docs -- the `async_wait()` method.
778 *
779 * @todo Make use of flow::async::Concurrent_task_loop or flow::async::Single_thread_task_loop, instead of manually
780 * setting up a thread and util::Task_engine, for #m_worker. I, Yuri, wrote the constructor, worker_run(), destructor,
781 * and related setup/teardown code as my very first boost.asio activity ever. It's solid, but flow::async just makes
782 * it easier and more elegant to read/maintain; plus this would increase Flow-wide consistency. It would almost
783 * certainly reduce the number of methods and, even nicely, state (such as #m_event_loop_ready).
784 *
785 * Misc. topic: Doxygen markup within a Doxygen command
786 * ----------------------------------------------------
787 * This section may seem random: Indeed, it is the meat of a similarly named ("Doxygen markup," etc.) subsection of
788 * the doc header on the very-general namespace ::flow. As noted in that subsection, this Node class is a more
789 * convenient place to explain this, because it is a large class with many examples available. Without further ado:
790 *
791 * ::flow doc header discusses which items should be accompanied by Doxygen comment snippets. More specifically, each
792 * item is accompanied by a Doxygen "command". `"@param param_name Explain parameter here."`, for example, documents
793 * the parameter `param_name` with the text following that. (Some commands are implicit; namely without
794 * an explicit `"@brief"`, the first sentence is the brief description of the class/function/whatever.)
795 *
796 * However, all that doesn't talk about formatting the *insides* of paragraphs in these commands. Essentially
797 * we are saying just use English. However, Doxygen uses certain markup language conventions when interpreting
798 * those paragraphs. For example `backticks` will turn the thing inside the ticks into an inline code snippet,
799 * in fixed-width font. There are a few things to watch out for with this:
800 *
801 * - Don't accidentally enable markup when you don't mean to. E.g., an * asterisk as the first character
802 * in a paragraph will cause a bullet point to appear. Also, sometimes you do want to
803 * use some character for semantical reason X which Doxygen shares with you, but automatic markup
804 * handling might make it come out a little wrong. Just learn these through practice
805 * and check over the generated web page(s) before checking in the code.
806 * - DO use markup within reason for certain COMMON items. Do not overdo it: mostly it should be things
807 * you're keen to do even if there were NO Doxygen or Javadoc involved. Bullet point lists are an example.
808 * Basically: If you were going to do something anyway, why not have it come out nicely in the doc page(s)?
809 * - Make use of auto-linking (a/k/a automatic liwnk generation) liberally. This is when Doxygen sees a certain
810 * pattern within a Doxygen comment and understands it a reference to some other object, like a class or
811 * method; so in the output this will come out as a link to that thing. The nice thing is that, usually,
812 * within raw code it looks fine/normal; AND the generated page has the convenient linking functionality.
813 * However, if enabling linking in a certain case is too tedious, feel free to not.
814 *
815 * That said, I will now list all of the pieces of markup that are allowed within comments inside Flow code.
816 * Try not to add to this list without a very good reason. Simplicity is a virtue.
817 *
818 * - Bullet points: Just a dash after one indent level: `" - Item text."`. Nesting allowed.
819 * - Numbered points: just type out the numbers explicitly instead of auto-numbering: `" 2. Item text."`; not
820 * `" -# Item text."`. Yes, it leaves the numbering to you, but more importantly the raw comment remains
821 * readable, and you can refer to the numbers (e.g., "according to the condition in item 3" makes more sense
822 * when you can see a `3.` nearby).
823 * - Emphasis: Just one asterisk before and after the emphasized stuff: *word*, *multiple words*. No "_underscores_"
824 * please. In general try to avoid too much emphasis, as asterisks are common characters and can confuse
825 * Doxygen. Plus, you shouldn't need to emphasize stuff THAT much. Plus, feel free to use CAPITALS to emphasize
826 * instead.
827 * - Inline code snippets: Backticks. `single_word`, `an_expression != other_expression * 2`. Definitely use
828 * this liberally: it helps readability of BOTH raw code and looks delightful in the generated web page(s).
829 * However, note explanation below regarding how this relates to auto-linking.
830 * - Syntax-highlighted code spans: Use three tildes `"~~~"` to begin and end a code snippet. This MUST be
831 * used for multi-line code snippets; and CAN be used instead of `backticks` for single-line code snippets.
832 * The output will be a separate paragraph, just like the raw code should be. More precisely, the tildes
833 * and code should follow a single absolute indentation level:
834 *
835 * ~~~
836 * if (some_condition) // Generated output will also be syntax-highlighted.
837 * {
838 * obj.call_it(arg1, "quote");
839 * return false;
840 * }
841 * ~~~
842 *
843 * - Large heading in a long doc header: Use the format seen in this comment itself: Words, underlined by a
844 * row of dashes ("----", etc.) on the next line. This results in a fairly large-fonted title.
845 * - Small heading is a long doc header: IF AND ONLY IF you need a sub-heading under a large heading
846 * (which would probably be in only quite long doc headers indeed), use the ### format. Again use the
847 * format seen in this very doc header. This results in a slightly large-fonted title (pretty close to
848 * normal).
849 * - Avoid any other levels of heading. At that point things have become too complex.
850 * - Escape from Doxygen formatting: To ensure Doxygen interprets a bunch of characters literally, when you
851 * know there is danger of it applying unwanted formatting, surround it in quotes. The quotes will
852 * be included in the output just like in the raw code; but anything inside quotes will be output verbatim
853 * even if full of Doxygen markup or commands. For example, if I don't want a to-do item to begin in
854 * the middle of this paragraph, but I do want to refer to how a to-do is declared is Doxygen comments,
855 * I will surround it in quotes: To declare a to-do, use the `"@todo"` command. Note that in that example
856 * I put `backticks` around the text to format the whole thing a certain way; any formatting already in
857 * effect will continue through the "quoted" text; but no formatting inside the "quotes" will go in effect.
858 * Plus, it looks appropriate in raw code. Best of all worlds.
859 *
860 * The most tricky yet powerful technique to learn here is the interplay between auto-linking and `inline code
861 * snippets`. Before discussing that in some detail, note the auto-linking which is allowed in this source
862 * code:
863 *
864 * - Class/`struct`/union names are auto-linked: for example, just Peer_socket. That's because every
865 * class/`struct`/union for us starts with a Capital letter. Easy!
866 * - A method local to the class/`struct` being documented, like running() in this class Node which we are
867 * documenting right now, is auto-linked as written.
868 * - ANY member explicitly specified as belonging to a class/`struct`/union or namespace is
869 * auto-linked as written. It can be a member function, variable, alias, nested class, or *anything* else.
870 * The presence of `"::"` will auto-link whatever it is. Note this is a very powerful auto-linking technique;
871 * the vast majority of things are members of something, even if's merely a namespace, so if you absolutely must
872 * auto-link something, there is always at least one straightforward way: a *fully or partially qualified name*.
873 * It will be simple/readable as raw source and equally simple/readable AND linked in the doc output. The only
874 * *possible* (not really probable, but it certainly happens) down-side is it can be too verbose.
875 * - Example: alias-type: Drop_timer::timer_wait_id_t; method: Port_space::return_port().
876 * - Macros are not members of anything and thus cannot be auto-linked by qualifying them. However, read below
877 * on how to auto-link them too.
878 * - A free (non-member) function or functional macro will generally auto-link, even if it's in some other namespace.
879 * - This can result in name collisions, if some function `f()` is in two namespaces meaning two entirely
880 * different things. And not just functions but anything else, like classes, can thus collide.
881 * That is, after all, why namespaces exist! Just be careful and qualify things with namespace paths
882 * when needed (or even just for clarity).
883 * - For non-functions/methods: Things like variables/constants, type aliases, `enum` values will not auto-link
884 * if seen "naked." For example S_PORT_ANY is, to Doxygen, just a word. We use either `"#"` or `"::"` to force
885 * auto-linking. Here is how to decide which one to use:
886 * - `"#id"`: To refer to a *member* of anything (compound type, namespace) member named `id`,
887 * such that the *currently documented item* is also a member of that [anything] -- either at the same depth
888 * (e.g., in the same class) or deeper (e.g., `id` is in namespace `flow`, while we are in namespace
889 * `flow::net_flow`, or in class `flow::Some_class`, or both -- `flow::net_flow::Some_class`).
890 * Example: #Udp_socket (member alias), #S_NUM_PORTS (member constant), #m_low_lvl_sock (member variable).
891 * - `"::id"`: To refer to an item in the global namespace. Almost always, this will be an (outer) namespace.
892 * Global-namespace members that are not themselves namespaces are strongly discouraged elsewhere.
893 * Example: ::flow (namespace).
894 * - A functional macro is formatted the same as a free function in global namespace: e.g., FLOW_LOG_WARNING().
895 * - If a non-functional macro needs to be documented (VERY rare or non-existent given our coding style), use
896 * this special format: `"#MACRO_NAME"`. `"::"` is inappropriate, since a macro does not belong to a namespace
897 * (global or otherwise), and that would look confusing in raw code.
898 *
899 * Now finally here are the guidelines about what to use: `backticks`, an auto-linked symbol, or both.
900 * Suppose there is some comment *snippet* X that you are considering how to format.
901 *
902 * - If X is just a piece of English language and not referring to or quoting code per se, then do not format
903 * it. Type it verbatim: "The socket's state is ESTABLISHED here." Even though ESTABLISHED may be thought
904 * of as code, here it's referring more to a concept (the state "ESTABLISHED") rather than code snippet.
905 * `S_ESTABLISHED` is a different story, on the other hand, and that one you must either backtick (as I just
906 * did there) or auto-link; read on for guidelines on that.
907 * - If the *entirety* of X is an identifier:
908 * - Auto-link it, WITHOUT backticks, if to auto-link it you would just write X verbatim anyway.
909 * For example, mentioning Peer_socket just like that will auto-link it. So, that's great. Do NOT
910 * add backticks, as that increases code verbosity and adds very little (making the auto-linked `Peer_socket`
911 * also use a fixed-width font; meh).
912 * - Auto-link it, WITHOUT backticks, if you would like the convenience of it being auto-linked in the output.
913 * - Do NOT auto-link it, but DO add `backticks`, if you do not need the convenience of the auto-linked output.
914 * Backticks are easy: auto-linking can be a little tricky/verbose. So in that case just `backtick` it
915 * for readable raw source AND pretty output; without worrying about subtleties of proper auto-linking.
916 * - If X consists of some identifiers but also contains non-identifiers:
917 * - The non-code parts should be verbatim.
918 * - ALL code parts should be in `backticks`.
919 * - IF you want the convenience of some parts of the output being auto-linked, auto-link those parts.
920 * - IF you'd prefer shorter and clearer raw code, then don't auto-link where doing so would require extra
921 * raw code characters.
922 * - Example: Suppose X is: "The allowed range is [S_FIRST_SERVICE_PORT + 1, S_FIRST_EPHEMERAL_PORT + 2)."
923 * Variant 1 will auto-link but a bit longer and less readable as raw code. Variant 2 will forego auto-linking
924 * but is short and readable as raw code.
925 * - *Variant 1*: The allowed range is [`#S_FIRST_SERVICE_PORT + 1`, `#S_FIRST_EPHEMERAL_PORT + 2`).
926 * - *Variant 2*: The allowed range is [`S_FIRST_SERVICE_PORT + 1`, `S_FIRST_EPHEMERAL_PORT + 2`).
927 * - Example: Suppose X is: "The condition holds if sock->m_local_port != 2223." Variant 1 is brief and readable.
928 * Variant 2 is readable enough but much longer. However, it will very conveniently auto-link to
929 * that obscure data member for the web page reader's pleasure, the convenience of which shouldn't be dismissed.
930 * - *Variant 1*: The condition holds if `sock->m_local_port != 2223`.
931 * - *Variant 2*: The condition holds if `Peer_socket::m_local_port != 2223` (for `sock`).
932 */
933class Node :
935 public log::Log_context,
936 private boost::noncopyable
937{
938public:
939 // Constants.
940
941 /// Total number of Flow ports in the port space, including #S_PORT_ANY.
942 static const size_t& S_NUM_PORTS;
943
944 /// Total number of Flow "service" ports (ones that can be reserved by number with Node::listen()).
945 static const size_t& S_NUM_SERVICE_PORTS;
946
947 /**
948 * Total number of Flow "ephemeral" ports (ones reserved locally at random with
949 * `Node::listen(S_PORT_ANY)` or Node::connect()).
950 */
951 static const size_t& S_NUM_EPHEMERAL_PORTS;
952
953 /**
954 * The port number of the lowest service port, making the range of service ports
955 * [#S_FIRST_SERVICE_PORT, #S_FIRST_SERVICE_PORT + #S_NUM_SERVICE_PORTS - 1].
956 */
958
959 /**
960 * The port number of the lowest ephemeral Flow port, making the range of ephemeral ports
961 * [#S_FIRST_EPHEMERAL_PORT, #S_FIRST_EPHEMERAL_PORT + #S_NUM_EPHEMERAL_PORTS - 1].
962 */
964
965 // Constructors/destructor.
966
967 /**
968 * Constructs Node.
969 * Post-condition: Node ready for arbitrary use. (Internally this includes asynchronously
970 * waiting for any incoming UDP packets on the given endpoint.)
971 *
972 * Does not block. After exiting this constructor, running() can be used to determine whether
973 * Node initialized or failed to do so; or one can get this from `*err_code`.
974 *
975 * ### Potential shared use of `Logger *logger` ###
976 * All logging, both in this thread (from which the constructor executes) and any potential internally
977 * spawned threads, by this Node and all objects created through it (directly
978 * or otherwise) will be through this Logger. `*logger` may have been used or not used
979 * for any purpose whatsoever prior to this constructor call. However, from now on,
980 * Node will assume that `*logger` will be in exclusive use by this Node and no other code until
981 * destruction. It is strongly recommended that all code refrains from further use of
982 * `*logger` until the destructor ~Node() exits. Otherwise, quality of this Node's logging (until destruction)
983 * may be lowered in undefined fashion except for the following formal guarantees: the output will not
984 * be corrupted from unsafe concurrent logging; and the current thread's nickname (for logging purposes only) will
985 * not be changed at any point. Less formally, interleaved or concurrent use of the same Logger might
986 * result in such things as formatters from Node log calls affecting output of your log calls or vice versa.
987 * Just don't, and it'll look good.
988 *
989 * @param low_lvl_endpoint
990 * The UDP endpoint (IP address and UDP port) which will be used for receiving incoming and
991 * sending outgoing Flow traffic in this Node.
992 * E.g.: `Udp_endpoint(Ip_address_v4::any(), 1234)` // UDP port 1234 on all IPv4 interfaces.
993 * @param logger
994 * The Logger implementation through which all logging from this Node will run.
995 * See notes on logger ownership above.
996 * @param net_env_sim
997 * Network environment simulator to use to simulate (fake) external network conditions
998 * inside the code, e.g., for testing. If 0, no such simulation will occur. Otherwise the
999 * code will add conditions such as loss and latency (in addition to any present naturally)
1000 * and will take ownership of the the passed in pointer (meaning, we will `delete` as we see fit;
1001 * and you must never do so from now on).
1002 * @param err_code
1003 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1004 * error::Code::S_NODE_NOT_RUNNING (Node failed to initialize),
1005 * error::Code::S_OPTION_CHECK_FAILED.
1006 * @param opts
1007 * The low-level per-Node options to use. The default uses reasonable values that
1008 * normally need not be changed. No reference to opts is saved; it is only copied.
1009 * See also Node::set_options(), Node::options(), Node::listen(), Node::connect(),
1010 * Peer_socket::set_options(), Peer_socket::options().
1011 */
1012 explicit Node(log::Logger* logger, const util::Udp_endpoint& low_lvl_endpoint,
1013 Net_env_simulator* net_env_sim = 0, Error_code* err_code = 0,
1014 const Node_options& opts = Node_options());
1015
1016 /**
1017 * Destroys Node. Closes all Peer_socket objects as if by `sock->close_abruptly()`. Then closes all
1018 * Server_socket objects Then closes all Event_set objects as if by `event_set->close()`.
1019 * @todo Server_socket objects closed as if by what?
1020 *
1021 * Frees all resources except the objects still shared by `shared_ptr<>`s returned to the Node
1022 * user. All `shared_ptr<>` instances inside Node sharing the latter objects are, however,
1023 * eliminated. Therefore any such object will be deleted the moment the user also eliminates all
1024 * her `shared_ptr<>` instances sharing that same object; any object for which that is already the
1025 * case is deleted immediately.
1026 *
1027 * Does not block.
1028 *
1029 * Note: as a corollary of the fact this acts as if `{Peer|Server_}socket::close_abruptly()` and
1030 * Event_set::close(), in that order, were called, all event waits on the closed
1031 * sockets (`sync_send()`, `sync_receive()`, `sync_accept()`, Event_set::sync_wait(),
1032 * Event_set::async_wait()) will execute their on-event behavior (`sync_send()` return,
1033 * `sync_receive()` return, `sync_accept()` return, `sync_wait()` return and invoke handler, respectively).
1034 * Since Event_set::close() is executed soon after the sockets close, those Event_set objects are
1035 * cleared. Therefore, the user on-event behavior handling may find that, despite a given
1036 * event firing, the containing Event_set is empty; or they may win the race and see an Event_set
1037 * with a bunch of `S_CLOSED` sockets. Either way, no work is possible on these sockets.
1038 *
1039 * Rationale for previous paragraph: We want to wake up any threads or event loops waiting on
1040 * these sockets, so they don't sit around while the underlying Node is long since destroyed. On
1041 * the other hand, we want to free any resources we own (including socket handles inside
1042 * Event_set). This solution satisfies both desires. It does add a bit of non-determinism
1043 * (easily handled by the user: any socket in the Event_set, even if user wins the race, will be
1044 * `S_CLOSED` anyway). However it introduces no actual thread safety problems (corruption, etc.).
1045 *
1046 * @todo Provide another method to shut down everything gracefully?
1047 */
1048 ~Node() override;
1049
1050 // Methods.
1051
1052 /**
1053 * Returns `true` if and only if the Node is operating. If not, all attempts to use this object or
1054 * any objects generated by this object (Peer_socket::Ptr, etc.) will result in error.
1055 * @return Ditto.
1056 */
1057 bool running() const;
1058
1059 /**
1060 * Return the UDP endpoint (IP address and UDP port) which will be used for receiving incoming and
1061 * sending outgoing Flow traffic in this Node. This is similar to to the value passed to the
1062 * Node constructor, except that it represents the actual bound address and port (e.g., if you
1063 * chose 0 as the port, the value returned here will contain the actual emphemeral port randomly chosen by
1064 * the OS).
1065 *
1066 * If `!running()`, this equals Udp_endpoint(). The logical value of the returned util::Udp_endpoint
1067 * never changes over the lifetime of the Node.
1068 *
1069 * @return See above. Note that it is a reference.
1070 */
1072
1073 /**
1074 * Initiates an active connect to the specified remote Flow server. Returns a safe pointer to a
1075 * new Peer_socket. The socket's state will be some substate of `S_OPEN` at least initially. The
1076 * connection operation, involving network traffic, will be performed asynchronously.
1077 *
1078 * One can treat the resulting socket as already connected; its Writable and Readable status can
1079 * be determined; once Readable or Writable one can receive or send, respectively.
1080 *
1081 * Port selection: An available local Flow port will be chosen and will be available for
1082 * information purposes via sock->local_port(), where `sock` is the returned socket. The port will
1083 * be in the range [Node::S_FIRST_EPHEMERAL_PORT, Node::S_FIRST_EPHEMERAL_PORT +
1084 * Node::S_NUM_EPHEMERAL_PORTS - 1]. Note that there is no overlap between that range and the
1085 * range [Node::S_FIRST_SERVICE_PORT, Node::S_FIRST_SERVICE_PORT + Node::S_NUM_SERVICE_PORTS - 1].
1086 *
1087 * @param to
1088 * The remote Flow port to which to connect.
1089 * @param err_code
1090 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1091 * error::Code::S_OUT_OF_PORTS, error::Code::S_INTERNAL_ERROR_PORT_COLLISION,
1092 * error::Code::S_OPTION_CHECK_FAILED.
1093 * @param opts
1094 * The low-level per-Peer_socket options to use in the new socket.
1095 * If null (typical), the per-socket options template in Node::options() is used.
1096 * If not null, the given per-socket options are first validated and, if valid, used.
1097 * If invalid, it is an error. See also Peer_socket::set_options(),
1098 * Peer_socket::options().
1099 * @return Shared pointer to Peer_socket, which is in the `S_OPEN` main state; or null pointer,
1100 * indicating an error.
1101 */
1102 Peer_socket::Ptr connect(const Remote_endpoint& to, Error_code* err_code = 0,
1103 const Peer_socket_options* opts = 0);
1104
1105 /**
1106 * Same as connect() but sends, as part of the connection handshake, the user-supplied metadata,
1107 * which the other side can access via Peer_socket::get_connect_metadata() after accepting the
1108 * connection.
1109 *
1110 * @note It is up to the user to serialize the metadata portably. One recommended convention is to
1111 * use `boost::endian::native_to_little()` (and similar) before connecting; and
1112 * on the other side use the reverse (`boost::endian::little_to_native()`) before using the value.
1113 * Packet dumps will show a flipped (little-endian) representation, while with most platforms the conversion
1114 * will be a no-op at compile time. Alternatively use `native_to_big()` and vice-versa.
1115 * @note Why provide this metadata facility? After all, they could just send the data upon connection via
1116 * send()/receive()/etc. Answers: Firstly, this is guaranteed to be delivered (assuming successful
1117 * connection), even if reliability (such as via retransmission) is disabled in socket options (opts
1118 * argument). For example, if a reliability mechanism (such as FEC) is built on top of the Flow layer,
1119 * parameters having to do with configuring that reliability mechanism can be bootstrapped reliably
1120 * using this mechanism. Secondly, it can be quite convenient (albeit not irreplaceably so) for
1121 * connection-authenticating techniques like security tokens known by both sides.
1122 * @param to
1123 * See connect().
1124 * @param serialized_metadata
1125 * Data copied and sent to the other side during the connection establishment. The other side can get
1126 * equal data using Peer_socket::get_connect_metadata(). The returned socket `sock` also stores it; it's
1127 * similarly accessible via sock->get_connect_metadata() on this side.
1128 * The metadata must fit into a single low-level packet; otherwise
1129 * error::Code::S_CONN_METADATA_TOO_LARGE error is returned.
1130 * @param err_code
1131 * See connect(). Added error: error::Code::S_CONN_METADATA_TOO_LARGE.
1132 * @param opts
1133 * See connect().
1134 * @return See connect().
1135 */
1137 const boost::asio::const_buffer& serialized_metadata,
1138 Error_code* err_code = 0,
1139 const Peer_socket_options* opts = 0);
1140
1141 /**
1142 * The blocking (synchronous) version of connect(). Acts just like connect() but instead of
1143 * returning a connecting socket immediately, waits until the initial handshake either succeeds or
1144 * fails, and then returns the socket or null, respectively. Additionally, you can specify a
1145 * timeout; if the connection is not successful by this time, the connection attempt is aborted
1146 * and null is returned.
1147 *
1148 * Note that there is always a built-in Flow protocol connect timeout that is mandatory
1149 * and will report an error if it expires; but it may be too long for your purposes, so you can
1150 * specify your own that may expire before it. The two timeouts should be thought of as fundamentally
1151 * independent (built-in one is in the lower level of Flow protocol; the one you provide is at the application
1152 * layer), so don't make assumptions about Flow's behavior and set a timeout if you know you need one -- even
1153 * if in practice it is longer than the Flow one (which as of this writing can be controlled via socket option).
1154 *
1155 * The following are the possible outcomes:
1156 * 1. Connection succeeds before the given timeout expires (or succeeds, if no timeout given).
1157 * Socket is at least Writable at time of return. The new socket is returned, no error is
1158 * returned via `*err_code`.
1159 * 2. Connection fails before the given timeout expires (or fails, if no timeout given). null
1160 * is returned, `*err_code` is set to reason for connection failure. (Note that a built-in
1161 * handshake timeout -- NOT the given user timeout, if any -- falls under this category.)
1162 * `*err_code == error::Code::S_WAIT_INTERRUPTED` means the wait was interrupted (similarly to POSIX's `EINTR`).
1163 * 3. A user timeout is given, and the connection does not succeed before it expires. null is
1164 * returned, and `*err_code` is set to error::Code::S_WAIT_USER_TIMEOUT.
1165 * (Rationale: consistent with Server_socket::sync_accept(),
1166 * Peer_socket::sync_receive(), Peer_socket::sync_send() behavior.)
1167 *
1168 * Tip: Typical types you might use for `max_wait`: `boost::chrono::milliseconds`,
1169 * `boost::chrono::seconds`, `boost::chrono::high_resolution_clock::duration`.
1170 *
1171 * @tparam Rep
1172 * See `boost::chrono::duration` documentation (and see above tip).
1173 * @tparam Period
1174 * See `boost::chrono::duration` documentation (and see above tip).
1175 * @param to
1176 * See connect().
1177 * @param max_wait
1178 * The maximum amount of time from now to wait before giving up on the wait and returning.
1179 * `"duration<Rep, Period>::max()"` will eliminate the time limit and cause indefinite wait
1180 * -- however, not really, as there is a built-in connection timeout that will expire.
1181 * @param err_code
1182 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1183 * error::Code::S_WAIT_INTERRUPTED, error::Code::S_WAIT_USER_TIMEOUT, error::Code::S_NODE_NOT_RUNNING,
1184 * error::Code::S_CANNOT_CONNECT_TO_IP_ANY, error::Code::S_OUT_OF_PORTS,
1185 * error::Code::S_INTERNAL_ERROR_PORT_COLLISION,
1186 * error::Code::S_CONN_TIMEOUT, error::Code::S_CONN_REFUSED,
1187 * error::Code::S_CONN_RESET_BY_OTHER_SIDE, error::Code::S_NODE_SHUTTING_DOWN,
1188 * error::Code::S_OPTION_CHECK_FAILED.
1189 * @param opts
1190 * See connect().
1191 * @return See connect().
1192 */
1193 template<typename Rep, typename Period>
1194 Peer_socket::Ptr sync_connect(const Remote_endpoint& to, const boost::chrono::duration<Rep, Period>& max_wait,
1195 Error_code* err_code = 0,
1196 const Peer_socket_options* opts = 0);
1197
1198 /**
1199 * A combination of sync_connect() and connect_with_metadata() (blocking connect, with supplied
1200 * metadata).
1201 *
1202 * @param to
1203 * See sync_connect().
1204 * @param max_wait
1205 * See sync_connect().
1206 * @param serialized_metadata
1207 * See connect_with_metadata().
1208 * @param err_code
1209 * See sync_connect(). Added error: error::Code::S_CONN_METADATA_TOO_LARGE.
1210 * @param opts
1211 * See sync_connect().
1212 * @return See sync_connect().
1213 */
1214 template<typename Rep, typename Period>
1216 const boost::chrono::duration<Rep, Period>& max_wait,
1217 const boost::asio::const_buffer& serialized_metadata,
1218 Error_code* err_code = 0,
1219 const Peer_socket_options* opts = 0);
1220
1221 /**
1222 * Equivalent to `sync_connect(to, duration::max(), err_code, opt)s`; i.e., sync_connect() with no user
1223 * timeout.
1224 *
1225 * @param to
1226 * See other sync_connect().
1227 * @param err_code
1228 * See other sync_connect().
1229 * @param opts
1230 * See sync_connect().
1231 * @return See other sync_connect().
1232 */
1234 const Peer_socket_options* opts = 0);
1235
1236 /**
1237 * Equivalent to `sync_connect_with_metadata(to, duration::max(), serialized_metadata, err_code, opts)`; i.e.,
1238 * sync_connect_with_metadata() with no user timeout.
1239 *
1240 * @param to
1241 * See sync_connect().
1242 * @param serialized_metadata
1243 * See connect_with_metadata().
1244 * @param err_code
1245 * See sync_connect(). Added error: error::Code::S_CONN_METADATA_TOO_LARGE.
1246 * @param opts
1247 * See sync_connect().
1248 * @return See sync_connect().
1249 */
1251 const boost::asio::const_buffer& serialized_metadata,
1252 Error_code* err_code = 0,
1253 const Peer_socket_options* opts = 0);
1254
1255 /**
1256 * Sets up a server on the given local Flow port and returns Server_socket which can be used to
1257 * accept subsequent incoming connections to this server. Any subsequent incoming connections
1258 * will be established asynchronously and, once established, can be claimed (as Peer_socket
1259 * objects) via Server_server::accept() and friends.
1260 *
1261 * Port specification: You must select a port in the range [Node::S_FIRST_SERVICE_PORT,
1262 * Node::S_FIRST_SERVICE_PORT + Node::S_NUM_SERVICE_PORTS - 1] or the special value #S_PORT_ANY.
1263 * In the latter case an available port in the range [Node::S_FIRST_EPHEMERAL_PORT,
1264 * Node::S_FIRST_EPHEMERAL_PORT + Node::S_NUM_EPHEMERAL_PORTS - 1] will be chosen for you.
1265 * Otherwise we will use the port you explicitly specified.
1266 *
1267 * Note that using #S_PORT_ANY in this context typically makes sense only if you somehow
1268 * communicate `serv->local_port()` (where `serv` is the returned socket) to the other side through
1269 * some other means (for example if both client and server are running in the same program, you
1270 * could just pass it via variable or function call). Note that there is no overlap between the
1271 * two aforementioned port ranges.
1272 *
1273 * @param local_port
1274 * The local Flow port to which to bind.
1275 * @param err_code
1276 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1277 * error::Code::S_NODE_NOT_RUNNING, error::Code::S_PORT_TAKEN,
1278 * error::Code::S_OUT_OF_PORTS, error::Code::S_INVALID_SERVICE_PORT_NUMBER,
1279 * error::Code::S_INTERNAL_ERROR_PORT_COLLISION.
1280 * @param child_sock_opts
1281 * If null, any Peer_sockets that `serv->accept()` may return (where `serv` is the returned
1282 * Server_socket) will be initialized with the options set equal to
1283 * `options().m_dyn_sock_opts`. If not null, they will be initialized with a copy of
1284 * `*child_sock_opts`. No reference to `*child_sock_opts` is saved.
1285 * @return Shared pointer to Server_socket, which is in the Server_socket::State::S_LISTENING state at least
1286 * initially; or null pointer, indicating an error.
1287 */
1288 Server_socket::Ptr listen(flow_port_t local_port, Error_code* err_code = 0,
1289 const Peer_socket_options* child_sock_opts = 0);
1290
1291 /**
1292 * Creates a new Event_set in Event_set::State::S_INACTIVE state with no sockets/events stored; returns this
1293 * Event_set.
1294 *
1295 * @param err_code
1296 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1297 * error::Code::S_NODE_NOT_RUNNING.
1298 * @return Shared pointer to Event_set; or null pointer, indicating an error.
1299 */
1301
1302 /**
1303 * Interrupts any blocking operation, a/k/a wait, and informs the invoker of that operation that the
1304 * blocking operation's outcome was being interrupted. Conceptually, this causes a similar fate as a POSIX
1305 * blocking function exiting with -1/`EINTR`, for all such functions currently executing. This may be called
1306 * from any thread whatsoever and, particularly, from signal handlers as well.
1307 *
1308 * Before deciding to call this explicitly from signal handler(s), consider using the simpler
1309 * Node_options::m_st_capture_interrupt_signals_internally instead.
1310 *
1311 * The above is vague about how an interrupted "wait" exhibits itself. More specifically, then:
1312 * Any operation with name `sync_...()` will return with an error, that error being
1313 * #Error_code error::Code::S_WAIT_INTERRUPTED. Event_set::async_wait()-initiated wait will end, with the handler
1314 * function being called, passing the Boolean value `true` to that function. `true` indicates the wait was
1315 * interrupted rather than successfully finishing with 1 or more active events (`false` would've indicated th
1316 * latter, more typical situation).
1317 *
1318 * Note that various calsses have `sync_...()` operations, including Node (Node::sync_connect()),
1319 * Server_socket (Server_socket::sync_accept()), and Peer_socket (Peer_socket::sync_receive()).
1320 *
1321 * @param err_code
1322 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1323 * error::Code::S_NODE_NOT_RUNNING.
1324 */
1325 void interrupt_all_waits(Error_code* err_code = 0);
1326
1327 /**
1328 * Dynamically replaces the current options set (options()) with the given options set.
1329 * Only those members of `opts` designated as dynamic (as opposed to static) may be different
1330 * between options() and `opts`. If this is violated, it is an error, and no options are changed.
1331 *
1332 * Typically one would acquire a copy of the existing options set via options(), modify the
1333 * desired dynamic data members of that copy, and then apply that copy back by calling
1334 * set_options(). Warning: this technique is only safe if other (user) threads do not call
1335 * set_options() simultaneously. There is a to-do to provide a thread-safe maneuver for when this is
1336 * a problem (see class Node doc header).
1337 *
1338 * @param opts
1339 * The new options to apply to this socket. It is copied; no reference is saved.
1340 * @param err_code
1341 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1342 * error::Code::S_OPTION_CHECK_FAILED, error::Code::S_NODE_NOT_RUNNING.
1343 * @return `true` on success, `false` on error.
1344 */
1345 bool set_options(const Node_options& opts, Error_code* err_code = 0);
1346
1347 /**
1348 * Copies this Node's option set and returns that copy. If you intend to use set_options() to
1349 * modify a Node's options, we recommend you make the modifications on the copy returned by
1350 * options().
1351 *
1352 * @return Ditto.
1353 */
1354 Node_options options() const;
1355
1356 /**
1357 * The maximum number of bytes of user data per received or sent block on connections generated
1358 * from this Node, unless this value is overridden in the Peer_socket_options argument to
1359 * listen() or connect() (or friend). See Peer_socket_options::m_st_max_block_size.
1360 *
1361 * @return Ditto.
1362 */
1363 size_t max_block_size() const;
1364
1365protected:
1366
1367 // Methods.
1368
1369 // Basic setup/teardown/driver/general methods.
1370
1371 /**
1372 * Returns a raw pointer to newly created Peer_socket or sub-instance like asio::Peer_socket, depending on
1373 * the template parameter.
1374 *
1375 * @tparam Peer_socket_impl_type
1376 * Either net_flow::Peer_socket or net_flow::asio::Peer_socket, as of this writing.
1377 * @param opts
1378 * See, for example, `Peer_socket::connect(..., const Peer_socket_options&)`.
1379 * @return Pointer to new object of type Peer_socket or of a subclass.
1380 */
1381 template<typename Peer_socket_impl_type>
1383
1384 /**
1385 * Like sock_create_forward_plus_ctor_args() but for Server_sockets.
1386 *
1387 * @tparam Server_socket_impl_type
1388 * Either net_flow::Server_socket or net_flow::asio::Server_socket, as of this writing.
1389 * @param child_sock_opts
1390 * See, for example, `Peer_socket::accept(..., const Peer_socket_options* child_sock_opts)`
1391 * @return Pointer to new object of type Server_socket or of a subclass.
1392 */
1393 template<typename Server_socket_impl_type>
1395
1396 // Constants.
1397
1398 /**
1399 * Type and value to supply as user-supplied metadata in SYN, if user chooses to use
1400 * `[[a]sync_]connect()` instead of `[[a]sync_]connect_with_metadata()`. If you change this value, please
1401 * update Peer_socket::get_connect_metadata() doc header.
1402 */
1404
1405private:
1406 // Friends.
1407
1408 /**
1409 * Peer_socket must be able to forward `send()`, `receive()`, etc. to Node.
1410 * @see Peer_socket.
1411 */
1412 friend class Peer_socket;
1413 /**
1414 * Server_socket must be able to forward `accept()`, etc. to Node.
1415 * @see Server_socket.
1416 */
1417 friend class Server_socket;
1418 /**
1419 * Event_set must be able to forward `close()`, `event_set_async_wait()`, etc. to Node.
1420 * @see Event_set.
1421 */
1422 friend class Event_set;
1423
1424 // Types.
1425
1426 /// Short-hand for UDP socket.
1427 using Udp_socket = boost::asio::ip::udp::socket;
1428
1429 /// boost.asio timer wrapped in a ref-counted pointer.
1430 using Timer_ptr = boost::shared_ptr<util::Timer>;
1431
1432 /// Short-hand for a signal set.
1433 using Signal_set = boost::asio::signal_set;
1434
1435 /// Short-hand for high-performance, non-reentrant, exclusive mutex used to lock #m_opts.
1437
1438 /// Short-hand for lock that acquires exclusive access to an #Options_mutex.
1440
1441 struct Socket_id;
1442 // Friend of Node: For ability to reference private `struct` Node::Socket_id.
1443 friend size_t hash_value(const Socket_id& socket_id);
1444 friend bool operator==(const Socket_id& lhs, const Socket_id& rhs);
1445
1446 /**
1447 * A map from the connection ID (= remote-local socket pair) to the local Peer_socket that is
1448 * the local portion of the connection. Applies to peer-to-peer (not server) sockets.
1449 */
1450 using Socket_id_to_socket_map = boost::unordered_map<Socket_id, Peer_socket::Ptr>;
1451
1452 /// A map from the local Flow port to the local Server_socket listening on that port.
1453 using Port_to_server_map = boost::unordered_map<flow_port_t, Server_socket::Ptr>;
1454
1455 /// A set of Event_set objects.
1456 using Event_sets = boost::unordered_set<Event_set::Ptr>;
1457
1458 // Methods.
1459
1460 // Basic setup/teardown/driver/general methods.
1461
1462 /**
1463 * Worker thread W (main event loop) body. Does not exit unless told to do so by Node's
1464 * destruction (presumably from a non-W thread, as W is not exposed to Node user).
1465 *
1466 * @param low_lvl_endpoint
1467 * See that parameter on Node constructor. Intentionally passed by value, to
1468 * avoid race with user's Udp_endpoint object disappearing before worker_run() can
1469 * use it.
1470 */
1471 void worker_run(const util::Udp_endpoint low_lvl_endpoint);
1472
1473 /**
1474 * Helper to invoke for each thread in which this Node executes, whether or not it starts that thread,
1475 * that applies certain common settings to all subsequent logging from that thread.
1476 *
1477 * E.g., it might nickname the thread (w/r/t logging) and set a certain style of printing duration units (short
1478 * like "ms" or long like "milliseconds"): these probably won't change for the rest of the Node's logging.
1479 *
1480 * @param thread_type
1481 * Roughly 3-letter character sequence identifying the thread's purpose, to be included in the thread's logged
1482 * nickname in subsequent log message prefixes; or empty string to let the thread's nickname stay as-is.
1483 * @param logger
1484 * The Logger whose logging to configure(); or null to assume `this->get_logger()` (which is typical but may
1485 * not yet be available, say, during object construction).
1486 * @return Address of the Logger that was configured (either `logger` or `this->get_logger()`).
1487 */
1488 log::Logger* this_thread_init_logger_setup(const std::string& thread_type, log::Logger* logger = 0);
1489
1490 /**
1491 * Given a new set of Node_options intended to replace (or initialize) a Node's #m_opts, ensures
1492 * that these new option values are legal. In all cases, values are checked for individual and
1493 * mutual validity. Additionally, unless init is true, which means we're being called from
1494 * constructor, ensures that no `static` data member is different between #m_opts and opts. If any
1495 * validation fails, it is an error.
1496 *
1497 * Pre-condition: If `!init`, #m_opts_mutex is locked.
1498 *
1499 * @todo Is it necessary to return `opts` now that we've switched to C++11 or better?
1500 *
1501 * @param opts
1502 * New option values to validate.
1503 * @param init
1504 * True if called from constructor; false if called from set_options().
1505 * @param err_code
1506 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1507 * error::Code::S_OPTION_CHECK_FAILED, error::Code::S_STATIC_OPTION_CHANGED.
1508 * @return `opts`. The only reason we return this is so that it can be called during the
1509 * construction's initializer section (go, C++03!).
1510 */
1511 const Node_options& validate_options(const Node_options& opts, bool init, Error_code* err_code) const;
1512
1513 /**
1514 * Helper that compares `new_val` to `old_val` and, if they are not equal, logs and returns an error;
1515 * used to ensure static options are not changed.
1516 *
1517 * @tparam Opt_type
1518 * Type of a Node_options, etc., data member.
1519 * @param new_val
1520 * Proposed new value for the option.
1521 * @param old_val
1522 * Current value of the option.
1523 * @param opt_id
1524 * The name of the option, suitable for logging; this is presumably obtained using the
1525 * macro `#` technique.
1526 * @param err_code
1527 * See Peer_socket::set_options().
1528 * @return `true` on success, `false` on validation error.
1529 */
1530 template<typename Opt_type>
1531 bool validate_static_option(const Opt_type& new_val, const Opt_type& old_val, const std::string& opt_id,
1532 Error_code* err_code) const;
1533
1534 /**
1535 * Helper that, if the given condition is false, logs and returns an error; used to check for
1536 * option value validity when setting options.
1537 *
1538 * @param check
1539 * `false` if and only if some validity check failed.
1540 * @param check_str
1541 * String describing which condition was checked; this is presumably obtained using the
1542 * macro # technique.
1543 * @param err_code
1544 * See Peer_socket::set_options().
1545 * @return `true` on success, `false` on validation error.
1546 */
1547 bool validate_option_check(bool check, const std::string& check_str, Error_code* err_code) const;
1548
1549 /**
1550 * Obtain a copy of the value of a given option in a thread-safe manner. Because #m_opts may be
1551 * modified at any time -- even if the desired option is static and not being modified, this is
1552 * still unsafe -- #m_opts must be locked, the desired value must be copied, and #m_opts must be
1553 * unlocked. This method does so.
1554 *
1555 * Do NOT read option values without opt().
1556 *
1557 * @tparam Opt_type
1558 * The type of the option data member.
1559 * @param opt_val_ref
1560 * A reference (important!) to the value you want; this may be either a data member of
1561 * `this->m_opts` or the entire `this->m_opts` itself.
1562 * @return A copy of the value at the given reference.
1563 */
1564 template<typename Opt_type>
1565 Opt_type opt(const Opt_type& opt_val_ref) const;
1566
1567 /**
1568 * Performs low-priority tasks that should be run on an infrequent, regular basis, such as stat
1569 * logging and schedules the next time this should happen. This is the timer handler for that
1570 * timer.
1571 *
1572 * @param reschedule
1573 * If `true`, after completing the tasks, the timer is scheduled to run again later;
1574 * otherwise it is not.
1575 */
1576 void perform_regular_infrequent_tasks(bool reschedule);
1577
1578 /* Methods dealing with low-level packet I/O. Implementations are in low_lvl_io.cpp. The
1579 * line between these methods and the ones further down (like handle_incoming())) is blurred, but basically the
1580 * latter methods deal with each incoming packet after it has been read off wire and gone through
1581 * the network simulator (if any is active). By contrast the methods just below
1582 * (low_lvl_io.cpp) deal with receiving and sending low-level packets (including packet
1583 * pacing) and network condition simulation (if active) -- basically leaving the core protocol
1584 * logic to the aforementioned core logic methods. */
1585
1586 // Input.
1587
1588 /**
1589 * Registers so that during the current or next `m_task_engine.run()`, the latter will wait for a receivable UDP
1590 * packet and, when one is available, will call low_lvl_recv_and_handle().
1591 *
1592 * Pre-condition: we are in thread W.
1593 */
1594 void async_low_lvl_recv();
1595
1596 /**
1597 * Handles the pre-condition that #m_low_lvl_sock has a UDP packet available for reading, or that there
1598 * was an error in waiting for this pre-condition. If no error (`!sys_err_code`) then the packet is read
1599 * (thus erased) from the OS UDP net-stack's packet queue. The packet is then properly handled (for
1600 * example it may result in more data decoded into an appropriate Peer_socket's stream buffer).
1601 *
1602 * @param sys_err_code
1603 * Error code of the operation.
1604 */
1605 void low_lvl_recv_and_handle(Error_code sys_err_code);
1606
1607 /**
1608 * Helper for low_lvl_recv_and_handle() that calls handle_incoming() on the not-yet-deserialized low-level
1609 * packet just read off the UDP socket, but first handles simulation of various network conditions
1610 * like latency, loss, and duplication. Pre-condition is that a UDP receive just successfully
1611 * got the data, or that a simulation thereof occurred.
1612 *
1613 * @param packet_data
1614 * See handle_incoming(). Note that, despite this method possibly acting asynchronously (e.g.,
1615 * if simulating latency), `*packet_data` ownership is retained by the immediate caller.
1616 * Caller must not assume anything about its contents upon return and is free to do anything else to it
1617 * (e.g., read another datagram into it).
1618 * @param low_lvl_remote_endpoint
1619 * See handle_incoming().
1620 * @param is_sim_duplicate_packet
1621 * `false` if `packet_data` contains data actually just read from UDP socket.
1622 * `true` if `packet_data` contains data placed there as a simulated duplicate packet.
1623 * The latter is used to prevent that simulated duplicated packet from itself getting
1624 * duplicated or dropped.
1625 * @return The number of times handle_incoming() was called *within* this call (before this call
1626 * returned); i.e., the number of packets (e.g., packet and/or its duplicate) handled
1627 * immediately as opposed to dropped or scheduled to be handled later.
1628 */
1629 unsigned int handle_incoming_with_simulation(util::Blob* packet_data,
1630 const util::Udp_endpoint& low_lvl_remote_endpoint,
1631 bool is_sim_duplicate_packet = false);
1632
1633 /**
1634 * Sets up `handle_incoming(packet_data, low_lvl_remote_endpoint)` to be called asynchronously after a
1635 * specified period of time. Used to simulate latency.
1636 *
1637 * @param latency
1638 * After how long to call handle_incoming().
1639 * @param packet_data
1640 * See handle_incoming_with_simulation().
1641 * @param low_lvl_remote_endpoint
1642 * See handle_incoming_with_simulation().
1643 */
1645 util::Blob* packet_data,
1646 const util::Udp_endpoint& low_lvl_remote_endpoint);
1647
1648 // Output.
1649
1650 /**
1651 * async_low_lvl_packet_send_impl() wrapper to call when `packet` is to be sent to the remote side of
1652 * the connection `sock`. In particular, this records certain per-socket stats accordingly.
1653 *
1654 * @param sock
1655 * Socket whose remote side to target when sending.
1656 * @param packet
1657 * See async_low_lvl_packet_send_impl().
1658 * @param delayed_by_pacing
1659 * See async_low_lvl_packet_send_impl().
1660 */
1662 bool delayed_by_pacing);
1663
1664 /**
1665 * async_low_lvl_packet_send_impl() wrapper to call when `packet` is to be sent to the remote side of
1666 * the connection `sock`. In particular, this records certain per-socket stats accordingly.
1667 *
1668 * @param low_lvl_remote_endpoint
1669 * UDP endpoint for the Node to which to send the packet.
1670 * @param packet
1671 * See async_low_lvl_packet_send_impl().
1672 */
1673 void async_no_sock_low_lvl_packet_send(const util::Udp_endpoint& low_lvl_remote_endpoint,
1675
1676 /**
1677 * Takes given low-level packet structure, serializes it, and initiates
1678 * asynchronous send of these data to the remote Node specified by the given UDP endpoint.
1679 * The local and target ports are assumed to be already filled out in `*packet`.
1680 * Once the send is possible (i.e., UDP net-stack is able to buffer it for sending; or there is an
1681 * error), low_lvl_packet_sent() is called (asynchronously).
1682 *
1683 * Takes ownership of `packet`; do not reference it in any way after this method returns.
1684 *
1685 * @note This method exiting in no way indicates the send succeeded (indeed,
1686 * the send cannot possibly initiate until this method exits).
1687 *
1688 * @param low_lvl_remote_endpoint
1689 * UDP endpoint for the Node to which to send the packet.
1690 * @param packet
1691 * Pointer to packet structure with everything filled out as desired.
1692 * @param delayed_by_pacing
1693 * `true` if there was a (pacing-related) delay between when higher-level code decided to send this packet
1694 * and the execution of this method; `false` if there was not, meaning said higher-level code executed us
1695 * immediately (synchronously), though not necessarily via a direct call (in fact that's unlikely; hence
1696 * `_impl` in the name).
1697 * @param sock
1698 * Peer_socket associated with this connection; null pointer if none is so associated.
1699 * If not null, behavior undefined unless `low_lvl_remote_endpoint == sock->remote_endpoint().m_udp_endpoint`.
1700 */
1701 void async_low_lvl_packet_send_impl(const util::Udp_endpoint& low_lvl_remote_endpoint,
1702 Low_lvl_packet::Const_ptr packet, bool delayed_by_pacing, Peer_socket::Ptr sock);
1703
1704 /**
1705 * Completion handler for async_low_lvl_packet_send_impl(); called when the packet is either
1706 * successfully fed to the UDP net-stack for sending, or when there is an error in doing so.
1707 *
1708 * @warning It is important to pass `packet` to this, because the serialization operation produces
1709 * a bunch of pointers into `*packet`; if one does not pass it here through the
1710 * boost.asio send call, `*packet` might get deleted, and then send op will try to access
1711 * pointer(s) to invalid memory.
1712 * @param packet
1713 * Ref-counted pointer to the packet that was hopefully sent.
1714 * Will be destroyed at the end of low_lvl_packet_sent() unless a copy of this pointer is
1715 * saved elsewhere before that point. (Usually you should indeed let it be destroyed.)
1716 * @param sock
1717 * See async_low_lvl_packet_send_impl(). Note the null pointer is allowed.
1718 * @param bytes_expected_transferred
1719 * Size of the serialization of `*packet`, that being the total # of bytes we want sent
1720 * over UDP.
1721 * @param sys_err_code
1722 * Result of UDP send operation.
1723 * @param bytes_transferred
1724 * Number of bytes transferred assuming `!err_code`.
1725 * Presumably that would equal `bytes_expected_transferred`, but we will see.
1726 */
1727 void low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet, size_t bytes_expected_transferred,
1728 const Error_code& sys_err_code, size_t bytes_transferred);
1729
1730 /**
1731 * Performs important book-keeping based on the event "DATA packet was sent to destination."
1732 * The affected data structures are: Sent_packet::m_sent_when (for the Sent_packet in question),
1733 * Peer_socket::m_snd_last_data_sent_when, Drop_timer Peer_socket::m_snd_drop_timer (in `*sock`).
1734 * sock->m_snd_drop_timer. More information is in the doc headers for
1735 * those data members.
1736 *
1737 * @param sock
1738 * Socket for which the given DATA packet is sent.
1739 * @param seq_num
1740 * The first sequence number for the sent DATA packet.
1741 * Sent_packet::m_sent_when for its Sent_packet should contain the time at which send_worker() removed
1742 * the data from Send buffer and packetized it; it's used to log the difference between
1743 * that time and now.
1744 */
1745 void mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number& seq_num);
1746
1747 /**
1748 * Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that
1749 * came from that endpoint, when there is no associated Peer_socket for that remote endpoint/local port combo.
1750 * An error is unlikely, but if it happens there is no reporting other than logging.
1751 *
1752 * You should use this to reply with an RST in situations where no Peer_socket is applicable; for
1753 * example if anything but a SYN or RST is sent to a server port. In situations where a
1754 * Peer_socket is applicable (which is most of the time an RST is needed), use
1755 * async_sock_low_lvl_rst_send().
1756 *
1757 * @param causing_packet
1758 * Packet we're responding to (used at least to set the source and destination Flow ports
1759 * of the sent packet).
1760 * @param low_lvl_remote_endpoint
1761 * Where `causing_packet` came from (the Node low-level endpoint).
1762 */
1764 const util::Udp_endpoint& low_lvl_remote_endpoint);
1765
1766 /**
1767 * Begins the process of asynchronously sending the given low-level packet to the remote Node
1768 * specified by the given Peer_socket. The method, if this feature is applicable and enabled,
1769 * applies packet pacing (which attempts to avoid burstiness by spreading out packets without
1770 * changing overall sending rate). Therefore the given packet may be sent as soon as a UDP send
1771 * is possible according to OS (which is typically immediate), or later, if pacing delays it. Once it is
1772 * time to send it, async_sock_low_lvl_packet_send() is used.
1773 *
1774 * Takes ownership of packet; do not reference it in any way after this method returns.
1775 *
1776 * Note that an error may occur in asynchronous operations triggered by this method; if this
1777 * happens the socket will be closed via close_connection_immediately(). However if the error
1778 * happens IN this method (`false` is returned), it is up to the caller to handle the error as
1779 * desired.
1780 *
1781 * @param sock
1782 * Socket whose `remote_endpoint()` specifies to what Node and what Flow port within that
1783 * Node this socket will go.
1784 * @param packet
1785 * Pointer to packet structure with everything except the source, destination, and
1786 * retransmission mode fields (essentially, the public members of Low_lvl_packet proper but
1787 * not its derived types) filled out as desired.
1788 * @param err_code
1789 * After return, `*err_code` is success or:
1790 * error::Code::S_INTERNAL_ERROR_SYSTEM_ERROR_ASIO_TIMER.
1791 * @return `true` on success so far; `false` on failure (and thus no send initiation).
1792 * Note that `true` in no way indicates the send succeeded (indeed, the send cannot possibly
1793 * *initiate* until this method exits).
1794 */
1796 Low_lvl_packet::Ptr&& packet,
1797 Error_code* err_code);
1798
1799 /**
1800 * async_sock_low_lvl_packet_send_paced() pacing helper: Handles a DATA or ACK packet that was just
1801 * passed into async_sock_low_lvl_packet_send_paced(), i.e., is available for sending. That is, either
1802 * sends the packet via async_sock_low_lvl_packet_send() immediately or queues it for sending later.
1803 *
1804 * Pre-conditions: pacing is enabled for the socket in options; an SRTT value has been computed
1805 * (is not undefined); packet is DATA or ACK; packet is fully filled out; `sock` is in OPEN state;
1806 * invariants described for `struct` Send_pacing_data hold.
1807 *
1808 * Note that an error may occur in asynchronous operations triggered by this method; if this
1809 * happens the socket will be closed via close_connection_immediately(). However if the error
1810 * happens IN this method (`false` is returned), it is up to the caller to handle the error as
1811 * desired.
1812 *
1813 * Takes ownership of packet; do not reference it in any way after this method returns.
1814 *
1815 * @param sock
1816 * Socket under consideration.
1817 * @param packet
1818 * Packet to send.
1819 * @param err_code
1820 * See async_sock_low_lvl_packet_send_paced().
1821 * @return See async_sock_low_lvl_packet_send_paced().
1822 */
1824 Error_code* err_code);
1825
1826 /**
1827 * async_sock_low_lvl_packet_send_paced() pacing helper: Resets the socket's Send_pacing_data structure
1828 * to reflect that a new pacing time slice should begin right now. The slice start is set to now,
1829 * its period is set based on the current SRTT and congestion window (so that packets are evenly
1830 * spread out over the next SRTT); and the number of full packets allowed over this time slice are
1831 * computed.
1832 *
1833 * Pre-conditions: pacing is enabled for the socket in options; an SRTT value has been computed
1834 * (is not undefined); `sock` is in OPEN state; invariants described for `struct` Send_pacing_data
1835 * hold.
1836 *
1837 * @see `struct` Send_pacing_data doc header.
1838 * @param sock
1839 * Socket under consideration. Should be in OPEN state.
1840 * @param now
1841 * For performance (so that we don't need to acquire the current time again), this is the
1842 * very recent time point at which it was determined it is time for a new pacing time slice.
1843 */
1845
1846 /**
1847 * async_sock_low_lvl_packet_send_paced() pacing helper: Given that we are currently in the pacing time
1848 * slice in `sock->m_snd_pacing_data`, sends as many queued packets as possible given the time
1849 * slice's budget, and if any remain queued after this, schedules for them to be sent in the next
1850 * time slice.
1851 *
1852 * Pre-conditions: pacing is enabled for the socket in options; an SRTT value has been computed
1853 * (is not undefined); `sock` is in OPEN state; invariants described for `struct` Send_pacing_data
1854 * hold; the current time is roughly within the current pacing time slice.
1855 *
1856 * Note that an error may occur in asynchronous operations triggered by this method; if this
1857 * happens to socket will be closed via close_connection_immediately(). However if the error
1858 * happens IN this method (`false` is returned), it is up to the caller to handle the error as
1859 * desired.
1860 *
1861 * @param sock
1862 * Socket under consideration.
1863 * @param err_code
1864 * See async_sock_low_lvl_packet_send_paced().
1865 * @param executing_after_delay
1866 * `true` if executing from a pacing-related timer handler; `false` otherwise (i.e.,
1867 * if sock_pacing_new_packet_ready() is in the call stack).
1868 * @return See async_sock_low_lvl_packet_send_paced().
1869 */
1870 bool sock_pacing_process_q(Peer_socket::Ptr sock, Error_code* err_code, bool executing_after_delay);
1871
1872 /**
1873 * async_sock_low_lvl_packet_send_paced() pacing helper: If sock_pacing_process_q() ran out of the last
1874 * time slice's budget and still had packets to send, this is the handler that triggers when the
1875 * out-of-budget time slice ends. Sets up a new time slice starting now and tries to send as many
1876 * queud packets as possible with the new budget; if still more packets remain after this,
1877 * schedules yet another timer.
1878 *
1879 * This may also be called via `cancel()` of the timer. In this case, the pre-condition is that
1880 * `sock->state() == Peer_socket::State::S_CLOSED`; the method will do nothing.
1881 *
1882 * Otherwise, pre-conditions: Send_pacing_data::m_packet_q for `sock` is NOT empty; the byte budget for
1883 * the current time slice is less than the packet at the head `m_packet_q`; `sock` is in OPEN state;
1884 * invariants described for `struct` Send_pacing_data hold; the current time is roughly just past
1885 * the current pacing time slice.
1886 *
1887 * Note that an error may occur in asynchronous operations triggered by this method; if this
1888 * happens to socket will be closed via close_connection_immediately(). However if the error
1889 * happens IN this method (`false` is returned), it is up to the caller to handle the error as
1890 * desired.
1891 *
1892 * @param sock
1893 * Socket under consideration.
1894 * @param sys_err_code
1895 * boost.asio error code.
1896 */
1897 void sock_pacing_time_slice_end(Peer_socket::Ptr sock, const Error_code& sys_err_code);
1898
1899 /**
1900 * Similar to async_sock_low_lvl_packet_send_paced() except it also calls
1901 * `close_connection_immediately(sock)` if the former fails.
1902 *
1903 * @param sock
1904 * See async_sock_low_lvl_packet_send_paced(). Additionally, `sock` must be suitable for
1905 * close_connection_immediately(); see that method's doc comment.
1906 * @param packet
1907 * See async_sock_low_lvl_packet_send_paced() analogous parameter.
1908 * @param defer_delta_check
1909 * Same meaning as in close_connection_immediately().
1910 * @return See async_low_lvl_packet_send_paced().
1911 */
1913 Low_lvl_packet::Ptr&& packet,
1914 bool defer_delta_check);
1915
1916 /**
1917 * Sends an RST to the other side of the given socket asynchronously when possible. An error is
1918 * unlikely, but if it happens there is no reporting other than logging.
1919 *
1920 * @param sock
1921 * Socket the remote side of which will get the RST.
1922 */
1924
1925 /**
1926 * Sends an RST to the other side of the given socket, synchronously. An error is
1927 * unlikely, but if it happens there is no reporting other than logging. Will block (though
1928 * probably not for long, this being UDP) if #m_low_lvl_sock is in blocking mode.
1929 *
1930 * @param sock
1931 * Socket the remote side of which will get the RST.
1932 */
1934
1935 // Methods for core protocol logic dealing with deserialized packets before demuxing to Peer_socket or Server_socket.
1936
1937 /**
1938 * Handles a just-received, not-yet-deserialized low-level packet. A rather important method....
1939 *
1940 * @param packet_data
1941 * Packet to deserialize and handle. Upon return, the state of `*packet_data` is not known; and caller retains
1942 * ownership of it (e.g., can read another datagram into it if desired).
1943 * @param low_lvl_remote_endpoint
1944 * From where the packet came.
1945 */
1946 void handle_incoming(util::Blob* packet_data,
1947 const util::Udp_endpoint& low_lvl_remote_endpoint);
1948
1949 /**
1950 * Performs all tasks to be performed at the end of low_lvl_recv_and_handle() or
1951 * async part of async_wait_latency_then_handle_incoming(), as determined over the course of the execution
1952 * of either of those methods. This includes at least performing event_set_all_check_delta() for
1953 * anything in #m_sock_events, etc., and any accumulated ACK-related tasks stored in the Peer_sockets
1954 * in #m_socks_with_accumulated_pending_acks and similar. This is done for efficiency and to
1955 * reduce network overhead (for example, to combine several individual acknowledgments into one
1956 * ACK packet).
1957 */
1959
1960 // Methods dealing with individual Peer_sockets. Implementations are in peer_socket.cpp.
1961
1962 /**
1963 * Handles a just-deserialized, just-demultiplexed low-level SYN_ACK packet delivered to the given
1964 * peer socket in `S_SYN_SENT` state. So it will hopefully send back a SYN_ACK_ACK, etc.
1965 *
1966 * @param socket_id
1967 * Connection ID (socket pair) identifying the socket in #m_socks.
1968 * @param sock
1969 * Peer socket in Peer_socket::Int_state::S_SYN_SENT internal state.
1970 * @param syn_ack
1971 * Deserialized immutable SYN_ACK.
1972 */
1974 Peer_socket::Ptr sock,
1975 boost::shared_ptr<const Syn_ack_packet> syn_ack);
1976
1977 /**
1978 * Handles a just-deserialized, just-demultiplexed, duplicate (equal to already-received SYN_ACK)
1979 * low-level SYN_ACK packet delivered to the given peer socket in `S_ESTABLISHED` state. This will
1980 * hopefully reply with SYN_ACK_ACK again. Reasoning for this behavior is given in
1981 * handle_incoming() at the call to this method.
1982 *
1983 * @param sock
1984 * Peer socket in Peer_socket::Int_state::S_ESTABLISHED internal state with sock->m_active_connect.
1985 * @param syn_ack
1986 * Deserialized immutable SYN_ACK.
1987 */
1989 boost::shared_ptr<const Syn_ack_packet> syn_ack);
1990
1991 /**
1992 * Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given
1993 * peer socket in `S_ESTABLISHED` state. This will hopefully reply with ACK and deliver the data to
1994 * the Receive buffer, where the user can receive() them.
1995 *
1996 * Also similarly handles packets received and queued earlier while in `S_SYN_RCVD` state.
1997 *
1998 * @param socket_id
1999 * Connection ID (socket pair) identifying the socket in #m_socks.
2000 * @param sock
2001 * Peer socket in Peer_socket::Int_state::S_ESTABLISHED internal state.
2002 * @param packet
2003 * Deserialized DATA packet. (For performance when moving data to Receive
2004 * buffer, this is modifiable.)
2005 * @param syn_rcvd_qd_packet
2006 * If `true`, this packet was saved during Peer_socket::Int_state::S_SYN_RCVD by handle_data_to_syn_rcvd() and
2007 * is being handled now that socket is Peer_socket::Int_state::S_ESTABLISHED. If `false`, this packet was
2008 * received normally during `S_ESTABLISHED` state.
2009 */
2011 Peer_socket::Ptr sock,
2012 boost::shared_ptr<Data_packet> packet,
2013 bool syn_rcvd_qd_packet);
2014
2015 /**
2016 * Helper for handle_data_to_established() that categorizes the DATA packet received as either
2017 * illegal; legal but duplicate of a previously received DATA packet;
2018 * legal but out-of-order; and finally legal and in-order. Illegal means sender can never validly send
2019 * such sequence numbers in a DATA packet. Legal means it can, although network problems may still lead to
2020 * the received DATA being not-useful in some way. Out-of-order means that `packet` occupies seq. numbers
2021 * past the start of the first unreceived data, or "first gap," which starts at Peer_socket::m_rcv_next_seq_num.
2022 * In-order, therefore, means `packet` indeed begins exactly at Peer_socket::m_rcv_next_seq_num (which means typically
2023 * one should increment the latter by `packet->m_data.size()`).
2024 *
2025 * No statistics are marked down on `sock`; the caller should proceed depending on the output as described
2026 * just below.
2027 *
2028 * If a truthy value is returned, packet is illegal; other outputs are meaningless. Otherwise, falsy is returned;
2029 * and: If `*dupe`, then packet is a legal dupe; and other outputs are meaningless. Otherwise, `!*dupe`. and:
2030 * `*slide` if and only if the packet is in-order (hence receive window left edge should "slide" right).
2031 * `*slide_size` is the number of bytes by which Peer_socket::m_rcv_next_seq_num should increment ("slide");
2032 * it is meaningful if and only if `*slide`.
2033 *
2034 * (Aside: Every attempt to detect illegality is made, within reason, but NOT every illegal behavior can be detected
2035 * as such; but defensive coding strives that a failure to detect such leads to nothing worse than meaningless data
2036 * received by user.)
2037 *
2038 * @param sock
2039 * See handle_data_to_established().
2040 * @param packet
2041 * See handle_data_to_established(). Note it is read-only, however.
2042 *.@param dupe
2043 * Output for whether the packet is a dupe (true if so). Meaningless if truthy is returned.
2044 * @param slide
2045 * Output for whether the packet consists of the next data to be passed to Receive buffer.
2046 * Meaningless if truthy is returned, or else if `*dupe` is set to `true`.
2047 * @param slide_size
2048 * By how much to increment Peer_socket::m_rcv_next_seq_num due to this in-order packet.
2049 * Meaningless unless `*slide` is set to `true`.
2050 * @return Success if `packet` is legal; the recommended error to accompany the connection-breaking RST due
2051 * to the illegal `packet`, otherwise.
2052 */
2054 boost::shared_ptr<const Data_packet> packet,
2055 bool* dupe, bool* slide, size_t* slide_size);
2056
2057 /**
2058 * Helper for handle_data_to_established() that aims to pass the payload of the given DATA packet to
2059 * the given socket's Receive buffer for user consumption; but detects and reports overflow if appropriate,
2060 * instead. Certain relevant stats are logged in all cases. `packet.m_data` is emptied due to moving it
2061 * elsewhere -- for performance (recommend saving its `.size()` before-hand, if needed for later) --
2062 * and the implications on rcv_wnd recovery (if any) are handled. `true` is returned assuming no overflow.
2063 *
2064 * If overflow detected, only statistical observations and logs are made, and `false` is returned.
2065 *
2066 * @param sock
2067 * See handle_data_to_established().
2068 * @param packet
2069 * See handle_data_to_established().
2070 * @return `false` on overflow; `true` on success.
2071 */
2073 boost::shared_ptr<Data_packet> packet);
2074
2075 /**
2076 * Helper for handle_data_to_established() that assumes the given's socket Receive buffer is currently
2077 * readable and handles implications on the Event_set subsystem.
2078 *
2079 * @param sock
2080 * See handle_data_to_established().
2081 * @param syn_rcvd_qd_packet
2082 * See handle_data_to_established().
2083 */
2084 void sock_rcv_buf_now_readable(Peer_socket::Ptr sock, bool syn_rcvd_qd_packet);
2085
2086 /**
2087 * Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-order
2088 * packet in `sock->m_rcv_packets_with_gaps` -- in retransmission-off mode. The retransmission-on counterpart
2089 * is, roughly speaking, sock_data_to_reassembly_q_unless_overflow().
2090 *
2091 * This assumes that sock_categorize_data_to_established() returned
2092 * `*slide == false`. However, due to overflow considerations
2093 * this helper itself set its own `*slide` (and `*slide_size`) value. The `*slide` argument should be
2094 * interpereted the same way as from sock_categorize_data_to_established(); `*slide_size` (meaningful if
2095 * and only if `*slide = true` is set) specifies by how much Peer_socket::m_rcv_next_seq_num must now increment.
2096 * (Note, then, that in the caller this can only set `*slide` from `false` to `true`; or not touch it.)
2097 *
2098 * @param sock
2099 * See handle_data_to_established().
2100 * @param packet
2101 * See handle_data_to_established(). Note it is read-only, however.
2102 * @param data_size
2103 * Original `packet->m_data.size()` value; by now presumbly that value is 0, but we want the original.
2104 * @param slide
2105 * Same semantics as in sock_categorize_data_to_established() (except it is always set; no "illegal" case).
2106 * @param slide_size
2107 * By how much to increment Peer_socket::m_rcv_next_seq_num due certain overflow considerations.
2108 */
2110 boost::shared_ptr<const Data_packet> packet,
2111 size_t data_size,
2112 bool* slide, size_t* slide_size);
2113
2114 /**
2115 * Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-order
2116 * packet in the reassembly queue `sock->m_rcv_packets_with_gaps` -- in retransmission-on mode; but detects
2117 * and reports overflow if appropriate, instead. Certain relevant stats are logged in all cases.
2118 * `packet.m_data` is emptied due to moving it elsewhere -- for performance (recommend saving its `.size()`
2119 * before-hand, if needed for later) -- and the implications on rcv_wnd recovery (if any) are handled.
2120 * `true` is returned assuming no overflow. The retransmission-off counterpart
2121 * is, roughly speaking, sock_track_new_data_after_gap_rexmit_off().
2122 *
2123 * If overflow detected, only statistical observations and logs are made, and `false` is returned.
2124 *
2125 * This assumes that sock_categorize_data_to_established() returned `*slide == false`.
2126 *
2127 * @param sock
2128 * See handle_data_to_established().
2129 * @param packet
2130 * See handle_data_to_established().
2131 * @return `false` on overflow; `true` on success.
2132 */
2134 boost::shared_ptr<Data_packet> packet);
2135
2136 /**
2137 * Helper for handle_data_to_established() that aims to register a set of received DATA packet data as in-order
2138 * payload in the structures Peer_socket::m_rcv_packets_with_gaps and Peer_socket::m_rcv_next_seq_num
2139 * in `sock`. Both structures are updated given the precondition that a set of data had arrived with data
2140 * starting at `sock->m_rcv_next_seq_num`. If `reassembly_in_progress` (which should be `true` if and only
2141 * if retransmission is on), then the reassembly queue is popped into `sock->m_rcv_buf` to the appropriate
2142 * extent (as the just-arrived packet may have bridged the entire gap to the first packet in that queue).
2143 *
2144 * Certain relevant stats are logged in all cases. Note that it's possible to simulate DATA packets' receipt
2145 * without actually having received such a packet. This method will slide the window as directed regardless.
2146 *
2147 * @param sock
2148 * See handle_data_to_established().
2149 * @param slide_size
2150 * By how much to increment (slide right) Peer_socket::m_rcv_packets_with_gaps.
2151 * See handle_data_to_established().
2152 * @param reassembly_in_progress
2153 * Basically, `sock->rexmit_on()`.
2154 */
2155 void sock_slide_rcv_next_seq_num(Peer_socket::Ptr sock, size_t slide_size, bool reassembly_in_progress);
2156
2157 /**
2158 * Computes and returns the max size for Peer_socket::m_rcv_packets_with_gaps for `sock`.
2159 *
2160 * @param sock
2161 * An open socket.
2162 * @return See above.
2163 */
2165
2166 /**
2167 * Causes an acknowledgment of the given received packet to be included in a future Ack_packet
2168 * sent to the other side. That ACK low-level UDP packet is not sent in this handler, even if
2169 * the low-level UDP socket is currently writable. The sending of this packet is performed
2170 * asynchronously in the manner of `boost::asio::io_service::post()`.
2171 *
2172 * Note that the Ack_packet may include other packets being acknowledged; and that ACK may be
2173 * artificially delayed for reasons like the desire to accumulate more acknowledgments before
2174 * sending ACK (to cut down on overhead).
2175 *
2176 * @param sock
2177 * Peer socket in Peer_socket::Int_state::S_ESTABLISHED.
2178 * @param seq_num
2179 * Sequence number of first datum in the packet to be acknowledged.
2180 * @param rexmit_id
2181 * Which attempt are we acknowledging (0 = initial send, 1 = first retransmission, 2 =
2182 * second retransmission, ...). Always 0 if retransmission is off.
2183 * @param data_size
2184 * Number of bytes in the user data in the packet to be acknowledged.
2185 */
2186 void async_acknowledge_packet(Peer_socket::Ptr sock, const Sequence_number& seq_num, unsigned int rexmit_id,
2187 size_t data_size);
2188
2189 /**
2190 * Helper of perform_accumulated_on_recv_tasks() that handles any additional individual outgoing
2191 * acknowledgments accumulated during the currently running receive handler. Pre-conditions:
2192 * executed from perform_accumulated_on_recv_tasks(); `!(Peer_socket::m_rcv_pending_acks).empty()`
2193 * for `sock`; Peer_socket::m_rcv_pending_acks_size_at_recv_handler_start (for `sock`) has been set;
2194 * `sock` is in #m_socks_with_accumulated_pending_acks.
2195 *
2196 * If state is not Peer_socket::Int_state::S_ESTABLISHED, method does nothing except possibly log.
2197 *
2198 * @param socket_id
2199 * Connection ID (socket pair) identifying the socket in #m_socks.
2200 * @param sock
2201 * Peer socket.
2202 */
2204
2205 /**
2206 * Helper for handle_data_to_established() that gets simple info about
2207 * Peer_socket::m_rcv_packets_with_gaps in `sock`.
2208 *
2209 * @param sock
2210 * Socket to examine.
2211 * @param first_gap_exists
2212 * Pointer to value to set to true if and only if !(Peer_socket::m_rcv_packets_with_gaps).empty()
2213 * in `sock`. If the Peer_socket::m_rcv_packets_with_gaps invariant fully holds, this means that
2214 * there is at least one gap of unreceived packets between some received packets and other received packets,
2215 * by sequence number order.
2216 * @param seq_num_after_first_gap
2217 * Pointer to value that will be set to the first sequence number of the first element of
2218 * `sock->m_rcv_packets_with_gaps`; untouched if `!*first_gap_exists` at return.
2219 */
2221 bool* first_gap_exists, Sequence_number* seq_num_after_first_gap);
2222
2223 /**
2224 * Logs TRACE or DATA messages that show the detailed state of the receiving sequence number
2225 * space. Quite slow if DATA log level is enabled or `force_verbose_info_logging` is `true`.
2226 *
2227 * @param sock
2228 * Socket whose data to log.
2229 * @param force_verbose_info_logging
2230 * If `true`, then the method acts as if DATA logging is enabled, i.e., the maximum amount of
2231 * information is logged (but with INFO verbosity). You should only do this if you know
2232 * for a fact that this is being called infrequently (such as from
2233 * perform_regular_infrequent_tasks()).
2234 */
2235 void log_rcv_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging = false) const;
2236
2237 /**
2238 * Handles a just-deserialized, just-demultiplexed, low-level ACK packet delivered to the given
2239 * peer socket in Peer_socket::Int_state::S_ESTABLISHED state. This will hopefully
2240 * update internal data structures and inform congestion control (or queue that to be done by the end of the
2241 * current receive handler, low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming().
2242 *
2243 * @param sock
2244 * Peer socket in Peer_socket::Int_state::S_ESTABLISHED.
2245 * @param ack
2246 * Deserialized immutable ACK.
2247 */
2249 boost::shared_ptr<const Ack_packet> ack);
2250
2251 /**
2252 * Helper of perform_accumulated_on_recv_tasks() that handles any incoming acknowledgments and
2253 * rcv_wnd updates accumulated during the currently running receive handler. Pre-conditions:
2254 * executed from perform_accumulated_on_recv_tasks(); Peer_socket::m_rcv_acked_packets and
2255 * Peer_socket::m_snd_pending_rcv_wnd (in `sock`) have been set; `sock` is in
2256 * #m_socks_with_accumulated_acks.
2257 *
2258 * If `sock` is not in Peer_socket::Int_state::S_ESTABLISHED, method does nothing except possibly log.
2259 *
2260 * @param socket_id
2261 * Connection ID (socket pair) identifying the socket in #m_socks.
2262 * @param sock
2263 * Peer socket.
2264 */
2266
2267 /**
2268 * Helper of perform_accumulated_on_recv_tasks() that categorizes the given accumulated individual acknowledgment
2269 * w/r/t legality and validity; determines the DATA packet being acked if possible; logs and record stats accordingly;
2270 * and closes underlying socket if ack is illegal.
2271 *
2272 * In all cases, all relevant (to the categorization of the given ack) information is logged and stats are recorded.
2273 *
2274 * Furthermore, if the ack is illegal, the socket is closed (while `false` is returned). Otherwise, `true` is
2275 * returned, and `*dupe_or_late` is set to indicate whether the ack is valid or not. If valid,
2276 * `*acked_pkt_it` is definitely set to indicate which DATA packet is being acked. If invalid, `*acked_pkt_it`
2277 * may or may not be set, as that information may or may not be available any longer (example of it being available:
2278 * the ack is for an earlier transmission attempt of packet P, but packet P is currently In-flight due to a
2279 * subsequent retransmission attempt).
2280 *
2281 * @param socket_id
2282 * Connection ID (socket pair) identifying the socket in #m_socks.
2283 * @param sock
2284 * Peer socket.
2285 * @param ack
2286 * Individual acknowledgment being categorized.
2287 * @param dupe_or_late
2288 * Set to false if ack refers to currently In-flight instance of a packet; true if no longer In-flight
2289 * (late = considered Dropped laready; duplicate = was acked before); untouched if `false` returned.
2290 * @param acked_pkt_it
2291 * Set to point into Peer_socket::m_snd_flying_pkts_by_sent_when that is being acked if `!*dupe_or_late`,
2292 * or if `*dupe_or_late` but the acked packet is still known; set to `end()` a/k/a `past_oldest()`
2293 * otherwise; untouched if `false`
2294 * returned.
2295 * @return `false` if and only if the ack is sufficiently invalid to have made this method close the socket.
2296 */
2299 bool* dupe_or_late, Peer_socket::Sent_pkt_ordered_by_when_iter* acked_pkt_it);
2300
2301 /**
2302 * Helper of perform_accumulated_on_recv_tasks() that computes the RTT implied by a given individual acknowledgment.
2303 * In addition to returning the RTT, note the convenience out-param.
2304 *
2305 * @param flying_pkt
2306 * The In-flight DATA packet to which the ack pertains.
2307 * @param time_now
2308 * The current time to use for the RTT computation (not using value within to allow for caller to simulate
2309 * simultaneity between nearby RTT computations).
2310 * @param ack
2311 * Individual acknowledgment being categorized.
2312 * @param sent_when
2313 * This out-param is set to point within Peer_socket::m_snd_flying_pkts_by_sent_when's `Sent_when`
2314 * structure pertaining to the DATA packet send attempt to which `ack` refers.
2315 * @return The RTT. May be zero.
2316 */
2318 const Fine_time_pt& time_now,
2320 const Peer_socket::Sent_packet::Sent_when** sent_when) const;
2321
2322 /**
2323 * Handles a just-computed new RTT (round trip time) measurement for an individual packet earlier
2324 * sent: updates smoothed RTT, DTO, and anything else relevant.
2325 *
2326 * @param sock
2327 * Peer socket in Peer_socket::Int_state::S_ESTABLISHED.
2328 * @param round_trip_time
2329 * The RTT just computed, with as much resolution as is available.
2330 */
2331 void new_round_trip_time_sample(Peer_socket::Ptr sock, Fine_duration round_trip_time);
2332
2333 /**
2334 * Helper of perform_accumulated_on_recv_tasks() that determines the range of In-flight packets that should be
2335 * Dropped due to given individual acks that have just been processed; and updates the relevant `m_acks_after_me`
2336 * members in the socket.
2337 *
2338 * Logging is minimal, and no stats are recorded. However, see associated drop_pkts_on_acks() method.
2339 *
2340 * Peer_socket::Sent_packet::m_acks_after_me data members, as documented, are incremented where relevant based
2341 * on the just-processed acks in `flying_now_acked_pkts`.
2342 *
2343 * Finally, the following In-flight packets must be considered Dropped due to acks:
2344 * - The packet referred to by the returned iterator into Peer_socket::m_snd_flying_pkts_by_sent_when.
2345 * - All packets contained in the same structure appearing later in it (i.e., sent out earlier), up to
2346 * `past_oldest()` (a/k/a `end()`).
2347 *
2348 * Note that this method does not actually perform the various tasks: it only updates `m_acks_after_me` and
2349 * computes/returns the start of the to-be-Dropped range. See drop_pkts_on_acks() for the actual dropping.
2350 *
2351 * @param sock
2352 * Peer socket.
2353 * @param flying_now_acked_pkts
2354 * The individual DATA packet send attempts acks of which have just been processed.
2355 * The Peer_socket::Sent_packet (and within it, the Peer_socket::Sent_packet::Sent_when) with the order ID
2356 * P, where P is in `flying_now_acked_pkts`, must be in Peer_socket::m_snd_flying_pkts_by_sent_when.
2357 * @return Iterator into `sock->m_snd_flying_pkts_by_sent_when` indicating the latest-sent packet that should
2358 * be Dropped due to acks; `past_oldest()` a/k/a `end()` if none should be so Dropped.
2359 */
2362 const boost::unordered_set<Peer_socket::order_num_t>& flying_now_acked_pkts);
2363
2364 /**
2365 * Helper of perform_accumulated_on_recv_tasks() that acts on the determination made by
2366 * categorize_pkts_as_dropped_on_acks().
2367 *
2368 * In all cases, all relevant (to the categorization of the In-flight packets as Dropped) information is logged
2369 * and stats are recorded.
2370 *
2371 * This acts, or gathers information necessary to act, on the determination by categorize_pkts_as_dropped_on_acks()
2372 * that a certain range of In-flight packets should be Dropped due to excess acks of packets sent before them.
2373 * Namely:
2374 * - `*cong_ctl_dropped_...` are set to the values to report congestion control as part of a new loss event.
2375 * - `*dropped_...` are set to values that indicate totals w/r/t the packets Dropped (regardless of whether it's
2376 * a new or existing loss event).
2377 * - `*pkts_marked_to_drop` are loaded with the Peer_socket::Sent_packet::Sent_when::m_order_num order IDs
2378 * specifying the Dropped packets.
2379 * - `sock` members `m_snd_flying_pkts*` and related are updated, meaning the newly Dropped packets are removed.
2380 * - On the other hand, if retransmission is on, Peer_socket::m_snd_rexmit_q is pushed onto, gaining the
2381 * just-Dropped packets to retransmit.
2382 * - `true` is returned.
2383 *
2384 * However, if it is determined that a retransmission placed onto `sock->m_snd_rexmit_q` would indicate one
2385 * retransmission too many, the socket is closed, and `false` is returned.
2386 *
2387 * @param sock
2388 * Peer socket.
2389 * @param last_dropped_pkt_it
2390 * Return value of of categorize_pkts_as_dropped_on_acks().
2391 * @param cong_ctl_dropped_pkts
2392 * Will be set to total # of packets marked as Dropped to report to congestion control as part of
2393 * a loss event (`<= *dropped_pkts`).
2394 * @param cong_ctl_dropped_bytes
2395 * Total data size corresponding to `cong_ctl_dropped_pkts` (`<= *dropped_bytes)`).
2396 * @param dropped_pkts
2397 * Will be set to total # of packets marked as Dropped by this method.
2398 * @param dropped_bytes
2399 * Total data size corresponding to `dropped_pkts`.
2400 * @param pkts_marked_to_drop
2401 * Will be filled with packet IDs (`sock->m_snd_flying_pkts_by_sent_when[...]->m_sent_when->m_order_num`)
2402 * of the packets marked dropped by this method. Results undefined unless empty at method start.
2403 * @return `true` normally; `false` if too many retransmissions detected, and thus `sock` was closed.
2404 */
2406 const Peer_socket::Sent_pkt_ordered_by_when_iter& last_dropped_pkt_it,
2407 size_t* cong_ctl_dropped_pkts, size_t* cong_ctl_dropped_bytes,
2408 size_t* dropped_pkts, size_t* dropped_bytes,
2409 std::vector<Peer_socket::order_num_t>* pkts_marked_to_drop);
2410
2411 /**
2412 * Helper of handle_accumulated_acks() that logs the about-to-be-handled accumulated individual acknowledgments.
2413 *
2414 * @param sock
2415 * Peer socket with 0 or more accumulated acks recorded.
2416 */
2418
2419 /**
2420 * Handles a Drop_timer (Peer_socket::m_snd_drop_timer) event in ESTABLISHED state by dropping the specified
2421 * packets. To be executed as a Drop_timer callback.
2422 *
2423 * @param sock
2424 * Peer socket is Peer_socket::Int_state::S_ESTABLISHED with at least one In-flight sent packet.
2425 * @param drop_all_packets
2426 * If `true`, will consider all packets Dropped. If `false`, will consider only the earliest
2427 * In-flight packet dropped.
2428 */
2429 void drop_timer_action(Peer_socket::Ptr sock, bool drop_all_packets);
2430
2431 /**
2432 * Logs TRACE or DATA messages thats show the detailed state of the sending sequence number space.
2433 * Quite slow if DATA log level is enabled or `force_verbose_info_logging` is `true`.
2434 *
2435 * @param sock
2436 * Socket whose data to log.
2437 * @param force_verbose_info_logging
2438 * Similar to same argument in log_rcv_window().
2439 */
2440 void log_snd_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging = false) const;
2441
2442 /**
2443 * Thread W implementation of connect(). Performs all the needed work up to waiting for network
2444 * traffic, gives the resulting Peer_socket to the user thread, and signals that user thread.
2445 *
2446 * Pre-condition: We're in thread W; thread U != W is waiting for us to return having set `*sock`. Post-condition:
2447 * `*sock` contains a Peer_socket::Ptr in an OPEN+CONNECTING state if `!(Peer_socket::m_disconnect_cause)`
2448 * for `*sock`; otherwise an error occurred, and that error is Peer_socket::m_disconnect_cause (in `*sock`).
2449 *
2450 * @param to
2451 * See connect().
2452 * @param serialized_metadata
2453 * Serialized metadata to provide to the peer when the connection is being established.
2454 * @param opts
2455 * See connect().
2456 * @param sock
2457 * `*sock` shall be set to the resulting new Peer_socket. Check `(*sock)->m_disconnect_cause`.
2458 */
2459 void connect_worker(const Remote_endpoint& to,
2460 const boost::asio::const_buffer& serialized_metadata,
2461 const Peer_socket_options* opts,
2462 Peer_socket::Ptr* sock);
2463
2464 /**
2465 * Implementation core of `sync_connect*()` that gets rid of templated or missing arguments thereof.
2466 *
2467 * E.g., the API would wrap this and supply a Fine_duration instead of generic `duration`; and supply
2468 * `Fine_duration::max()` if user omitted the timeout argument. Code bloat and possible circular definition issues
2469 * are among the reasons for this "de-templating" pattern.
2470 *
2471 * @param to
2472 * See connect().
2473 * @param max_wait
2474 * See the public `sync_connect(timeout)`. `"duration<Rep, Period>::max()"` maps to the value
2475 * `Fine_duration::max()` for this argument.
2476 * @param serialized_metadata
2477 * See connect_with_metadata().
2478 * @param err_code
2479 * See sync_connect().
2480 * @param opts
2481 * See connect().
2482 * @return See sync_connect().
2483 */
2485 const boost::asio::const_buffer& serialized_metadata,
2486 Error_code* err_code,
2487 const Peer_socket_options* opts);
2488
2489 /**
2490 * Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some
2491 * amount of time, so that we may try the SYN[_ACK] again if we don't get the acknowledgement by
2492 * then (or we may close socket after too many such retries). If `initial` is `true`, an overall
2493 * connection timeout scheduled task is also set up, to trigger the aforementioned close on timeout.
2494 *
2495 * @param socket_id
2496 * Connection ID (socket pair) identifying the socket in #m_socks.
2497 * @param sock
2498 * Peer socket in SYN_SENT or SYN_RCVD internal state.
2499 * @param initial
2500 * `true` if and only if the first SYN or SYN_ACK; otherwise it is a retry.
2501 */
2502 void setup_connection_timers(const Socket_id& socket_id, Peer_socket::Ptr sock, bool initial);
2503
2504 /**
2505 * Handles the triggering of the retransmit timer wait set up by
2506 * setup_connection_timers(); it will re-send the SYN or SYN_ACK.
2507 *
2508 * @param socket_id
2509 * Connection ID (socket pair) identifying the socket in #m_socks.
2510 * @param sock
2511 * Peer socket.
2512 */
2514
2515 /**
2516 * Cancel any timers and scheduled tasks active in the given socket. More precisely, causes for each handler
2517 * scheduled to happen in the future to be called as soon as possible with error code
2518 * `operation_aborted`. If, by the time the current handler has begun, the handler was about to be
2519 * called due the timer triggering, this method will not be able to induce `operation_aborted`.
2520 * Therefore the handler should be careful to check state and not rely on `operation_aborted`,
2521 * despite this method.
2522 *
2523 * Update: The caveats in previous paragraph do not apply to scheduled tasks (`util::schedule_task_*()`).
2524 * Canceling such tasks (which this method also does) prevents their handlers from running.
2525 *
2526 * @param sock
2527 * Socket whose timers/scheduled tasks to abort.
2528 */
2530
2531 /**
2532 * Creates a new Drop Timer and saves it to `sock->m_snd_drop_timer`. Pre-condition: `m_int_state ==
2533 * S_ESTABLISHED`, and `sock->m_snd_drop_timer` is null.
2534 *
2535 * @param socket_id
2536 * Connection ID (socket pair) identifying the socket in #m_socks.
2537 * @param sock
2538 * Socket that just entered ESTABLISHED state.
2539 */
2541
2542 /**
2543 * Implementation of non-blocking `sock->close_abruptly()` for socket `sock` in all cases except when
2544 * `sock->state() == State::S_CLOSED`. See Peer_socket::close_abruptly() doc
2545 * header; this method is the entirety of that method's implementation after CLOSED is
2546 * eliminated as a possibility.
2547 *
2548 * Pre-conditions:
2549 * - current thread is not W;
2550 * - `sock->m_mutex` is locked and just after entering `sock->close_abruptly()`;
2551 * - no changes to `*sock` have been made since `m_mutex` was locked;
2552 * - `sock->state() == Stated::S_OPEN` (so `sock` is in #m_socks);
2553 * - `sock` has been given to user via accept() or connect() or friends.
2554 *
2555 * Post-condition (not exhaustive): `sock->m_mutex` is unlocked.
2556 *
2557 * @param sock
2558 * Socket in OPEN state.
2559 * @param err_code
2560 * See Peer_socket::close_abruptly().
2561 */
2562 void close_abruptly(Peer_socket::Ptr sock, Error_code* err_code);
2563
2564 /**
2565 * A thread W method that handles the transition of the given socket from OPEN (any sub-state)
2566 * to CLOSED (including eliminating the given Peer_socket from our data structures). For
2567 * example, if an invalid packet comes in on the socket, and we send back an RST, then we're free
2568 * to then close our side immediately, as no further communication (with the other side or the
2569 * local user) is needed. As another example, if we there is a graceful close while Receive buffer
2570 * has data, user must Receive all of it, and the final handshake must finish, and then this is called.
2571 *
2572 * @todo Graceful close not yet implemented w/r/t close_connection_immediately().
2573 *
2574 * Pre-condition: if `err_code` is failure: `sock` is in #m_socks; `sock->state() == S_OPEN` (and any
2575 * `sock->m_int_state` that corresponds to it); `err_code` contains the reason for the close.
2576 *
2577 * Pre-condition: if `err_code` is success: `sock` is in #m_socks; `sock` state is
2578 * OPEN+DISCONNECTING; `m_int_state` is CLOSED; Send and Receive buffers are empty;
2579 * `m_disconnect_cause` is not success.
2580 *
2581 * Post-condition: `sock` Receive and Send buffers are empty; `sock->state() == S_CLOSED` (and `sock`
2582 * is no longer in #m_socks or any other Node structures, directly or indirectly) with
2583 * `sock->m_disconnect_cause` set to reason for closing. Other decently memory-consuming structures
2584 * are also cleared to conserve memory.
2585 *
2586 * Any socket that is in #m_socks MUST be eventually closed using this method. No
2587 * socket that is not in #m_socks must be passed to this method. In particular, do not call this
2588 * method during connect() or handle_syn_to_listening_server().
2589 *
2590 * @param socket_id
2591 * Connection ID (socket pair) identifying the socket in #m_socks.
2592 * @param sock
2593 * Socket to close.
2594 * @param err_code
2595 * If this is not success, then it is an abrupt close, and this is why `sock` is being
2596 * abruptly closed. `m_disconnect_cause` is set accordingly and logged.
2597 * If `err_code` is failure, then: `sock` is OPEN+DISCONNECTING (graceful close), and all
2598 * criteria required for it to move so CLOSED are satisfied: internal state is CLOSED
2599 * (goodbye handshake finished), and Receive and Send buffers are empty; `m_disconnect_cause`
2600 * is already set.
2601 * @param defer_delta_check
2602 * Same meaning as in event_set_all_check_delta().
2603 */
2605 const Error_code& err_code, bool defer_delta_check);
2606
2607 /**
2608 * Helper that creates a new SYN packet object to the extent that is suitable for immediately passing to
2609 * async_sock_low_lvl_packet_send_paced(). `sock` members that reflect any data in Syn_packet must already be
2610 * saved and are not used as the source for such data.
2611 *
2612 * @param sock
2613 * See async_sock_low_lvl_packet_send().
2614 * @return Pointer to new packet object suitable for async_sock_low_lvl_packet_send_paced() without having to fill
2615 * any further data members in the object.
2616 */
2618
2619 /**
2620 * Like create_syn() but for SYN_ACK.
2621 *
2622 * @param sock
2623 * See create_syn().
2624 * @return See create_syn().
2625 */
2627
2628 /**
2629 * Helper to create, fully fill out, and asynchronously send via async_sock_low_lvl_packet_send_or_close_immediately()
2630 * a SYN_ACK_ACK packet. Since rcv_wnd is advertised, Peer_socket::m_rcv_last_sent_rcv_wnd is updated for `sock`.
2631 *
2632 * @param sock
2633 * See async_sock_low_lvl_packet_send().
2634 * @param syn_ack
2635 * SYN_ACK to which the resulting SYN_ACK_ACK is the reply.
2636 * @return See async_sock_low_lvl_packet_send().
2637 */
2639 boost::shared_ptr<const Syn_ack_packet>& syn_ack);
2640
2641 /**
2642 * Asynchronously send RST to the other side of the given socket and
2643 * close_connection_immediately().
2644 *
2645 * @param socket_id
2646 * See close_connection_immediately().
2647 * @param sock
2648 * See close_connection_immediately().
2649 * @param err_code
2650 * See close_connection_immediately().
2651 * @param defer_delta_check
2652 * Same meaning as in event_set_all_check_delta().
2653 */
2655 const Error_code& err_code, bool defer_delta_check);
2656
2657 /**
2658 * Implementation of non-blocking `sock->send()` for socket `sock` in all cases except when
2659 * `sock->state() == State::S_CLOSED`.
2660 *
2661 * Pre-conditions:
2662 * - current thread is not W;
2663 * - `sock->m_mutex` is locked and after entering `sock->[sync_]send()`;
2664 * - no changes to `*sock` have been made since `m_mutex` was locked;
2665 * - `sock->state() == State::S_OPEN` (so `sock` is in #m_socks);
2666 * - `snd_buf_feed_func is as described below.
2667 *
2668 * This method completes the functionality of `sock->send()`.
2669 *
2670 * @see Important: see giant comment inside Node::send() for overall design and how send_worker()
2671 * fits into it.
2672 * @param sock
2673 * Socket, which must be in #m_socks, on which `[sync_]send()` was called.
2674 * @param snd_buf_feed_func
2675 * Pointer to function with signature `size_t fn(size_t x)` that will perform
2676 * `sock->m_snd_buf.feed_bufs_copy(...)` call with `max_data_size == X`, which will feed the
2677 * data the user wants to `sock->send()` into `sock->m_snd_buf`, and return the return value
2678 * of that call (which indicates how many bytes the call was able to fit into `m_snd_buf`).
2679 * Doing it this way prevents this Node::send() from being a template, which prevents
2680 * circular dependency unpleasantness. See Peer_socket::send() for details.
2681 * @param err_code
2682 * See Peer_socket::send().
2683 * @return See Peer_socket::send().
2684 */
2685 size_t send(Peer_socket::Ptr sock,
2686 const Function<size_t (size_t max_data_size)>& snd_buf_feed_func,
2687 Error_code* err_code);
2688
2689 /**
2690 * Returns `true` if and only if calling `sock->send()` with at least some arguments would return
2691 * either non-zero (i.e., successfully enqueued data to send) or zero and an error (but not
2692 * zero and NO error). `sock` will be locked and unlocked; safe to call from any thread.
2693 *
2694 * @param sock_as_any
2695 * Socket to examine, as an `any` wrapping a Peer_socket::Ptr.
2696 * @return See above.
2697 */
2698 bool sock_is_writable(const boost::any& sock_as_any) const;
2699
2700 /**
2701 * Helper placed by send() onto W to invoke send_worker() but ensures that the socket has not
2702 * entered some state such that sending data is not possible and no longer going to be possible.
2703 *
2704 * Example: `send(sock)` runs while `sock` is in ESTABLISHED state; queues up
2705 * send_worker_check_state() on thread W; thread W detects a connection reset and moves `sock` to
2706 * CLOSED; send_worker_check_state() gets its turn on thread W; detects state is now CLOSED and
2707 * returns without doing anything.
2708 *
2709 * @see Important: see giant comment inside Node::send() for overall design and how send_worker()
2710 * fits into it.
2711 * @param sock
2712 * Socket on which to possibly send low-level packets.
2713 */
2715
2716 /**
2717 * Thread W implemention of send(): synchronously or asynchronously send the contents of
2718 * `sock->m_snd_buf` to the other side. This locks the socket and examines `m_snd_buf`. If a low-level
2719 * UDP packet cannot be produced from the front of `m_snd_buf` (i.e., not enough data in `m_snd_buf`),
2720 * then there is nothing to do. Otherwise, determines whether network conditions (e.g.,
2721 * congestion control) allow for 1 or more such packets to be sent. If not, then there is nothing
2722 * to do. Otherwise (if 1 or more packets can be sent), 1 or more packets are sent and removed
2723 * from `sock->m_snd_buf`. Finally, `m_snd_buf` is unlocked.
2724 *
2725 * Pre-condition: `sock->m_int_state == S_ESTABLISHED`. @todo Are there other states where sending
2726 * DATA packets is OK? If so it would be during graceful termination, if we implement it. See
2727 * send_worker() for contedt for this to-do.
2728 *
2729 * @see Important: see giant comment inside Node::send() for overall design and how send_worker()
2730 * fits into it.
2731 * @param sock
2732 * Socket on which to possibly send low-level packets.
2733 * @param defer_delta_check
2734 * Same meaning as in event_set_all_check_delta().
2735 */
2736 void send_worker(Peer_socket::Ptr sock, bool defer_delta_check);
2737
2738 /**
2739 * Answers the perennial question of congestion and flow control: assuming there is a DATA packet
2740 * to send to the other side on the given socket, should we do so at this moment? Over a perfect
2741 * link and with a perfect receiver, this would always return true, and we would always send every
2742 * packet as soon as we could make it. As it is, some congestion control algorithm is used here
2743 * to determine if the link should be able to handle a packet, and rcv_wnd is used to determine if
2744 * the receive would be able to buffer a packet if it did arrive.
2745 *
2746 * @param sock
2747 * Socket for which we answer the question.
2748 * @return `true` if should send; `false` if should wait until it becomes `true` and THEN send.
2749 */
2750 bool can_send(Peer_socket::Const_ptr sock) const;
2751
2752 /**
2753 * Implementation of non-blocking sock->receive() for socket `sock` in all cases except when
2754 * `sock->state() == State::S_CLOSED`.
2755 *
2756 * Pre-conditions:
2757 * - current thread is not W;
2758 * - `sock->m_mutex` is locked and just after entering `sock->receive()`;
2759 * - no changes to `*sock` have been made since `m_mutex` was locked;
2760 * - `sock->state() == Stated::S_OPEN` (so `sock` is in #m_socks);
2761 * - `rcv_buf_feed_func` is as described below.
2762 *
2763 * This method completes the functionality of `sock->receive()`.
2764 *
2765 * @param sock
2766 * Socket, which must be in #m_socks, on which `receive()` was called.
2767 * @param rcv_buf_consume_func
2768 * Pointer to function with signature `size_t fn()` that will perform
2769 * `sock->m_rcv_buf.consume_bufs_copy(...)` call, which will consume data from `m_rcv_buf`,
2770 * and return the return value of that call (which indicates how many bytes
2771 * Socket_buffer::consume_bufs_copy() was able to fit into the user's data structure). Doing it this way
2772 * prevents this Node::receive() from being a template, which prevents circular dependency
2773 * unpleasantness. See Peer_socket::receive() for details.
2774 * @param err_code
2775 * See Peer_socket::receive().
2776 * @return See Peer_socket::receive().
2777 */
2778 size_t receive(Peer_socket::Ptr sock,
2779 const Function<size_t ()>& rcv_buf_consume_func,
2780 Error_code* err_code);
2781
2782 /**
2783 * Returns `true` if and only if calling sock->receive() with at least some arguments would return
2784 * either non-zero (i.e., successfully dequeued received data) or zero and an error (but not
2785 * zero and NO error). `sock` will be locked and unlocked; safe to call from any thread.
2786 *
2787 * @param sock_as_any
2788 * Socket to examine, as an `any` wrapping a Peer_socket::Ptr.
2789 * @return See above.
2790 */
2791 bool sock_is_readable(const boost::any& sock_as_any) const;
2792
2793 /**
2794 * Placed by receive() onto W if it has dequeued data from Receive buffer and given it to the
2795 * user, which would free up space in the Receive buffer, which *possibly* should result in a
2796 * window update sent to the server, so that it knows it can now send more data.
2797 *
2798 * @see Node::receive().
2799 * @param sock
2800 * Socket (whose state is ESTABLISHED or later).
2801 */
2803
2804 /**
2805 * receive_wnd_updated() helper that continues rcv_wnd recovery: that is, sends unsolicited ACK
2806 * with a rcv_wnd advertisement only and schedules the next iteration of a timer to have this
2807 * occur again, unless that timer is canceled due to too long a recovery phase or DATA packets
2808 * arriving from the other side.
2809 *
2810 * @param sock
2811 * See receive_wnd_updated().
2812 * @param rcv_wnd
2813 * The rcv_wnd (free Receive buffer space) to advertise to the other side.
2814 */
2815 void async_rcv_wnd_recovery(Peer_socket::Ptr sock, size_t rcv_wnd);
2816
2817 /**
2818 * Pertaining to the async_rcv_wnd_recovery() mechanism, this handles the event that we have
2819 * received an acceptable (either into Receive buffer or reassembly queue) DATA packet from the
2820 * other side. If we are currently in rcv_wnd recovery, this signifies the recovery "worked" --
2821 * the sender is sending data again -- so we can now end this phase.
2822 *
2823 * @param sock
2824 * See receive_wnd_updated().
2825 */
2827
2828 /**
2829 * Computes and returns the currently correct rcv_wnd value; that is the amount of space free in
2830 * Receive buffer for the given socket. This may only be called from thread W.
2831 *
2832 * @param sock
2833 * A socket.
2834 * @return See above.
2835 */
2836 size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const;
2837
2838 /**
2839 * Placed by receive() onto W during a graceful close, after the Receive buffer had been emptied
2840 * by the user; determines whether the socket can now proceed to
2841 * `Peer_socket::m_state == Peer_socket::State::S_CLOSED`
2842 * and be removed from the Node.
2843 *
2844 * @see Node::receive().
2845 * @param sock
2846 * Socket which may possibly now move to `m_state == S_CLOSED`.
2847 */
2849
2850 /**
2851 * Sends a low-level ACK packet, with all accumulated in Peer_socket::m_rcv_pending_acks of `sock` individual packet
2852 * acknowledgments, to the other side's UDP endpoint. If the pending acknowledgments don't fit
2853 * into one ACK, more ACKs are generated and sent as necessary. If there is an error sending or
2854 * preparing to send, `sock` is closed abruptly (close_connection_immediately()).
2855 *
2856 * This may be called either directly or by boost.asio due to delayed ACK timer being triggered.
2857 * If `sock` is not in Peer_socket::Int_state::S_ESTABLISHED, this does nothing except possibly logging.
2858 *
2859 * @param sock
2860 * Socket the remote side of which will get the RST. Method is basically a NOOP unless
2861 * state is Peer_socket::Int_state::S_ESTABLISHED.
2862 * @param defer_delta_check
2863 * Same meaning as in event_set_all_check_delta().
2864 * @param sys_err_code
2865 * If invoked via timer trigger, this is boost.asio's error code. If invoked directly,
2866 * this should be set to the default (success). Value is handled as follows: assuming
2867 * ESTABLISHED state: `operation_aborted` => NOOP; success or any other error => attempt to
2868 * send ACK(s).
2869 */
2870 void async_low_lvl_ack_send(Peer_socket::Ptr sock, bool defer_delta_check,
2871 const Error_code& sys_err_code = Error_code());
2872
2873 /**
2874 * Return `true` if and only if there are enough data either in Peer_socket::m_snd_rexmit_q of `sock` (if
2875 * retransmission is on) or in Peer_socket::m_snd_buf of `sock` to send a DATA packet to the other
2876 * side.
2877 *
2878 * Pre-condition: `sock->m_mutex` is locked.
2879 *
2880 * @param sock
2881 * Socket whose retransmission queue and Send buffer to examine.
2882 * @return See above.
2883 */
2884 bool snd_deqable(Peer_socket::Const_ptr sock) const;
2885
2886 /**
2887 * Return `true` if and only if there is enough free space in Peer_socket::m_snd_buf of `sock` to enqueue any given
2888 * atomic piece of user data.
2889 *
2890 * Pre-condition: `sock->m_mutex` is locked.
2891 *
2892 * Currently this simply means that there is space for at least max-block-size bytes (i.e., one
2893 * maximally large block) in `sock->m_snd_buf`.
2894 *
2895 * Design rationale for the latter: See code.
2896 *
2897 * @param sock
2898 * Socket whose Send buffer to examine.
2899 * @return See above.
2900 */
2901 bool snd_buf_enqable(Peer_socket::Const_ptr sock) const;
2902
2903 /**
2904 * Return true if and only if there are enough data in Peer_socket::m_rcv_buf of `sock` to give the user some
2905 * data in a Peer_socket::receive() call.
2906 *
2907 * Pre-condition: `sock->m_mutex` is locked.
2908 *
2909 * Currently this simply means that there is at least 1 block of data in `m_rcv_buf`.
2910 *
2911 * Design rationale: see snd_buf_deqable().
2912 *
2913 * @param sock
2914 * Socket whose Receive buffer to examine.
2915 * @return See above.
2916 */
2917 bool rcv_buf_deqable(Peer_socket::Const_ptr sock) const;
2918
2919 /**
2920 * Sets internal state of given socket to the given state and logs a TRACE message about it.
2921 * Should only be run from thread W; performs no locking.
2922 *
2923 * @param sock
2924 * Socket under consideration.
2925 * @param new_state
2926 * New state.
2927 */
2929
2930 /**
2931 * Sets Peer_socket::m_state and Peer_socket::m_open_sub_state. If moving to Peer_socket::State::S_CLOSED, resets
2932 * the required data to their "undefined" values (e.g., Peer_socket::m_local_port = #S_PORT_ANY). Thread-safe.
2933 *
2934 * @warning Only set `state` = `S_CLOSED` if no more data are in Receive buffer, so that the
2935 * user can get those data before `S_CLOSED` state. See Peer_socket::State::S_DISCONNECTING.
2936
2937 * @param sock
2938 * Socket under consideration.
2939 * @param state
2940 * New Peer_socket::m_state.
2941 * @param open_sub_state
2942 * Ignored if `state != S_OPEN`; otherwise the new value for Peer_socket::m_open_sub_state.
2943 */
2945 Peer_socket::State state,
2947
2948 /**
2949 * Records that thread W shows underlying connection is broken (graceful termination, or error)
2950 * and sets Peer_socket::m_disconnect_cause and Peer_socket::m_state, Peer_socket::m_open_sub_state accordingly.
2951 * Optionally also empties the Send and Receive buffers and any other decently memory-consuming structures.
2952 * Thread-safe.
2953 *
2954 * So the mutually exclusive closure scenarios are:
2955 * - `sock_disconnect_detected(sock, err_code, false); ...; sock_disconnect_completed(sock);`
2956 * Graceful close initiated; ...buffers emptied...; graceful close completed.
2957 * - `sock_disconnect_detected(sock, err_code, true);`
2958 * Abrupt close, or graceful close when the buffers already happen to be empty.
2959 *
2960 * @param sock
2961 * Socket under consideration.
2962 * @param disconnect_cause
2963 * The cause of the disconnect.
2964 * @param close
2965 * If `true`, the target public state should be the super-final `S_CLOSED`, and the Send and
2966 * Receive buffers are cleared; if `false`, the target public state should be the ominous
2967 * `S_OPEN`+`S_DISCONNECTING`, and the buffers are left alone. The caller's responsibility is
2968 * to decide which one it is, but `true` is typically either for an abrupt close (e.g.,
2969 * RST) or for a graceful close when buffers are empty; while `false` is typically for a
2970 * graceful close before buffers are empty, so that the user can get Receive buffer, and
2971 * the Node can send out Send buffer.
2972 */
2974 const Error_code& disconnect_cause, bool close);
2975
2976 /**
2977 * While in `S_OPEN`+`S_DISCONNECTING` state (i.e., after beginning a graceful close with
2978 * `sock_disconnect_detected(..., false)`, moves the socket to `S_CLOSED` state and clears Receive/Send
2979 * buffers and any other decently memory-consuming structures.
2980 *
2981 * Pre-conditions: state is `S_OPEN`+`S_DISCONNECTING`; Peer_socket::m_disconnect_cause is set to non-success
2982 * value.
2983 *
2984 * @param sock
2985 * Socket under consideration.
2986 */
2988
2989 /**
2990 * Helper that clears all non-O(1)-space data structures stored inside `sock`. Intended to be
2991 * called from `sock_disconnect_*()`, not anywhere else. Pre-condition: `sock->m_mutex` is
2992 * locked.
2993 *
2994 * @param sock
2995 * Socket under consideration.
2996 */
2998
2999 /**
3000 * Analogous to validate_options() but checks per-socket options instead of per-Node
3001 * options.
3002 *
3003 * `*prev_opts` is replaced with `opts`. Leave `prev_opts` as null unless an
3004 * existing Peer_socket's options are being changed via Peer_socket::set_options(). Otherwise a
3005 * Node_options::m_dyn_sock_opts Peer_socket_options is being changed, and that is
3006 * always allowed (since if a per-socket option were not dynamic in that way, it would simply be a
3007 * per-Node option instead).
3008 *
3009 * @param opts
3010 * New option values to validate.
3011 * @param prev_opts
3012 * null if called from constructor; `&sock->m_opts` if called from sock->set_options().
3013 * Used to ensure no static per-socket option is being changed.
3014 * @param err_code
3015 * After return, `*err_code` is success or: error::Code::S_OPTION_CHECK_FAILED,
3016 * error::Code::S_STATIC_OPTION_CHANGED.
3017 * If `!err_code`, error::Runtime_error() with that #Error_code is thrown instead.
3018 * @return `true` on success, `false` on validation error.
3019 */
3020 bool sock_validate_options(const Peer_socket_options& opts, const Peer_socket_options* prev_opts,
3021 Error_code* err_code) const;
3022
3023 /**
3024 * Thread W implementation of sock->set_options(). Performs all the needed work to complete
3025 * `sock->set_options()` call.
3026 *
3027 * Pre-condition: `sock->state()` is not Peer_socket::State::S_CLOSED.
3028 *
3029 * @param sock
3030 * See Peer_socket::set_options().
3031 * @param opts
3032 * See Peer_socket::set_options().
3033 * @param err_code
3034 * See Peer_socket::set_options().
3035 * @return See Peer_socket::set_options().
3036 */
3037 bool sock_set_options(Peer_socket::Ptr sock, const Peer_socket_options& opts, Error_code* err_code);
3038
3039 /**
3040 * Implementation of `sock->info()` for socket `sock` in all cases except when
3041 * `sock->state() == Peer_socket::State::S_CLOSED`. See Peer_socket::info() doc header; this method is the entirety
3042 * of that method's implementation after `S_CLOSED` is eliminated as a possibility.
3043 *
3044 * Pre-conditions:
3045 * - current thread is not W;
3046 * - `sock->m_mutex` is locked and just after entering `sock->info()`;
3047 * - no changes to *sock have been made since `m_mutex` was locked;
3048 * - `sock->state() == Peer_socket::State::S_OPEN`.
3049 *
3050 * Post-condition (not exhaustive): `sock->m_mutex` is unlocked.
3051 *
3052 * @param sock
3053 * Socket in consideration.
3054 * @return See Peer_socket::info().
3055 */
3057
3058 /**
3059 * Given a Peer_socket, copies all stats info (as available via Peer_socket::info()) from various
3060 * structures into the given stats `struct`. This can then be logged, given to the user, etc.
3061 *
3062 * This should be run from thread W only.
3063 *
3064 * @param sock
3065 * Socket in consideration. It can be in any state, but see above.
3066 * @param stats
3067 * All members (direct or indirect) of this `struct` will be filled.
3068 */
3070
3071 /**
3072 * Constructs the socket pair (connection ID) for the given socket. For performance, try not to
3073 * use this, as this is usually already available in most points in Node code and can be passed
3074 * around to places where it's not. However there are situations when one must reconstruct it
3075 * from a Peer_socket::Ptr alone.
3076 *
3077 * Call from thread W only.
3078 *
3079 * @todo Could make it a Socket_id constructor instead.
3080 * @param sock
3081 * Source socket.
3082 * @return Ditto.
3083 */
3085
3086 /**
3087 * Obtain the sequence number for the datum just past the last (latest) In-flight (i.e., sent but
3088 * neither Acknowledged nor Dropped) packet, for the given socket. If there are no In-flight
3089 * packets, returns the default Sequence_number -- which is < all other Sequence_numbers.
3090 *
3091 * Note that "last" in this case refers to position in the sequence number space, not time at which packets
3092 * are sent. (A packet with a given Sequence_number may be sent several times due to retransmission.)
3093 *
3094 * @param sock
3095 * Socket whose In-flight packets to examine.
3096 * @return See above.
3097 */
3099
3100 /**
3101 * Erases (for example if considered Acknowledged or Dropped) a packet `struct` from the
3102 * "scoreboard" (Peer_socket::m_snd_flying_pkts_by_sent_when) and adjusts all related structures.
3103 *
3104 * Note: It does NOT inform `sock->m_snd_drop_timer` (namely calling Drop_timer::on_packet_no_longer_in_flight()).
3105 * This is left to the caller; in particular because the timing may not be appropriate for what such a
3106 * call might trigger (e.g., on-Drop-Timeout actions such as massive retransmission).
3107 *
3108 * @param sock
3109 * Socket to modify.
3110 * @param pkt_it
3111 * Iterator into `m_snd_flying_pkts_by_sent_when` which will be deleted.
3112 */
3114
3115 /**
3116 * Adds a new packet `struct` (presumably representing packet to be sent shortly) to the
3117 * "scoreboard" (Peer_socket::m_snd_flying_pkts_by_sent_when) and adjusts all related structures as applicable. Note,
3118 * however, that mark_data_packet_sent() is NOT called, because we should do that when the DATA
3119 * packet is actually sent (after pacing, if any).
3120 *
3121 * @param sock
3122 * Socket to modify.
3123 * @param seq_num
3124 * The first sequence number of the DATA packet.
3125 * @param sent_pkt
3126 * Ref-counted pointer to new packet `struct`.
3127 */
3129 const Sequence_number& seq_num,
3131
3132 /**
3133 * Updates Peer_socket::m_snd_flying_bytes according to an operation (add packets, remove packets)
3134 * caller is about to undertake or has just undertaken on Peer_socket::m_snd_flying_pkts_by_sent_when (= the
3135 * scoreboard). Call this WHENEVER `m_snd_flying_pkts_by_sent_when` is about to be modified (if erasing) or
3136 * has just been modified (if adding) to ensure `m_snd_flying_bytes` is updated accordingly.
3137 *
3138 * @warning This has strong implications for congestion control! Do not forget.
3139 * @param sock
3140 * Socket to modify.
3141 * @param pkt_begin
3142 * Iterator to first packet that was added or will be removed.
3143 * @param pkt_end
3144 * Iterator one past the last packet that was added or will be removed.
3145 * @param added
3146 * If `true`, the given range of packets was just added (e.g., Sent); if `false`, the given
3147 * range of packets is about to be removed (e.g., Dropped or Acknowledged).
3148 */
3152 bool added);
3153
3154 /**
3155 * Checks whether the given sent packet has been retransmitted the maximum number of allowed
3156 * times; if so then performs rst_and_close_connection_immediately() and returns `false`; otherwise
3157 * returns `true`.
3158 *
3159 * @param sock
3160 * Socket to check and possibly close.
3161 * @param pkt_it
3162 * Iterator info Peer_socket::m_snd_flying_pkts_by_sent_when of `sock` for packet in question. Its
3163 * `m_rexmit_id` should not yet be incremented for the potential new retransmission.
3164 * @param defer_delta_check
3165 * Same meaning as in event_set_all_check_delta().
3166 * @return See above.
3167 */
3170 bool defer_delta_check);
3171
3172 /**
3173 * Logs a verbose state report for the given socket. This is suitable for calling from
3174 * perform_regular_infrequent_tasks() and other infrequently executed spots.
3175 *
3176 * @param sock
3177 * Socket whose state to log.
3178 */
3179 void sock_log_detail(Peer_socket::Const_ptr sock) const;
3180
3181 /**
3182 * Assuming `*seq_num` points to the start of data.m_data, increments `*seq_num` to point
3183 * to the datum just past `data->m_data`.
3184 *
3185 * @param seq_num
3186 * Pointer to sequence number to increment.
3187 * @param data
3188 * DATA packet whose `m_data` to examine.
3189 */
3190 static void advance_seq_num(Sequence_number* seq_num,
3191 boost::shared_ptr<const Data_packet> data);
3192
3193 /**
3194 * Assuming `*seq_num` points to the start of some data of the given size, increments
3195 * `*seq_num` to point to the datum just past that amount of data.
3196 *
3197 * @param seq_num
3198 * Pointer to sequence number to increment.
3199 * @param data_size
3200 * Data size.
3201 */
3202 static void advance_seq_num(Sequence_number* seq_num, size_t data_size);
3203
3204 /**
3205 * Given an iterator into a Peer_socket::Sent_pkt_by_sent_when_map or Peer_socket::Recv_pkt_map, gets the range of
3206 * sequence numbers in the packet represented thereby.
3207 *
3208 * @tparam Packet_map_iter
3209 * Iterator type (`const` or otherwise) into one of the above-mentioned maps.
3210 * @param packet_it
3211 * A valid, non-`end()` iterator into such a map.
3212 * @param seq_num_start
3213 * If 0, ignored; otherwise the sequence number of the first datum in that packet is placed
3214 * there.
3215 * @param seq_num_end
3216 * If 0, ignored; otherwise the sequence number just past the last datum in that packet is
3217 * placed there.
3218 */
3219 template<typename Packet_map_iter>
3220 static void get_seq_num_range(const Packet_map_iter& packet_it,
3221 Sequence_number* seq_num_start, Sequence_number* seq_num_end);
3222
3223 /**
3224 * Returns the "order number" to use for Peer_socket::Sent_packet::Sent_when structure corresponding to the next
3225 * packet to be sent. This will be higher than the last sent packet's number. Make sure you send packets
3226 * in exactly increasing numeric order of this order number.
3227 *
3228 * 0 is reserved and never returned by this.
3229 *
3230 * @param sock
3231 * Socket to consider.
3232 * @return See above.
3233 */
3235
3236 /**
3237 * Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
3238 *
3239 * @param opts
3240 * See Peer_socket::Peer_socket().
3241 * @return Pointer to newly constructed socket.
3242 */
3243 virtual Peer_socket* sock_create(const Peer_socket_options& opts);
3244
3245 // Methods dealing with individual Server_sockets. Implementations are in server_socket.cpp.
3246
3247 /**
3248 * Implementation of non-blocking `serv->accept()` for server socket `serv` in all cases except when
3249 * `serv->state() == Server_socket::State::S_CLOSED`.
3250 *
3251 * Pre-conditions:
3252 * - current thread is not W;
3253 * - `serv->m_mutex` is locked and just after entering `serv->accept()`;
3254 * - no changes to `*serv` have been made since `m_mutex` was locked;
3255 * - `serv->state() != Server_socket::State::S_CLOSED` (so `serv` is in `m_servs`).
3256 *
3257 * This method completes the functionality of `serv->accept()`.
3258 *
3259 * @param serv
3260 * Server socket, which must be in #m_servs, on which Server_socket::accept() was called.
3261 * @param err_code
3262 * See Server_socket::accept().
3263 * @return See Server_socket::accept().
3264 */
3266
3267 /**
3268 * Returns `true` if and only if calling `serv->accept()` with at least some arguments would return
3269 * either non-null (i.e., successfully dequeued a connected socket) or null and an error (but not
3270 * null and NO error). `serv` will be locked and unlocked; safe to call from any thread.
3271 *
3272 * @param serv_as_any
3273 * Socket to examine, as an `any` wrapping a Server_socket::Ptr.
3274 * @return See above.
3275 */
3276 bool serv_is_acceptable(const boost::any& serv_as_any) const;
3277
3278 /**
3279 * Thread W implementation of listen(). Performs all the needed work, gives the resulting
3280 * Server_socket to the user thread, and signals that user thread.
3281 *
3282 * Pre-condition: We're in thread W; thread U != W is waiting for us to return having set `*serv`. Post-condition:
3283 * `*serv` contains a `Server_socket::Ptr` in a Server_socket::State::S_LISTENING state if
3284 * `!(*serv)->m_disconnect_cause`; otherwise an error occurred, and that error is `(*serv)->m_disconnect_cause`.
3285 *
3286 * @param local_port
3287 * See listen().
3288 * @param child_sock_opts
3289 * See listen().
3290 * @param serv
3291 * `*serv` shall be set to the resulting Server_socket. Check `(*serv)->m_disconnect_cause`.
3292 */
3293 void listen_worker(flow_port_t local_port, const Peer_socket_options* child_sock_opts,
3294 Server_socket::Ptr* serv);
3295
3296 /**
3297 * Handles a just-deserialized, just-demultiplexed low-level SYN packet delivered to the given
3298 * server socket. So it will hopefully create a #m_socks entry, send back a SYN_ACK, etc.
3299 *
3300 * @param serv
3301 * Server socket in LISTENING state to which this SYN was demuxed.
3302 * @param syn
3303 * Deserialized immutable SYN.
3304 * @param low_lvl_remote_endpoint
3305 * The remote Node address.
3306 * @return New socket placed into Node socket table; or `Ptr()` on error, wherein no socket was saved.
3307 */
3309 boost::shared_ptr<const Syn_packet> syn,
3310 const util::Udp_endpoint& low_lvl_remote_endpoint);
3311
3312 /**
3313 * Handles a just-deserialized, just-demultiplexed low-level SYN_ACK_ACK packet delivered to the
3314 * given peer socket in Peer_socket::Int_state::S_SYN_RCVD state. So it will hopefully finish up establishing
3315 * connection on our side.
3316 *
3317 * @param socket_id
3318 * Connection ID (socket pair) identifying the socket in #m_socks.
3319 * @param sock
3320 * Peer socket in Peer_socket::Int_state::S_SYN_RCVD.
3321 * @param syn_ack_ack
3322 * Deserialized immutable SYN_ACK_ACK.
3323 */
3325 Peer_socket::Ptr sock,
3326 boost::shared_ptr<const Syn_ack_ack_packet> syn_ack_ack);
3327
3328 /**
3329 * Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given
3330 * peer socket in SYN_RCVD state. This is legitimate under loss and re-ordering conditions.
3331 * This will hopefully save the packet for later handling once we have entered ESTABLISHED state.
3332 *
3333 * @param sock
3334 * Peer socket in Peer_socket::Int_state::S_SYN_RCVD.
3335 * @param packet
3336 * Deserialized packet of type DATA.
3337 * (For performance when moving data to Receive buffer, this is modifiable.)
3338 */
3340 boost::shared_ptr<Data_packet> packet);
3341
3342 /**
3343 * Handles the transition of the given server socket from `S_LISTENING`/`S_CLOSING` to `S_CLOSED`
3344 * (including eliminating the given Peer_socket from our data structures).
3345 *
3346 * Pre-condition: there is no socket `sock` such that `sock->m_originating_serv == serv`; i.e., there
3347 * are no sockets having to do with this server that have not yet been `accept()`ed.
3348 *
3349 * Pre-condition: `serv` is in `m_servs`; `serv->state() != S_OPEN`.
3350 *
3351 * Post-condition: `serv->state() == Server_socket::State::S_CLOSED` (and `serv` is no longer in `m_servs` or any
3352 * other Node structures, directly or indirectly) with `serv->m_disconnect_cause` set to `err_code` (or see
3353 * below).
3354 *
3355 * Any server socket that is in #m_servs MUST be eventually closed using this method. No
3356 * socket that is not in #m_servs must be passed to this method. In particular, do not call this
3357 * method during listen().
3358 *
3359 * @param local_port
3360 * Flow port of the server to delete.
3361 * @param serv
3362 * Socket to close.
3363 * @param err_code
3364 * Why is it being closed? Server_socket::m_disconnect_cause is set accordingly and logged.
3365 * @param defer_delta_check
3366 * Same meaning as in event_set_all_check_delta().
3367 */
3369 const Error_code& err_code, bool defer_delta_check);
3370
3371 /**
3372 * Sets Server_socket::m_state. If moving to `S_CLOSED`, resets the required data to their "undefined" values
3373 * (e.g., `Server_socket::m_local_port = #S_PORT_ANY`). Thread-safe.
3374 *
3375 * @param serv
3376 * Server socket under consideration.
3377 * @param state
3378 * New `m_state`.
3379 */
3381
3382 /**
3383 * Records that thread W shows this socket is not to listen to incoming connections and is to
3384 * abort any not-yet-established (i.e., not yet queued) and established-but-unclaimed (i.e.,
3385 * queued) connections; and sets Server_socket::m_disconnect_cause and Server_socket::m_state in `serv` accordingly.
3386 * Thread-safe.
3387 *
3388 * @param serv
3389 * Server socket under consideration.
3390 * @param disconnect_cause
3391 * The cause of the disconnect.
3392 * @param close
3393 * If `true`, the target public state should be the super-final `S_CLOSED`; if false, the target public state
3394 * should be the ominous `S_CLOSING`. The caller's responsibility is to decide which one it
3395 * is.
3396 */
3397 void serv_close_detected(Server_socket::Ptr serv, const Error_code& disconnect_cause, bool close);
3398
3399 /**
3400 * Records that an unestablished socket `sock` (Peer_socket::Int_state::S_SYN_RCVD) has just become established
3401 * and can be `accept()`ed (Peer_socket::Int_state::S_ESTABLISHED). Moves `sock` from
3402 * Server_socket::m_connecting_socks to Server_socket::m_unaccepted_socks (in `serv`).
3403 * To be called from thread W only. Thread-safe.
3404 *
3405 * @param serv
3406 * Server socket under consideration.
3407 * @param sock
3408 * Socket that was just moved to Peer_socket::Int_state::S_ESTABLISHED.
3409 */
3411
3412 /**
3413 * Records a new (just received SYN) peer socket from the given server socket. Adds `sock` to
3414 * Server_socket::m_connecting_socks (in `serv`) and maintains the Peer_socket::m_originating_serv (in `sock`)
3415 * invariant. To be called from thread W only. Thread-safe.
3416 *
3417 * @param serv
3418 * Server that originated `sock`.
3419 * @param sock
3420 * Socket that was just moved to Peer_socket::Int_state::S_SYN_RCVD.
3421 */
3423
3424 /**
3425 * Records that a `Server_socket`-contained (i.e., currently un-established, or established but not yet accepted
3426 * by user) Peer_socket is being closed and should be removed from the given Server_socket. To be
3427 * called from thread W only. Thread-safe.
3428 *
3429 * If `sock` is not contained in `*serv`, method does nothing.
3430 *
3431 * @param serv
3432 * Server socket under consideration.
3433 * @param sock
3434 * Socket to remove (moving from `S_SYN_RCVD` or `S_ESTABLISHED` to `S_CLOSED`).
3435 */
3437
3438 /**
3439 * Internal factory used for ALL Server_socket objects created by this Node (including subclasses).
3440 *
3441 * @param child_sock_opts
3442 * See Server_socket::Server_socket().
3443 * @return Pointer to newly constructed socket.
3444 */
3445 virtual Server_socket* serv_create(const Peer_socket_options* child_sock_opts);
3446
3447 // Methods dealing with individual Peer_sockets OR Server_sockets (determined via template at compile time).
3448
3449 /**
3450 * Implementation of core *blocking* transfer methods, namely Peer_socket::sync_send(), Peer_socket::sync_receive(),
3451 * and Server_socket::sync_accept() for all cases except when `sock->state() == Peer_socket::State::S_CLOSED`.
3452 * It is heavily templated and shared among those three implementations to avoid massive
3453 * copy/pasting, since the basic pattern of the blocking wrapper around Event_set::sync_wait() and
3454 * a non-blocking operation (Peer_socket::receive(), Peer_socket::send(), Server_socket::accept(), respectively)
3455 * is the same in all cases.
3456 *
3457 * Pre-conditions:
3458 * - current thread is not W;
3459 * - `sock->m_mutex` is locked;
3460 * - no changes to `*sock` have been made since `sock->m_mutex` was locked;
3461 * - `sock->state()` is OPEN (so `sock` is in #m_socks or #m_servs, depending on socket type at compile time);
3462 * - other arguments are as described below.
3463 *
3464 * This method completes the functionality of `sock->sync_send()`, `sock->sync_receive()`, and
3465 * `sock->sync_accept()`.
3466 *
3467 * @tparam Socket
3468 * Underlying object of the transfer operation (Peer_socket or Server_socket).
3469 * @tparam Non_blocking_func_ret_type
3470 * The return type of the calling transfer operation (`size_t` or Peer_socket::Ptr).
3471 * @param sock
3472 * Socket on which user called `sync_*()`.
3473 * @param non_blocking_func
3474 * When this method believes it should attempt a non-blocking transfer op, it will execute
3475 * `non_blocking_func()`.
3476 * If `non_blocking_func.empty()`, do not call `non_blocking_func()` --
3477 * return indicating no error so far, and let them do actual operation, if they want; we just tell them it
3478 * should be ready for them. This is known
3479 * as `null_buffers` mode or reactor pattern mode. Otherwise, do the successful operation and then
3480 * return. This is arguably more typical.
3481 * @param would_block_ret_val
3482 * The value that `non_blocking_func()` returns to indicate it was unable to perform the
3483 * non-blocking operation (i.e., no data/sockets available).
3484 * @param ev_type
3485 * Event type applicable to the type of operation this is. See Event_set::Event_type doc header.
3486 * @param wait_until
3487 * See `max_wait` argument on the originating `sync_*()` method. This is absolute timeout time point
3488 * derived from it; zero-valued if no timeout.
3489 * @param err_code
3490 * See this argument on the originating `sync_*()` method.
3491 * However, unlike that calling method's user-facing API, the present sync_op() method
3492 * does NOT allow null `err_code` (behavior undefined if `err_code` is null).
3493 * Corollary: we will NOT throw Runtime_error().
3494 * @return The value that the calling `sync_*()` method should return to its caller.
3495 * Corner/special case: If `non_blocking_func.empty()` (a/k/a "reactor pattern" mode), then
3496 * this will always return `would_block_ret_val`; the caller shall interpret
3497 * `bool(*err_code) == false` as meaning the socket has reached the desired state in time and without
3498 * error. In that special case, as of this writing, you can't just return this return value, since it's
3499 * always a zero/null/whatever.
3500 */
3501 template<typename Socket, typename Non_blocking_func_ret_type>
3502 Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock,
3503 const Function<Non_blocking_func_ret_type ()>& non_blocking_func,
3504 Non_blocking_func_ret_type would_block_ret_val,
3505 Event_set::Event_type ev_type,
3506 const Fine_time_pt& wait_until,
3507 Error_code* err_code);
3508
3509 /**
3510 * Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so, it
3511 * sets `*err_code` to the reason it was closed (which is in `sock->m_disconnect`) and returns `false`;
3512 * otherwise it returns `true` and leaves `*err_code` untouched. This exists to improve code reuse, as
3513 * this is a frequent operation for both socket types.
3514 *
3515 * Pre- and post-conditions: `sock->m_mutex` is locked.
3516 *
3517 * @tparam Socket_ptr
3518 * Peer_socket::Ptr or Server_socket::Ptr.
3519 * @param sock
3520 * The socket in question.
3521 * @param err_code
3522 * `*err_code` is set to `sock->m_disconnect_cause` if socket is closed.
3523 * @return `true` if state is not CLOSED; otherwise `false`.
3524 */
3525 template<typename Socket_ptr>
3526 static bool ensure_sock_open(Socket_ptr sock, Error_code* err_code);
3527
3528 // Methods dealing with individual Event_sets. Implementations are in event_set.cpp.
3529
3530 /**
3531 * Implementation of Event_set::async_wait() when `Event_set::state() == Event_set::State::S_INACTIVE`.
3532 *
3533 * Pre-conditions:
3534 * - current thread is not W;
3535 * - `event_set->m_mutex` is locked and just after entering async_wait();
3536 * - no changes to `*event_set` have been made since `m_mutex` was locked;
3537 * - `event_set->state() == Event_set::State::S_INACTIVE` (so `event_set` is in #m_event_sets);
3538 * - on_event is as originally passed into async_wait().
3539 *
3540 * This method completes the functionality of `event_set->async_wait()`.
3541 *
3542 * @param event_set
3543 * Event_set in question.
3544 * @param on_event
3545 * See Event_set::async_wait().
3546 * @param err_code
3547 * See Event_set::async_wait().
3548 * @return See Event_set::async_wait().
3549 */
3550 bool event_set_async_wait(Event_set::Ptr event_set, const Event_set::Event_handler& on_event,
3551 Error_code* err_code);
3552
3553 /**
3554 * Helper placed by event_set_async_wait() onto thread W to invoke event_set_check_baseline() but first ensure
3555 * that the `Event_set event_set` has not exited Event_set::State::S_WAITING (which would make any checking for
3556 * active events nonsense). If it has exited that state, does nothing. (That situation is possible due to
3557 * concurrently deleting the overarching Node (IIRC) and maybe other similar races.)
3558 *
3559 * @param event_set
3560 * Event_set in question.
3561 */
3563 /**
3564 * Checks each desired (Event_set::m_want) event in `event_set`; any that holds true is saved into `event_set`
3565 * (Event_set::m_can). This is the exhaustive, or "baseline," check. This should only be performed when
3566 * necessary, as it is typically slower than checking individual active sockets against the
3567 * Event_set ("delta" check).
3568 *
3569 * This check is skipped if `Event_set::m_baseline_check_pending == false` (for `event_set`).
3570 *
3571 * See Event_set::async_wait() giant internal comment for context on all of the above.
3572 *
3573 * Pre-conditions: `event_set` state is Event_set::State::S_WAITING; `event_set->m_mutex` is locked.
3574 *
3575 * This method, unlike most, is intended to be called from either W or U != W. All actions it
3576 * takes are on non-W-exclusive data (namely, actions on: `event_set`; and non-W-exclusive data in
3577 * Peer_socket and Server_socket, namely their state() and Receive/Send/Accept structures).
3578 *
3579 * @param event_set
3580 * Event_set in question.
3581 * @return `true` if and only if the check was performed; `false` returned if
3582 * `!event_set->m_baseline_check_pending`.
3583 */
3585
3586 /**
3587 * Check whether given Event_set contains any active sockets (Event_set::m_can); if so, signals the user (who
3588 * previously called `async_wait()` to set all this in motion): set state back to Event_set::State::S_INACTIVE from
3589 * Event_set::State::S_WAITING; calls the handler passed to `async_wait()`; forgets handler. If no active sockets,
3590 * does nothing.
3591 *
3592 * Pre-conditions: same as event_set_check_baseline().
3593 *
3594 * @param event_set
3595 * Event_set in question.
3596 */
3598
3599 /**
3600 * For each WAITING Event_set within the Node: checks for any events that hold, and if any do
3601 * hold, signals the user (calls handler, goes to INACTIVE, etc.). The logic for how it does so
3602 * is complex. For background, please see Event_set::async_wait() giant internal comment first.
3603 * Then read on here.
3604 *
3605 * For each WAITING Event_set: If baseline check (event_set_check_baseline()) is still required
3606 * and hasn't been performed, perform it. Otherwise, for efficiency perform a "delta" check,
3607 * wherein EVERY active (for all definitions of active: Readable, Writable, Acceptable) socket
3608 * detected since the last baseline check is checked against the desired event/socket pairs in the
3609 * Event_set. Any socket in both sets (active + desired) is saved in `event_set->m_can`. If
3610 * either the baseline or delta check yields at least one active event, signal user (call handler,
3611 * go INACTIVE, etc.).
3612 *
3613 * For the delta check just described, how does it know which sockets have been active since the
3614 * last check? Answer: `Node::m_sock_events` members (NOTE: not the same as `Event_set::m_can`, though
3615 * they are related). See #m_sock_events doc header for details.
3616 *
3617 * @param defer_delta_check
3618 * Set to `true` if and only if you know, for a FACT, that within a non-blocking amount of
3619 * time `event_set_all_check_delta(false)` will be called. For example, you may know
3620 * `event_set_all_check_delta(false)` will be called within the present boost.asio handler.
3621 * Then this method will only log and not perform the actual check, deferring to the
3622 * promised `event_set_all_check_delta(false)` call, by which point more events may have been
3623 * detected in #m_sock_events.
3624 */
3625 void event_set_all_check_delta(bool defer_delta_check);
3626
3627 /**
3628 * Implementation of Event_set::close() when `Event_set::state() != Event_set::State::S_CLOSED` for `event_set`.
3629 *
3630 * Pre-conditions:
3631 * - current thread is not W;
3632 * - `event_set->m_mutex` is locked and just after entering async_wait();
3633 * - no changes to `*event_set` have been made since `m_mutex` was locked;
3634 * - `event_set->state() != Event_set::State::S_CLOSED` (so `event_set` is in #m_event_sets).
3635 *
3636 * This method completes the functionality of `event_set->close()`.
3637 *
3638 * @param event_set
3639 * Event_set in question.
3640 * @param err_code
3641 * See Event_set::close().
3642 */
3643 void event_set_close(Event_set::Ptr event_set, Error_code* err_code);
3644
3645 /**
3646 * The guts of event_set_close_worker_check_state(): same thing, but assumes
3647 * `Event_set::state() == Event_set::State::S_CLOSED`, and Event_set::m_mutex is locked (for `event_set`).
3648 * May be called directly from thread W assuming those pre-conditions holds.
3649 *
3650 * @param event_set
3651 * Event_set in question.
3652 */
3653 void event_set_close_worker(Event_set::Ptr event_set);
3654
3655 /**
3656 * Thread W implementation of interrupt_all_waits(). Performs all the needed work, which is to
3657 * trigger any WAITING Event_set objects to fire their on-event callbacks, with the Boolean argument set
3658 * to `true`, indicating interrupted wait.
3659 *
3660 * Pre-condition: We're in thread W.
3661 */
3663
3664 /**
3665 * `signal_set` handler, executed on SIGINT and SIGTERM, if user has enabled this feature:
3666 * causes interrupt_all_waits_worker() to occur on thread W.
3667 *
3668 * Pre-condition: We're in thread W [sic].
3669 *
3670 * @param sys_err_code
3671 * boost.asio error code indicating the circumstances of the callback executing.
3672 * It is unusual for this to be truthy.
3673 * @param sig_number
3674 * Signal number of the signal that was detected.
3675 */
3676 void interrupt_all_waits_internal_sig_handler(const Error_code& sys_err_code, int sig_number);
3677
3678 // Constants.
3679
3680 /**
3681 * For a given unacknowledged sent packet P, the maximum number of times any individual packet
3682 * with higher sequence numbers than P may be acknowledged before P is considered Dropped (i.e.,
3683 * we give up on it). If we enable retransmission, that would trigger Fast Retransmit, using TCP's
3684 * terminology.
3685 */
3687
3688 /**
3689 * Time interval between performing "infrequent periodic tasks," such as stat logging. This
3690 * should be large enough to ensure that the tasks being performed incur no significant processor
3691 * use.
3692 */
3694
3695 // Data.
3696
3697 /**
3698 * This Node's global set of options. Initialized at construction; can be subsequently
3699 * modified by set_options(), although only the dynamic members of this may be modified.
3700 *
3701 * Accessed from thread W and user thread U != W. Protected by #m_opts_mutex. When reading, do
3702 * NOT access without locking (which is encapsulated in opt()).
3703 */
3705
3706 /// The mutex protecting #m_opts.
3708
3709 /**
3710 * The object used to simulate stuff like packet loss and latency via local means directly in the
3711 * code. If 0, no such simulation is performed. `shared_ptr<>` used for basic auto-`delete` convenience.
3712 */
3713 boost::shared_ptr<Net_env_simulator> m_net_env_sim;
3714
3715 /**
3716 * The main loop engine, functioning in the single-threaded-but-asynchronous callback-based
3717 * "reactor" style (or is it "proactor"?). The Node constructor creates a single new thread W, which then places
3718 * some callbacks onto this guy and invoke `m_task_engine.run()`, at which point the main loop
3719 * begins in thread W.
3720 *
3721 * Thus, per boost.asio's model, any work items (functions) placed
3722 * onto #m_task_engine (e.g.: `post(m_task_engine, do_something_fn);`) will execute in thread W,
3723 * as it's the one invoking `run()` at the time -- even if the placing itself is done on some
3724 * other thread, such as a user thread U. An example of the latter is a Peer_socket::send() implementation
3725 * might write to the socket's internal Send buffer in thread U, check whether it's currently possible
3726 * to send over the wire, and if and only if the answer is yes, `post(m_task_engine, S)`, where S
3727 * is a function/functor (created via lambdas usually) that will perform the hairy needed Node/socket
3728 * work on thread W.
3729 *
3730 * All threads may access this (no mutex required, as explicitly announced in boost.asio docs).
3731 *
3732 * Adding more threads that would call `m_task_engine.run()` would create a thread pool. With "strands" one
3733 * can avoid concurrency in this situation. An intelligent combination of those two concepts can lead to efficient
3734 * multi-core use without complex and/or inefficient locking. This is non-trivial.
3735 *
3736 * @see Class Node doc header for to-do items regarding efficient multi-core use and how that relates to
3737 * using an #m_task_engine thread pool and/or strands.
3738 */
3740
3741 /**
3742 * The UDP socket used to receive low-level packets (to assemble into application layer data) and send them
3743 * (vice versa).
3744 *
3745 * Only thread W can access this.
3746 *
3747 * Access to this may be highly contentious in high-traffic situations. Since only thread W accesses this, and that
3748 * thread does the vast bulk of the work of the entire Node, at least one known problem is that the internal OS
3749 * UDP receive buffer may be exceeded, as we may not read datagrams off this socket quickly enough.
3750 *
3751 * @see Class Node doc header for to-do items regarding the aforementioned UDP receive buffer overflow problem.
3752 */
3754
3755 /**
3756 * After we bind #m_low_lvl_sock to a UDP endpoint, this is a copy of that endpoint. Thus it
3757 * should contain the actual local address and port (even if user specified 0 for the latter,
3758 * say).
3759 *
3760 * This is equal to `Udp_endpoint()` until the constructor exits. After the constructor exits, its
3761 * value never changes, therefore all threads can access it without mutex. If the constructor
3762 * fails to bind, this remains equal to `Udp_endpoint()` forever.
3763 */
3765
3766 /**
3767 * OS-reported #m_low_lvl_sock UDP receive buffer maximum size, obtained right after we
3768 * OS-set that setting and never changed subsequently. Note the OS may not respect whatever value we
3769 * passed into the OS socket option setting call, or it may respect it but only approximately.
3770 */
3772
3773 /// Stores incoming raw packet data; re-used repeatedly for possible performance gains.
3775
3776 /// Flow port space for both client and server sockets. All threads may access this.
3778
3779 /// Sequence number generator (at least to generate ISNs). Only thread W can access this.
3781
3782 /**
3783 * Random number generator for picking security tokens; seeded on time at Node construction and generates
3784 * integers from the entire range. (Not thread-safe. Use only in thread W.)
3785 */
3787
3788 /**
3789 * The peer-to-peer connections this Node is currently tracking. Their states are not Peer_socket::State::S_CLOSED.
3790 * Only thread W can access this.
3791 */
3793
3794 /**
3795 * The server sockets this Node is currently tracking. Their states are not Server_socket::State::S_CLOSED.
3796 * Only thread W can access this.
3797 */
3799
3800 /**
3801 * Every Event_set to have been returned by event_set_create() and not subsequently reached
3802 * Event_set::State::S_CLOSED. Only thread W can access this.
3803 */
3805
3806 /**
3807 * All sockets that have been detected to be "ready" (by the Event_set doc header definition) at
3808 * any point since the last time #m_sock_events's contained sets were cleared (which happens initially and after each
3809 * event_set_all_check_delta() call). EVERY piece of code in thread W to potentially set a
3810 * socket's status to "ready" (e.g.: DATA received, error detected) MUST add that socket's handle
3811 * to this data structure. This enables the Event_set machinery to efficiently but thoroughly
3812 * detect every event in which the Event_set user is interested. The theory behind this is
3813 * described in the giant comment inside Event_set::async_wait().
3814 *
3815 * This maps Event_set::Event_type `enum` members to Event_set::Sockets socket sets, exactly the same way
3816 * Event_set::m_can and Event_set::m_want are set up.
3817 *
3818 * A question arises: why use this set to store such active sockets? Why not just call
3819 * event_set_all_check_delta() EVERY time we see a socket is now Readable, etc., thus handling it right
3820 * away and not needing to store it? Answer: we could. However, we want to collect as many
3821 * possibly active events as possible, without blocking, before performing the check. That way
3822 * the user is informed of as many events as possible, instead of the very first one (when there
3823 * could be hundreds more; for example if hundreds of DATA packets have arrived simultaneously).
3824 * The theory behind this is also discussed in Event_set::async_wait() giant comment. So we
3825 * insert into #m_sock_events and defer `event_set_all_check_delta(false)` to the end of the current
3826 * boost.asio handler, since we know we won't block (sleep) until the handler exits.
3827 *
3828 * Only thread W can access this.
3829 */
3831
3832 /**
3833 * Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() (async part)
3834 * call, by the time perform_accumulated_on_recv_tasks() is called, this stores exactly those sockets for which
3835 * possible ACK sending tasks have been accumulated during the low_lvl_recv_and_handle()/etc.
3836 * call. The idea is that, for efficiency and reduced overhead,
3837 * all simultaneously available incoming data are examined first, and some tasks are accumulated
3838 * to perform at the end. For example, all DATA packets to be acknowledged at the same time are
3839 * collected and then sent in as few ACKs as possible.
3840 *
3841 * Details on the acks to potentially send are stored within that Peer_socket itself (e.g.,
3842 * Peer_socket::m_rcv_pending_acks).
3843 *
3844 * This should be added to throughout the method, used in perform_accumulated_on_recv_tasks(), and
3845 * then cleared for the next run.
3846 *
3847 * Only thread W can access this.
3848 */
3849 boost::unordered_set<Peer_socket::Ptr> m_socks_with_accumulated_pending_acks;
3850
3851 /**
3852 * Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() call,
3853 * by the time perform_accumulated_on_recv_tasks() is called, this stores exactly those sockets for which
3854 * possible incoming-ACK handling tasks have been accumulated during the low_lvl_recv_and_handle()/etc.
3855 * call. The idea is that, for congestion control robustness,
3856 * all simultaneously available acknowledgments and rcv_wnd updates are collected first, and then
3857 * they're all handled together at the end.
3858 *
3859 * Details on the acks to potentially send are stored within that Peer_socket
3860 * itself (Peer_socket::m_rcv_acked_packets scan all).
3861 *
3862 * This should be added to throughout the method, used in perform_accumulated_on_recv_tasks(), and
3863 * then cleared for the next run.
3864 *
3865 * Only thread W can access this.
3866 */
3867 boost::unordered_set<Peer_socket::Ptr> m_socks_with_accumulated_acks;
3868
3869 /**
3870 * For debugging, when we detect loss of data we'd sent, we log the corresponding socket's state;
3871 * this is the last time this was done for any socket (or epoch if never). It's used to
3872 * throttle such messages, since they are CPU-intensive and disk-intensive (when logging to disk).
3873 */
3875
3876 /**
3877 * Promise that thread W sets to truthy `Error_code` if it fails to initialize or falsy once event loop is running.
3878 * The truthy payload can be returned or thrown inside an error::Runtime_exception if desired.
3879 */
3880 boost::promise<Error_code> m_event_loop_ready;
3881
3882 /// The future object through which the non-W thread waits for #m_event_loop_ready to be set to success/failure.
3883 boost::unique_future<Error_code> m_event_loop_ready_result;
3884
3885 /**
3886 * Signal set which we may or may not be using to trap SIGINT and SIGTERM in order to auto-fire
3887 * interrupt_all_waits(). `add()` is called on it at initialization if and only if that feature is enabled
3888 * by the user via `Node_options`. Otherwise this object just does nothing for the Node's lifetime.
3889 */
3891
3892 /// Worker thread (= thread W). Other members should be initialized before this to avoid race condition.
3894}; // class Node
3895
3896/**
3897 * @private
3898 *
3899 * The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to
3900 * a port in this Node. Its (unmodifiable after initialization) fields are to be constructed via direct
3901 * initialization (assuming the defaults are unacceptable).
3902 */
3904{
3905 // Data.
3906
3907 /// The other side of the connection.
3909 /// This side of the connection (within this Node).
3911
3912 // Methods.
3913
3914 /**
3915 * Hash value of this Socket_id for `unordered<>`.
3916 * @return Ditto.
3917 */
3918 size_t hash() const;
3919};
3920
3921// Free functions: in *_fwd.hpp.
3922
3923// However the following refer to inner type(s) and hence must be declared here and not _fwd.hpp.
3924
3925/**
3926 * @internal
3927 *
3928 * Free function that returns socket_id.hash(); has to be a free function named `hash_value()` for
3929 * boost.hash to pick it up.
3930 *
3931 * @relatesalso Node::Socket_id
3932 *
3933 * @param socket_id
3934 * Socket ID to hash.
3935 * @return socket_id.hash().
3936 */
3937size_t hash_value(const Node::Socket_id& socket_id);
3938
3939/**
3940 * @internal
3941 *
3942 * Whether `lhs` is equal to `rhs`.
3943 *
3944 * @relatesalso Node::Socket_id
3945 * @param lhs
3946 * Object to compare.
3947 * @param rhs
3948 * Object to compare.
3949 * @return See above.
3950 */
3951bool operator==(const Node::Socket_id& lhs, const Node::Socket_id& rhs);
3952
3953// Template implementations.
3954
3955template<typename Rep, typename Period>
3957 const boost::chrono::duration<Rep, Period>& max_wait,
3958 const boost::asio::const_buffer& serialized_metadata,
3959 Error_code* err_code,
3960 const Peer_socket_options* opts)
3961{
3962 assert(max_wait.count() > 0);
3963 return sync_connect_impl(to, util::chrono_duration_to_fine_duration(max_wait), serialized_metadata, err_code, opts);
3964}
3965
3966template<typename Rep, typename Period>
3968 const boost::chrono::duration<Rep, Period>& max_wait,
3969 Error_code* err_code, const Peer_socket_options* opts)
3970{
3971 return sync_connect_with_metadata(to, max_wait,
3972 boost::asio::buffer(&S_DEFAULT_CONN_METADATA, sizeof(S_DEFAULT_CONN_METADATA)),
3973 err_code, opts);
3974}
3975
3976template<typename Socket, typename Non_blocking_func_ret_type>
3977Non_blocking_func_ret_type Node::sync_op(typename Socket::Ptr sock,
3978 const Function<Non_blocking_func_ret_type ()>& non_blocking_func,
3979 Non_blocking_func_ret_type would_block_ret_val,
3980 Event_set::Event_type ev_type,
3981 const Fine_time_pt& wait_until,
3982 Error_code* err_code)
3983{
3984 using boost::adopt_lock;
3985 using boost::chrono::milliseconds;
3986 using boost::chrono::round;
3987
3988 // We are in user thread U != W.
3989
3990 {
3991 /* WARNING!!! sock->m_mutex is locked, but WE must unlock it before returning! Can't leave that
3992 * to the caller, because we must unlock at a specific point below, right before sync_wait()ing.
3993 * Use a Lock_guard that adopts an already-locked mutex. */
3994 typename Socket::Lock_guard lock(sock->m_mutex, adopt_lock);
3995
3996 if (!running())
3997 {
3999 return would_block_ret_val;
4000 }
4001 // else
4002
4003 /* Unlock. Why? Simply because we can't forbid other threads from accessing sock while we
4004 * shamelessly block (in sync_wait()). */
4005 } // Lock.
4006
4007 /* This is actually pretty simple. We create an Event_set with just the one event we care
4008 * about (e.g., sock is Writable) and sync_wait() for it. Then we invoke non_blocking_func()
4009 * (e.g., Node::send()) once that event holds. So, create Event_set. */
4010
4011 /* Note that we assume "this" remains valid throughout this method until we start sleeping
4012 * (sync_wait()). This is explicitly guaranteed by the "Thread safety" note in class Node doc
4013 * header (which says that Node deletion is allowed only once a blocking operation's sleep has
4014 * been entered). */
4015
4016 const Event_set::Ptr event_set = event_set_create(err_code);
4017 if (!event_set)
4018 {
4019 return would_block_ret_val; // *err_code is set. This is pretty weird but nothing we can do.
4020 }
4021 // else event_set ready.
4022
4023 // We must clean up event_set at any return point below.
4025 {
4026 // Eat any error when closing Event_set, as it's unlikely and not interesting to user.
4027 Error_code dummy_prevents_throw;
4028 event_set->close(&dummy_prevents_throw);
4029 });
4030
4031 // We care about just this event, ev_type.
4032 if (!(event_set->add_wanted_socket<Socket>(sock, ev_type, err_code)))
4033 {
4034 return would_block_ret_val; // *err_code is set. Node must have shut down or something.
4035 }
4036 // else go ahead and wait.
4037
4038 Non_blocking_func_ret_type op_result;
4039 const bool timeout_given = wait_until != Fine_time_pt();
4040 do
4041 {
4042 // We may have to call sync_wait() repeatedly; if timeout is given we must give less and less time each time.
4043 bool wait_result;
4044 if (timeout_given)
4045 {
4046 // Negative is OK but cleaner to clamp it to 0.
4047 const Fine_duration time_remaining = std::max(Fine_duration::zero(), wait_until - Fine_clock::now());
4048
4049 /* Do NOT log. We have waited already, so `this` Node may have been deleted, so get_logger() may be undefined!
4050 * @todo I don't like this. Want to log it. Maybe get rid of the allowing for `this` deletion during wait.
4051 * We don't allow it in async_*() case for instance. */
4052 /* FLOW_LOG_TRACE("Waiting again; timeout reduced "
4053 * "to [" << round<milliseconds>(time_remaining) << "] = [" << time_remaining << "]."); */
4054
4055 // Perform the wait until event detected, time_remaining expires, or wait is interrupted (a-la EINTR).
4056 wait_result = event_set->sync_wait(time_remaining, err_code);
4057 }
4058 else
4059 {
4060 // No timeout given. Perform the wait until event detected, or wait is interrupted (a-la EINTR).
4061 wait_result = event_set->sync_wait(err_code);
4062 }
4063
4064 if (!wait_result)
4065 {
4066 /* *err_code is set. Node must have shut down or something; or, maybe more likely, S_WAIT_INTERRUPTED
4067 * or S_WAIT_USER_TIMEOUT occurred. In all cases, it's correct to pass it on to our caller. */
4068 return would_block_ret_val;
4069 }
4070 // else sync_wait() has returned success.
4071
4072 // Warning: "this" may be have been deleted by any point below this line, unless specifically guaranteed.
4073
4074#ifndef NDEBUG
4075 const bool active = event_set->events_detected(err_code);
4076#endif
4077 assert(active); // Inactive but no error, so it must have been a timeout -- but that should've been error.
4078
4079 /* OK. sync_wait() reports event is ready (sock is active, e.g., Writable). Try to perform
4080 * non-blocking operation (e.g., Node::send()). We must lock again (to access m_node again,
4081 * plus it's a pre-condition of the non-blocking operation (e.g., Node::send()). In the
4082 * meantime sock may have gotten closed. Ensure that's not so (another pre-condition).
4083 *
4084 * Alternatively, in null_buffers mode, they want us to basically do a glorified sync_wait() for
4085 * one of the 3 events, depending on ev_type, and just return without performing any non_blocking_func();
4086 * in fact this mode is indicated by non_blocking_func.empty(). */
4087
4088 {
4089 typename Socket::Lock_guard lock(sock->m_mutex);
4090 if (sock->m_state == Socket::State::S_CLOSED) // As in the invoker of this method....
4091 {
4092 assert(sock->m_disconnect_cause);
4093 *err_code = sock->m_disconnect_cause;
4094 // Do NOT log. "this" Node may have been deleted, so get_logger() may be undefined!
4095
4096 return would_block_ret_val;
4097 }
4098 // else do it. Note that `this` is guaranteed to still exist if sock->m_state is not CLOSED.
4099
4100 if (non_blocking_func.empty())
4101 {
4102 FLOW_LOG_TRACE("Sync op of type [" << ev_type << "] with Event_set [" << event_set << "] in reactor pattern "
4103 "mode on object [" << sock << "] successful; returning without non-blocking op.");
4104 assert(!*err_code); // In reactor pattern mode: No error <=> yes, socket is in desired state now.
4105 return would_block_ret_val;
4106 }
4107
4108 op_result = non_blocking_func(); // send(), receive(), accept(), etc.
4109 } // Lock.
4110 // Cannot log below this line for aforementioned reasons. Also cannot log in subsequent iterations!
4111
4112 if (*err_code)
4113 {
4114 // *err_code is set. Any number of errors possible here; error on socket => socket is active.
4115 return would_block_ret_val;
4116 }
4117 // else no error.
4118
4119 /* If op_result > 0, then data was transferred (enqueued to Send buffer, dequeued from Receive
4120 * buffer or Accept queue, etc.); cool. If op_result == 0, sock is still not active. How
4121 * is that possible if sync_wait() returned non-zero events? Because some jerk in another
4122 * thread may also be non_blocking_func()ing at the same time. In that case we must try again
4123 * (must not return would_block_ret_val and no error). (And give less time, if timeout was
4124 * provided.) */
4125
4126 // Do NOT log as explained. @todo I don't like this. See similar @todo above.
4127 /* if (op_result == would_block_ret_val)
4128 * {
4129 * // Rare/interesting enough for INFO.
4130 * FLOW_LOG_INFO('[' << sock << "] got Active status "
4131 * "but the non-blocking operation still returned would-block. Another thread is interfering?");
4132 * } */
4133 }
4134 while (op_result == would_block_ret_val);
4135
4136 // Excellent. At least some data was transferred (e.g., enqueued on Send buffer). *err_code is success.
4137 return op_result;
4138} // Node::sync_op()
4139
4140template<typename Socket_ptr>
4141bool Node::ensure_sock_open(Socket_ptr sock, Error_code* err_code) // Static.
4142{
4143 // Pre-condition: sock is suitably locked. We are in thread U != W or W.
4144
4145 if (sock->m_state == Socket_ptr::element_type::State::S_CLOSED)
4146 {
4147 // CLOSED socket -- Node has disowned us.
4148 assert(!sock->m_node);
4149 assert(sock->m_disconnect_cause);
4150
4151 // Mark (already-determined) error in *err_code and log.
4152 FLOW_LOG_SET_CONTEXT(sock->get_logger(), sock->get_log_component()); // Static, so must do <--that to log this--v.
4153 FLOW_ERROR_EMIT_ERROR_LOG_INFO(sock->m_disconnect_cause);
4154 return false;
4155 }
4156 // else
4157 return true;
4158} // Node::ensure_sock_open()
4159
4160template<typename Opt_type>
4161bool Node::validate_static_option(const Opt_type& new_val, const Opt_type& old_val, const std::string& opt_id,
4162 Error_code* err_code) const
4163{
4164 using std::string;
4165
4166 if (new_val != old_val)
4167 {
4168 const string& opt_id_nice = Node_options::opt_id_to_str(opt_id);
4169 FLOW_LOG_WARNING("Option [" << opt_id_nice << "] is static, but attempted to change "
4170 "from [" << old_val << "] to [" << new_val << "]. Ignoring entire option set.");
4172 return false;
4173 }
4174 // else
4175
4176 return true;
4177}
4178
4179template<typename Opt_type>
4180Opt_type Node::opt(const Opt_type& opt_val_ref) const
4181{
4182 /* They've given the REFERENCE to the value they want to read. Another thread may write to that
4183 * value concurrently. Therefore, acquire ownership of the enclosing m_opts. Copy the value. Unlock.
4184 * Then return the copy. Most options are small (typically primitive types, typically integers;
4185 * or boost.chrono time values which are internally also usually just integers), so the copy
4186 * should not be a big deal. */
4187
4189 return opt_val_ref;
4190}
4191
4192template<typename Peer_socket_impl_type>
4194{
4195 return new Peer_socket_impl_type(get_logger(), &m_task_engine, opts);
4196}
4197
4198template<typename Server_socket_impl_type>
4200{
4201 return new Server_socket_impl_type(get_logger(), child_sock_opts);
4202}
4203
4204} // namespace flow::net_flow
Convenience class that simply stores a Logger and/or Component passed into a constructor; and returns...
Definition: log.hpp:1619
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:224
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1291
A user-set collection of sockets and desired conditions on those sockets (such as: "socket has data t...
Definition: event_set.hpp:254
Event_type
Type of event or condition of interest supported by class Event_set.
Definition: event_set.hpp:307
Objects of this class can be fed to Node to make it internally simulate network conditions like loss,...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
Definition: node.hpp:937
void snd_flying_pkts_updated(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_const_iter pkt_begin, const Peer_socket::Sent_pkt_ordered_by_when_const_iter &pkt_end, bool added)
Updates Peer_socket::m_snd_flying_bytes according to an operation (add packets, remove packets) calle...
void low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet, size_t bytes_expected_transferred, const Error_code &sys_err_code, size_t bytes_transferred)
Completion handler for async_low_lvl_packet_send_impl(); called when the packet is either successfull...
Definition: low_lvl_io.cpp:497
void serv_peer_socket_init(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records a new (just received SYN) peer socket from the given server socket.
bool categorize_individual_ack(const Socket_id &socket_id, Peer_socket::Ptr sock, Ack_packet::Individual_ack::Const_ptr ack, bool *dupe_or_late, Peer_socket::Sent_pkt_ordered_by_when_iter *acked_pkt_it)
Helper of perform_accumulated_on_recv_tasks() that categorizes the given accumulated individual ackno...
void handle_data_to_established(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet, bool syn_rcvd_qd_packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
bool sock_is_writable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->send() with at least some arguments would return either non...
Peer_socket_info sock_info(Peer_socket::Const_ptr sock)
Implementation of sock->info() for socket sock in all cases except when sock->state() == Peer_socket:...
void async_no_sock_low_lvl_packet_send(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
Definition: low_lvl_io.cpp:313
void receive_wnd_updated(Peer_socket::Ptr sock)
Placed by receive() onto W if it has dequeued data from Receive buffer and given it to the user,...
virtual Server_socket * serv_create(const Peer_socket_options *child_sock_opts)
Internal factory used for ALL Server_socket objects created by this Node (including subclasses).
void serv_set_state(Server_socket::Ptr serv, Server_socket::State state)
Sets Server_socket::m_state.
void async_sock_low_lvl_packet_send(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr &&packet, bool delayed_by_pacing)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
Definition: low_lvl_io.cpp:307
void sock_track_new_data_after_gap_rexmit_off(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, size_t data_size, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
bool sock_data_to_reassembly_q_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
void perform_accumulated_on_recv_tasks()
Performs all tasks to be performed at the end of low_lvl_recv_and_handle() or async part of async_wai...
Definition: node.cpp:375
static bool ensure_sock_open(Socket_ptr sock, Error_code *err_code)
Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so,...
Definition: node.hpp:4141
void send_worker(Peer_socket::Ptr sock, bool defer_delta_check)
Thread W implemention of send(): synchronously or asynchronously send the contents of sock->m_snd_buf...
void interrupt_all_waits(Error_code *err_code=0)
Interrupts any blocking operation, a/k/a wait, and informs the invoker of that operation that the blo...
Definition: event_set.cpp:1392
void handle_accumulated_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any incoming acknowledgments and rcv_wnd u...
Node_options options() const
Copies this Node's option set and returns that copy.
Definition: node.cpp:1107
void handle_incoming(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-received, not-yet-deserialized low-level packet.
Definition: node.cpp:426
~Node() override
Destroys Node.
Definition: node.cpp:139
void async_rcv_wnd_recovery(Peer_socket::Ptr sock, size_t rcv_wnd)
receive_wnd_updated() helper that continues rcv_wnd recovery: that is, sends unsolicited ACK with a r...
void log_accumulated_acks(Peer_socket::Const_ptr sock) const
Helper of handle_accumulated_acks() that logs the about-to-be-handled accumulated individual acknowle...
boost::asio::signal_set Signal_set
Short-hand for a signal set.
Definition: node.hpp:1433
void sock_free_memory(Peer_socket::Ptr sock)
Helper that clears all non-O(1)-space data structures stored inside sock.
static const size_t & S_NUM_PORTS
Total number of Flow ports in the port space, including S_PORT_ANY.
Definition: node.hpp:942
bool event_set_check_baseline(Event_set::Ptr event_set)
Checks each desired (Event_set::m_want) event in event_set; any that holds true is saved into event_s...
Definition: event_set.cpp:1017
void sock_load_info_struct(Peer_socket::Const_ptr sock, Peer_socket_info *stats) const
Given a Peer_socket, copies all stats info (as available via Peer_socket::info()) from various struct...
void log_snd_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages thats show the detailed state of the sending sequence number space.
void send_worker_check_state(Peer_socket::Ptr sock)
Helper placed by send() onto W to invoke send_worker() but ensures that the socket has not entered so...
size_t m_low_lvl_max_buf_size
OS-reported m_low_lvl_sock UDP receive buffer maximum size, obtained right after we OS-set that setti...
Definition: node.hpp:3771
bool set_options(const Node_options &opts, Error_code *err_code=0)
Dynamically replaces the current options set (options()) with the given options set.
Definition: node.cpp:1054
void sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket, synchronously.
void mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number &seq_num)
Performs important book-keeping based on the event "DATA packet was sent to destination....
Definition: low_lvl_io.cpp:422
void handle_syn_ack_ack_to_syn_rcvd(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_ack_packet > syn_ack_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK_ACK packet delivered to the given p...
Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock, const Function< Non_blocking_func_ret_type()> &non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Error_code *err_code)
Implementation of core blocking transfer methods, namely Peer_socket::sync_send(),...
Definition: node.hpp:3977
void serv_close_detected(Server_socket::Ptr serv, const Error_code &disconnect_cause, bool close)
Records that thread W shows this socket is not to listen to incoming connections and is to abort any ...
static const Fine_duration S_REGULAR_INFREQUENT_TASKS_PERIOD
Time interval between performing "infrequent periodic tasks," such as stat logging.
Definition: node.hpp:3693
size_t sock_max_packets_after_unrecvd_packet(Peer_socket::Const_ptr sock) const
Computes and returns the max size for Peer_socket::m_rcv_packets_with_gaps for sock.
bool async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet, Error_code *err_code)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
Definition: low_lvl_io.cpp:605
Peer_socket::Sent_pkt_ordered_by_when_iter categorize_pkts_as_dropped_on_acks(Peer_socket::Ptr sock, const boost::unordered_set< Peer_socket::order_num_t > &flying_now_acked_pkts)
Helper of perform_accumulated_on_recv_tasks() that determines the range of In-flight packets that sho...
void rcv_get_first_gap_info(Peer_socket::Const_ptr sock, bool *first_gap_exists, Sequence_number *seq_num_after_first_gap)
Helper for handle_data_to_established() that gets simple info about Peer_socket::m_rcv_packets_with_g...
bool snd_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data either in Peer_socket::m_snd_rexmit_q of sock (if re...
Fine_time_pt m_last_loss_sock_log_when
For debugging, when we detect loss of data we'd sent, we log the corresponding socket's state; this i...
Definition: node.hpp:3874
Server_socket::Ptr listen(flow_port_t local_port, Error_code *err_code=0, const Peer_socket_options *child_sock_opts=0)
Sets up a server on the given local Flow port and returns Server_socket which can be used to accept s...
void cancel_timers(Peer_socket::Ptr sock)
Cancel any timers and scheduled tasks active in the given socket.
void sock_pacing_time_slice_end(Peer_socket::Ptr sock, const Error_code &sys_err_code)
async_sock_low_lvl_packet_send_paced() pacing helper: If sock_pacing_process_q() ran out of the last ...
Definition: low_lvl_io.cpp:969
void sock_rcv_buf_now_readable(Peer_socket::Ptr sock, bool syn_rcvd_qd_packet)
Helper for handle_data_to_established() that assumes the given's socket Receive buffer is currently r...
boost::shared_ptr< Net_env_simulator > m_net_env_sim
The object used to simulate stuff like packet loss and latency via local means directly in the code.
Definition: node.hpp:3713
void async_wait_latency_then_handle_incoming(const Fine_duration &latency, util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sets up handle_incoming(packet_data, low_lvl_remote_endpoint) to be called asynchronously after a spe...
Definition: low_lvl_io.cpp:255
void snd_flying_pkts_erase_one(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_iter pkt_it)
Erases (for example if considered Acknowledged or Dropped) a packet struct from the "scoreboard" (Pee...
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
Definition: node.hpp:4180
bool sock_validate_options(const Peer_socket_options &opts, const Peer_socket_options *prev_opts, Error_code *err_code) const
Analogous to validate_options() but checks per-socket options instead of per-Node options.
Options_mutex m_opts_mutex
The mutex protecting m_opts.
Definition: node.hpp:3707
boost::shared_ptr< util::Timer > Timer_ptr
boost.asio timer wrapped in a ref-counted pointer.
Definition: node.hpp:1430
void event_set_close(Event_set::Ptr event_set, Error_code *err_code)
Implementation of Event_set::close() when Event_set::state() != Event_set::State::S_CLOSED for event_...
Definition: event_set.cpp:1273
void handle_accumulated_pending_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any additional individual outgoing acknowl...
void receive_wnd_recovery_data_received(Peer_socket::Ptr sock)
Pertaining to the async_rcv_wnd_recovery() mechanism, this handles the event that we have received an...
static Peer_socket::order_num_t sock_get_new_snd_order_num(Peer_socket::Ptr sock)
Returns the "order number" to use for Peer_socket::Sent_packet::Sent_when structure corresponding to ...
Peer_socket::Options_mutex Options_mutex
Short-hand for high-performance, non-reentrant, exclusive mutex used to lock m_opts.
Definition: node.hpp:1436
Peer_socket::Ptr sync_connect_impl(const Remote_endpoint &to, const Fine_duration &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code, const Peer_socket_options *opts)
Implementation core of sync_connect*() that gets rid of templated or missing arguments thereof.
void event_set_check_baseline_assuming_state(Event_set::Ptr event_set)
Helper placed by event_set_async_wait() onto thread W to invoke event_set_check_baseline() but first ...
Definition: event_set.cpp:989
size_t max_block_size() const
The maximum number of bytes of user data per received or sent block on connections generated from thi...
Definition: node.cpp:1112
const util::Udp_endpoint & local_low_lvl_endpoint() const
Return the UDP endpoint (IP address and UDP port) which will be used for receiving incoming and sendi...
Definition: node.cpp:369
boost::asio::ip::udp::socket Udp_socket
Short-hand for UDP socket.
Definition: node.hpp:1427
bool serv_is_acceptable(const boost::any &serv_as_any) const
Returns true if and only if calling serv->accept() with at least some arguments would return either n...
void snd_flying_pkts_push_one(Peer_socket::Ptr sock, const Sequence_number &seq_num, Peer_socket::Sent_packet::Ptr sent_pkt)
Adds a new packet struct (presumably representing packet to be sent shortly) to the "scoreboard" (Pee...
void worker_run(const util::Udp_endpoint low_lvl_endpoint)
Worker thread W (main event loop) body.
Definition: node.cpp:151
Syn_packet::Ptr create_syn(Peer_socket::Const_ptr sock)
Helper that creates a new SYN packet object to the extent that is suitable for immediately passing to...
boost::unique_future< Error_code > m_event_loop_ready_result
The future object through which the non-W thread waits for m_event_loop_ready to be set to success/fa...
Definition: node.hpp:3883
bool sock_pacing_new_packet_ready(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet, Error_code *err_code)
async_sock_low_lvl_packet_send_paced() pacing helper: Handles a DATA or ACK packet that was just pass...
Definition: low_lvl_io.cpp:649
void close_abruptly(Peer_socket::Ptr sock, Error_code *err_code)
Implementation of non-blocking sock->close_abruptly() for socket sock in all cases except when sock->...
static void get_seq_num_range(const Packet_map_iter &packet_it, Sequence_number *seq_num_start, Sequence_number *seq_num_end)
Given an iterator into a Peer_socket::Sent_pkt_by_sent_when_map or Peer_socket::Recv_pkt_map,...
Peer_socket::Ptr sync_connect_with_metadata(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
A combination of sync_connect() and connect_with_metadata() (blocking connect, with supplied metadata...
Definition: node.hpp:3956
void event_set_close_worker(Event_set::Ptr event_set)
The guts of event_set_close_worker_check_state(): same thing, but assumes Event_set::state() == Event...
Definition: event_set.cpp:1332
Syn_ack_packet::Ptr create_syn_ack(Peer_socket::Const_ptr sock)
Like create_syn() but for SYN_ACK.
virtual Peer_socket * sock_create(const Peer_socket_options &opts)
Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
bool snd_buf_enqable(Peer_socket::Const_ptr sock) const
Return true if and only if there is enough free space in Peer_socket::m_snd_buf of sock to enqueue an...
bool can_send(Peer_socket::Const_ptr sock) const
Answers the perennial question of congestion and flow control: assuming there is a DATA packet to sen...
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
Definition: low_lvl_io.cpp:586
void sock_slide_rcv_next_seq_num(Peer_socket::Ptr sock, size_t slide_size, bool reassembly_in_progress)
Helper for handle_data_to_established() that aims to register a set of received DATA packet data as i...
void sock_log_detail(Peer_socket::Const_ptr sock) const
Logs a verbose state report for the given socket.
boost::unordered_map< flow_port_t, Server_socket::Ptr > Port_to_server_map
A map from the local Flow port to the local Server_socket listening on that port.
Definition: node.hpp:1453
static const size_t & S_NUM_EPHEMERAL_PORTS
Total number of Flow "ephemeral" ports (ones reserved locally at random with Node::listen(S_PORT_ANY)...
Definition: node.hpp:951
boost::unordered_set< Peer_socket::Ptr > m_socks_with_accumulated_pending_acks
Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() (...
Definition: node.hpp:3849
boost::unordered_set< Peer_socket::Ptr > m_socks_with_accumulated_acks
Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() c...
Definition: node.hpp:3867
friend size_t hash_value(const Socket_id &socket_id)
Definition: node.cpp:1162
static void advance_seq_num(Sequence_number *seq_num, boost::shared_ptr< const Data_packet > data)
Assuming *seq_num points to the start of data.m_data, increments *seq_num to point to the datum just ...
void async_low_lvl_ack_send(Peer_socket::Ptr sock, bool defer_delta_check, const Error_code &sys_err_code=Error_code())
Sends a low-level ACK packet, with all accumulated in Peer_socket::m_rcv_pending_acks of sock individ...
static Sequence_number snd_past_last_flying_datum_seq_num(Peer_socket::Const_ptr sock)
Obtain the sequence number for the datum just past the last (latest) In-flight (i....
util::Thread m_worker
Worker thread (= thread W). Other members should be initialized before this to avoid race condition.
Definition: node.hpp:3893
Sequence_number::Generator m_seq_num_generator
Sequence number generator (at least to generate ISNs). Only thread W can access this.
Definition: node.hpp:3780
Peer_socket::Ptr connect(const Remote_endpoint &to, Error_code *err_code=0, const Peer_socket_options *opts=0)
Initiates an active connect to the specified remote Flow server.
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
Definition: event_set.cpp:1129
void serv_peer_socket_closed(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that a Server_socket-contained (i.e., currently un-established, or established but not yet ac...
bool validate_static_option(const Opt_type &new_val, const Opt_type &old_val, const std::string &opt_id, Error_code *err_code) const
Helper that compares new_val to old_val and, if they are not equal, logs and returns an error; used t...
Definition: node.hpp:4161
bool rcv_buf_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data in Peer_socket::m_rcv_buf of sock to give the user s...
void async_acknowledge_packet(Peer_socket::Ptr sock, const Sequence_number &seq_num, unsigned int rexmit_id, size_t data_size)
Causes an acknowledgment of the given received packet to be included in a future Ack_packet sent to t...
Socket_id_to_socket_map m_socks
The peer-to-peer connections this Node is currently tracking.
Definition: node.hpp:3792
unsigned int handle_incoming_with_simulation(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint, bool is_sim_duplicate_packet=false)
Helper for low_lvl_recv_and_handle() that calls handle_incoming() on the not-yet-deserialized low-lev...
Definition: low_lvl_io.cpp:187
bool validate_option_check(bool check, const std::string &check_str, Error_code *err_code) const
Helper that, if the given condition is false, logs and returns an error; used to check for option val...
Definition: node.cpp:1093
static const flow_port_t & S_FIRST_SERVICE_PORT
The port number of the lowest service port, making the range of service ports [S_FIRST_SERVICE_PORT,...
Definition: node.hpp:957
Peer_socket::Options_lock Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
Definition: node.hpp:1439
void low_lvl_recv_and_handle(Error_code sys_err_code)
Handles the pre-condition that m_low_lvl_sock has a UDP packet available for reading,...
Definition: low_lvl_io.cpp:46
friend bool operator==(const Socket_id &lhs, const Socket_id &rhs)
Definition: node.cpp:1157
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
Udp_socket m_low_lvl_sock
The UDP socket used to receive low-level packets (to assemble into application layer data) and send t...
Definition: node.hpp:3753
void async_low_lvl_packet_send_impl(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet, bool delayed_by_pacing, Peer_socket::Ptr sock)
Takes given low-level packet structure, serializes it, and initiates asynchronous send of these data ...
Definition: low_lvl_io.cpp:321
void handle_syn_ack_to_syn_sent(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK packet delivered to the given peer ...
bool event_set_async_wait(Event_set::Ptr event_set, const Event_set::Event_handler &on_event, Error_code *err_code)
Implementation of Event_set::async_wait() when Event_set::state() == Event_set::State::S_INACTIVE.
Definition: event_set.cpp:917
size_t send(Peer_socket::Ptr sock, const Function< size_t(size_t max_data_size)> &snd_buf_feed_func, Error_code *err_code)
Implementation of non-blocking sock->send() for socket sock in all cases except when sock->state() ==...
static const size_t & S_NUM_SERVICE_PORTS
Total number of Flow "service" ports (ones that can be reserved by number with Node::listen()).
Definition: node.hpp:945
static const flow_port_t & S_FIRST_EPHEMERAL_PORT
The port number of the lowest ephemeral Flow port, making the range of ephemeral ports [S_FIRST_EPHEM...
Definition: node.hpp:963
void sock_set_int_state(Peer_socket::Ptr sock, Peer_socket::Int_state new_state)
Sets internal state of given socket to the given state and logs a TRACE message about it.
Peer_socket * sock_create_forward_plus_ctor_args(const Peer_socket_options &opts)
Returns a raw pointer to newly created Peer_socket or sub-instance like asio::Peer_socket,...
Definition: node.hpp:4193
boost::promise< Error_code > m_event_loop_ready
Promise that thread W sets to truthy Error_code if it fails to initialize or falsy once event loop is...
Definition: node.hpp:3880
bool sock_is_readable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->receive() with at least some arguments would return either ...
bool async_sock_low_lvl_packet_send_or_close_immediately(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet, bool defer_delta_check)
Similar to async_sock_low_lvl_packet_send_paced() except it also calls close_connection_immediately(s...
log::Logger * this_thread_init_logger_setup(const std::string &thread_type, log::Logger *logger=0)
Helper to invoke for each thread in which this Node executes, whether or not it starts that thread,...
Definition: node.cpp:113
Peer_socket::Ptr handle_syn_to_listening_server(Server_socket::Ptr serv, boost::shared_ptr< const Syn_packet > syn, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-deserialized, just-demultiplexed low-level SYN packet delivered to the given server so...
bool sock_data_to_rcv_buf_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to pass the payload of the given DATA packet to the...
bool sock_set_options(Peer_socket::Ptr sock, const Peer_socket_options &opts, Error_code *err_code)
Thread W implementation of sock->set_options().
bool running() const
Returns true if and only if the Node is operating.
Definition: node.cpp:420
Port_to_server_map m_servs
The server sockets this Node is currently tracking.
Definition: node.hpp:3798
bool sock_pacing_process_q(Peer_socket::Ptr sock, Error_code *err_code, bool executing_after_delay)
async_sock_low_lvl_packet_send_paced() pacing helper: Given that we are currently in the pacing time ...
Definition: low_lvl_io.cpp:849
Event_set::Ev_type_to_socks_map m_sock_events
All sockets that have been detected to be "ready" (by the Event_set doc header definition) at any poi...
Definition: node.hpp:3830
static const uint8_t S_DEFAULT_CONN_METADATA
Type and value to supply as user-supplied metadata in SYN, if user chooses to use [[a]sync_]connect()...
Definition: node.hpp:1403
const Node_options & validate_options(const Node_options &opts, bool init, Error_code *err_code) const
Given a new set of Node_options intended to replace (or initialize) a Node's m_opts,...
Definition: node.cpp:980
Server_socket * serv_create_forward_plus_ctor_args(const Peer_socket_options *child_sock_opts)
Like sock_create_forward_plus_ctor_args() but for Server_sockets.
Definition: node.hpp:4199
void setup_drop_timer(const Socket_id &socket_id, Peer_socket::Ptr sock)
Creates a new Drop Timer and saves it to sock->m_snd_drop_timer.
void handle_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Ack_packet > ack)
Handles a just-deserialized, just-demultiplexed, low-level ACK packet delivered to the given peer soc...
Peer_socket::Ptr sync_connect(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0, const Peer_socket_options *opts=0)
The blocking (synchronous) version of connect().
Definition: node.hpp:3967
void listen_worker(flow_port_t local_port, const Peer_socket_options *child_sock_opts, Server_socket::Ptr *serv)
Thread W implementation of listen().
void handle_syn_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed, duplicate (equal to already-received SYN_ACK) low-le...
Signal_set m_signal_set
Signal set which we may or may not be using to trap SIGINT and SIGTERM in order to auto-fire interrup...
Definition: node.hpp:3890
void serv_peer_socket_acceptable(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that an unestablished socket sock (Peer_socket::Int_state::S_SYN_RCVD) has just become establ...
void handle_data_to_syn_rcvd(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
boost::unordered_map< Socket_id, Peer_socket::Ptr > Socket_id_to_socket_map
A map from the connection ID (= remote-local socket pair) to the local Peer_socket that is the local ...
Definition: node.hpp:1450
void setup_connection_timers(const Socket_id &socket_id, Peer_socket::Ptr sock, bool initial)
Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some a...
void log_rcv_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages that show the detailed state of the receiving sequence number space.
size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const
Computes and returns the currently correct rcv_wnd value; that is the amount of space free in Receive...
Peer_socket::Ptr accept(Server_socket::Ptr serv, Error_code *err_code)
Implementation of non-blocking serv->accept() for server socket serv in all cases except when serv->s...
void connect_worker(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts, Peer_socket::Ptr *sock)
Thread W implementation of connect().
bool drop_pkts_on_acks(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &last_dropped_pkt_it, size_t *cong_ctl_dropped_pkts, size_t *cong_ctl_dropped_bytes, size_t *dropped_pkts, size_t *dropped_bytes, std::vector< Peer_socket::order_num_t > *pkts_marked_to_drop)
Helper of perform_accumulated_on_recv_tasks() that acts on the determination made by categorize_pkts_...
static const Peer_socket::Sent_packet::ack_count_t S_MAX_LATER_ACKS_BEFORE_CONSIDERING_DROPPED
For a given unacknowledged sent packet P, the maximum number of times any individual packet with high...
Definition: node.hpp:3686
bool async_low_lvl_syn_ack_ack_send_or_close_immediately(const Peer_socket::Ptr &sock, boost::shared_ptr< const Syn_ack_packet > &syn_ack)
Helper to create, fully fill out, and asynchronously send via async_sock_low_lvl_packet_send_or_close...
Node(log::Logger *logger, const util::Udp_endpoint &low_lvl_endpoint, Net_env_simulator *net_env_sim=0, Error_code *err_code=0, const Node_options &opts=Node_options())
Constructs Node.
Definition: node.cpp:40
util::Blob m_packet_data
Stores incoming raw packet data; re-used repeatedly for possible performance gains.
Definition: node.hpp:3774
Error_code sock_categorize_data_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, bool *dupe, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that categorizes the DATA packet received as either illegal; ...
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
Event_set::Ptr event_set_create(Error_code *err_code=0)
Creates a new Event_set in Event_set::State::S_INACTIVE state with no sockets/events stored; returns ...
Definition: event_set.cpp:881
void interrupt_all_waits_worker()
Thread W implementation of interrupt_all_waits().
Definition: event_set.cpp:1421
void sock_set_state(Peer_socket::Ptr sock, Peer_socket::State state, Peer_socket::Open_sub_state open_sub_state=Peer_socket::Open_sub_state::S_CONNECTED)
Sets Peer_socket::m_state and Peer_socket::m_open_sub_state.
void receive_emptied_rcv_buf_while_disconnecting(Peer_socket::Ptr sock)
Placed by receive() onto W during a graceful close, after the Receive buffer had been emptied by the ...
void sock_disconnect_detected(Peer_socket::Ptr sock, const Error_code &disconnect_cause, bool close)
Records that thread W shows underlying connection is broken (graceful termination,...
size_t receive(Peer_socket::Ptr sock, const Function< size_t()> &rcv_buf_consume_func, Error_code *err_code)
Implementation of non-blocking sock->receive() for socket sock in all cases except when sock->state()...
void handle_connection_rexmit_timer_event(const Socket_id &socket_id, Peer_socket::Ptr sock)
Handles the triggering of the retransmit timer wait set up by setup_connection_timers(); it will re-s...
Node_options m_opts
This Node's global set of options.
Definition: node.hpp:3704
void async_low_lvl_recv()
Registers so that during the current or next m_task_engine.run(), the latter will wait for a receivab...
Definition: low_lvl_io.cpp:31
void close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
A thread W method that handles the transition of the given socket from OPEN (any sub-state) to CLOSED...
void sock_disconnect_completed(Peer_socket::Ptr sock)
While in S_OPEN+S_DISCONNECTING state (i.e., after beginning a graceful close with sock_disconnect_de...
Fine_duration compute_rtt_on_ack(Peer_socket::Sent_packet::Const_ptr flying_pkt, const Fine_time_pt &time_now, Ack_packet::Individual_ack::Const_ptr ack, const Peer_socket::Sent_packet::Sent_when **sent_when) const
Helper of perform_accumulated_on_recv_tasks() that computes the RTT implied by a given individual ack...
void close_empty_server_immediately(const flow_port_t local_port, Server_socket::Ptr serv, const Error_code &err_code, bool defer_delta_check)
Handles the transition of the given server socket from S_LISTENING/S_CLOSING to S_CLOSED (including e...
Event_sets m_event_sets
Every Event_set to have been returned by event_set_create() and not subsequently reached Event_set::S...
Definition: node.hpp:3804
util::Rnd_gen_uniform_range< Peer_socket::security_token_t > m_rnd_security_tokens
Random number generator for picking security tokens; seeded on time at Node construction and generate...
Definition: node.hpp:3786
void interrupt_all_waits_internal_sig_handler(const Error_code &sys_err_code, int sig_number)
signal_set handler, executed on SIGINT and SIGTERM, if user has enabled this feature: causes interrup...
Definition: event_set.cpp:1456
Peer_socket::Ptr connect_with_metadata(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
Same as connect() but sends, as part of the connection handshake, the user-supplied metadata,...
void new_round_trip_time_sample(Peer_socket::Ptr sock, Fine_duration round_trip_time)
Handles a just-computed new RTT (round trip time) measurement for an individual packet earlier sent: ...
void perform_regular_infrequent_tasks(bool reschedule)
Performs low-priority tasks that should be run on an infrequent, regular basis, such as stat logging ...
Definition: node.cpp:1117
util::Udp_endpoint m_low_lvl_endpoint
After we bind m_low_lvl_sock to a UDP endpoint, this is a copy of that endpoint.
Definition: node.hpp:3764
bool ok_to_rexmit_or_close(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &pkt_it, bool defer_delta_check)
Checks whether the given sent packet has been retransmitted the maximum number of allowed times; if s...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
Definition: node.hpp:3739
void sock_pacing_new_time_slice(Peer_socket::Ptr sock, const Fine_time_pt &now)
async_sock_low_lvl_packet_send_paced() pacing helper: Resets the socket's Send_pacing_data structure ...
Definition: low_lvl_io.cpp:765
Port_space m_ports
Flow port space for both client and server sockets. All threads may access this.
Definition: node.hpp:3777
void event_set_fire_if_got_events(Event_set::Ptr event_set)
Check whether given Event_set contains any active sockets (Event_set::m_can); if so,...
Definition: event_set.cpp:1084
void rst_and_close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
Asynchronously send RST to the other side of the given socket and close_connection_immediately().
boost::unordered_set< Event_set::Ptr > Event_sets
A set of Event_set objects.
Definition: node.hpp:1456
void drop_timer_action(Peer_socket::Ptr sock, bool drop_all_packets)
Handles a Drop_timer (Peer_socket::m_snd_drop_timer) event in ESTABLISHED state by dropping the speci...
A peer (non-server) socket operating over the Flow network protocol, with optional stream-of-bytes an...
util::Mutex_non_recursive Options_mutex
Short-hand for high-performance, non-reentrant, exclusive mutex used to lock m_opts.
State
State of a Peer_socket.
Open_sub_state
The sub-state of a Peer_socket when state is State::S_OPEN.
@ S_CONNECTED
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
Int_state
The state of the socket (and the connection from this end's point of view) for the internal state mac...
util::Lock_guard< Options_mutex > Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
Sent_pkt_by_sent_when_map::const_iterator Sent_pkt_ordered_by_when_const_iter
Short-hand for m_snd_flying_pkts_by_sent_when const iterator type.
Internal net_flow class that maintains the available Flow-protocol port space, somewhat similarly to ...
Definition: port_space.hpp:94
An object of this type generates a series of initial sequence numbers (ISN) that are meant to be suff...
Definition: seq_num.hpp:373
An internal net_flow sequence number identifying a piece of data.
Definition: seq_num.hpp:126
A server socket able to listen on a single Flow port for incoming connections and return peer sockets...
State
State of a Server_socket.
An empty interface, consisting of nothing but a default virtual destructor, intended as a boiler-plat...
Definition: util.hpp:45
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
Definition: error.hpp:202
#define FLOW_ERROR_EMIT_ERROR_LOG_INFO(ARG_val)
Identical to FLOW_ERROR_EMIT_ERROR(), but the message logged has flow::log::Sev::S_INFO severity inst...
Definition: error.hpp:218
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
Definition: log.hpp:152
#define FLOW_LOG_SET_CONTEXT(ARG_logger_ptr, ARG_component_payload)
For the rest of the block within which this macro is instantiated, causes all FLOW_LOG_....
Definition: log.hpp:405
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
@ S_STATIC_OPTION_CHANGED
When setting options, tried to set an unchangeable (static) option.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Definition: node.cpp:25
uint16_t flow_port_t
Logical Flow port type (analogous to a UDP/TCP port in spirit but in no way relevant to UDP/TCP).
size_t hash_value(const Sequence_number &seq_num)
Free function that returns seq_num.hash(); has to be a free function named hash_value for boost....
Definition: seq_num.cpp:275
bool operator==(const Remote_endpoint &lhs, const Remote_endpoint &rhs)
Whether lhs is equal to rhs.
Definition: endpoint.cpp:60
const flow_port_t S_PORT_ANY
Special Flow port value used to indicate "invalid port" or "please pick a random available ephemeral ...
Definition: port_space.cpp:33
Auto_cleanup setup_auto_cleanup(const Cleanup_func &func)
Provides a way to execute arbitrary (cleanup) code at the exit of the current block.
Definition: util.hpp:282
boost::unique_lock< Mutex > Lock_guard
Short-hand for advanced-capability RAII lock guard for any mutex, ensuring exclusive ownership of tha...
Definition: util_fwd.hpp:265
Fine_duration chrono_duration_to_fine_duration(const boost::chrono::duration< Rep, Period > &dur)
Helper that takes a non-negative duration of arbitrary precision/period and converts it to Fine_durat...
Definition: util.hpp:31
boost::shared_ptr< void > Auto_cleanup
Helper type for setup_auto_cleanup().
Definition: util_fwd.hpp:205
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Definition: util_fwd.hpp:135
boost::thread Thread
Short-hand for standard thread class.
Definition: util_fwd.hpp:78
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
Definition: blob_fwd.hpp:60
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
Definition: util_fwd.hpp:208
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:502
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Definition: common.hpp:410
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Definition: common.hpp:407
unsigned char uint8_t
Byte. Best way to represent a byte of binary data. This is 8 bits on all modern systems.
Definition: common.hpp:385
boost::shared_ptr< const Individual_ack > Const_ptr
Short-hand for ref-counted pointer to immutable objects of this class.
The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to a port in th...
Definition: node.hpp:3904
const Remote_endpoint m_remote_endpoint
The other side of the connection.
Definition: node.hpp:3908
size_t hash() const
Hash value of this Socket_id for unordered<>.
Definition: node.cpp:1147
const flow_port_t m_local_port
This side of the connection (within this Node).
Definition: node.hpp:3910
A set of low-level options affecting a single Flow Node, including Peer_socket objects and other obje...
Definition: options.hpp:449
static std::string opt_id_to_str(const std::string &opt_id)
Helper that, for a given option m_blah, takes something like "m_blah_blah" and returns the similar mo...
Definition: options.cpp:160
Data store to keep timing related info when a packet is sent out.
uint16_t ack_count_t
Type used for m_acks_after_me.
A data store that keeps stats about the a Peer_socket connection.
Definition: info.hpp:456
A set of low-level options affecting a single Peer_socket.
Definition: options.hpp:36
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...
Definition: endpoint.hpp:93