Flow 2.0.0
Flow project: Full implementation reference.
node.hpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
22#include "flow/log/log.hpp"
31#include "flow/util/util.hpp"
32#include <boost/unordered_map.hpp>
33
34/**
35 * Flow module containing the API and implementation of the *Flow network protocol*, a TCP-inspired stream protocol
36 * that uses UDP as underlying transport. See the large doc header on class net_flow::Node for the "root" of all
37 * documentation w/r/t `net_flow`, beyond the present brief sentences.
38 *
39 * ### Historical note ###
40 * Historically, the Flow project only existed in the first place to deliver the functionality now in this
41 * `namespace` flow::net_flow. However, since then, `net_flow` has become merely one of several Flow modules, each
42 * providing functionality independent of the others'. In the past, all/most `net_flow{}`
43 * contents resided directly in `namespace` ::flow, but now it has been segregated into its own namespace.
44 *
45 * `net_flow` may still be, by volume, the largest module (hence also perhaps the largest user of general-use modules
46 * like flow::log and flow::util). Nevertheless, it is no longer "special."
47 *
48 * @see Main class net_flow::Node.
49 */
50namespace flow::net_flow
51{
52// Types.
53
54/**
55 * An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a distinct IP
56 * address and UDP port; and (2) it speaks the Flow protocol over a UDP transport layer. Here we summarize class Node
57 * and its entire containing Flow module flow::net_flow.
58 *
59 * See also flow::asio::Node, a subclass that allows for full use of our API (its superclass) and turns our sockets
60 * into boost.asio I/O objects, able to participate with ease in all boost.asio event loops. If you're already very
61 * familiar with `boost::asio::ip::tcp`, you can skip to the asio::Node doc header. If not, recommend becoming
62 * comfortable with the asio-less API, then read the forementioned asio::Node doc header.
63 *
64 * The flow::asio::Node class doc header (as of this writing) includes a compact summary of all network operations
65 * supported by the entire hierarchy and hence deserves a look for your convenience.
66 *
67 * Using flow::net_flow, starting with the present class Node
68 * ----------------------------------------------------------
69 *
70 * Node is an important and central class of the `netflow` Flow module and thus deserves some semi-philosophical
71 * discussion, namely what makes a Node a Node -- why the name? Let's delve into the 2 aforementioned properties of a
72 * Node.
73 *
74 * ### A Node has a distinct IP address and UDP port: util::Udp_endpoint ###
75 * A Node binds to an IP address and UDP port, both of which are given (with the usual ephemeral port and
76 * IP address<->interface(s) nomenclature) as an argument at Node::Node() construction and can never change over the
77 * lifetime of the object. The IP and port together are a util::Udp_endpoint, which is a `using`-alias of boost.asio's
78 * `boost::asio::ip::udp::endpoint` . In the same network (e.g., the Internet) no two Node
79 * objects (even in separate processes; even on different machines) may be alive (as defined by
80 * `Node::running() == true`) with constructor-provided util::Udp_endpoint objects `R1` and `R2` such that `R1 == R2`.
81 * In particular, if `Node n1` exists, with `n1.running()` and `n1.local_low_lvl_endpoint() == R1`, and on the same
82 * machine one attempts to construct `Node n2(R2)`, such that `R1 == R2` (their IPs and ports are equal), then `n2`
83 * will fail to properly construct, hence `n2.running() == false` will be the case, probably due to port-already-bound
84 * OS error. (There are counter-examples with NAT'ed IP addresses and special values 0.0.0.0 and port 0, but please
85 * just ignore those and other pedantic objections and take the spirit of what I am saying. Ultimately, the point
86 * is:
87 *
88 * <em>A successfully constructed (`running() == true`) Node occupies the same IP-and-UDP "real estate" as would a
89 * a mere successfully bound UDP socket.</em>
90 *
91 * So all that was a long, overbearing way to emphasize that a Node binds to an IP address and UDP port, and a single
92 * such combo may have at most one Node on it (unless it has `!running()`).
93 * *That's why it is called a Node*: it's a node on the network, especially on Internet.
94 *
95 * ### A Node speaks the *Flow network protocol* to other, remote Nodes ###
96 * If `Node n1` is successfully constructed, and `Node n2` is as well, the two can communicate via a new protocol
97 * implemented by this Flow module. This protocol is capable of working with stream (TCP-like) sockets
98 * implemented on top of UDP in a manner analogous to how an OS's net-stack implements
99 * TCP over IP. So one could call this Flow/UDP. One can talk Flow/UDP to another
100 * Flow/UDP endpoint (a/k/a Node) only; no compatibility with any other protocol is supported.
101 * (This isn't, for example, an improvement to one side of TCP that is still compatible with legacy TCPs on
102 * the other end; though that is a fertile area for research in its own right.) The socket can also operate in
103 * unreliable, message boundary-preserving mode, controllable via a Flow-protocol-native socket option; in which case
104 * reliability is the responsibility of the `net_flow` user. By default, though, it's like TCP: message bounds are not
105 * preserved; reliability is guaranteed inside the protocol. `n1` and `n2` can be local in the same process, or local
106 * in the same machine, or remote in the same overall network -- as long as one is routable to the other, they can talk.
107 *
108 * For practical purposes, it's important to have idea of a single running() Node's "weight." Is it light-weight like
109 * a UDP or TCP socket? Is it heavy-weight like an Apache server instance? The answer is that it's MUCH close to
110 * the former: it is fairly light-weight. As of this writing, internally, it stores a table of peer and server sockets
111 * (of which there could be a handful or tons, depending on the user's own API calls prior); and uses at least one
112 * dedicated worker thread (essentially not visible to the user but conceptually similar to a UDP or TCP stack user's
113 * view of the kernel: it does stuff for one in the background -- for example it can wait for incoming connections,
114 * if asked). So, a Node is an intricate but fairly light-weight object that stores socket tables (proportional in
115 * size to the sockets currently required by the Node's user) and roughly a single worker thread performing low-level
116 * I/O and other minimally CPU-intensive tasks. A Node can get busy if a high-bandwidth network is sending or
117 * receiving intense traffic, as is the case for any TCP or UDP net-stack. In fact, a Node can be seen as a little
118 * Flow-protocol stack implemented on top of UDP transport. (Historical note: `class Node` used to be `class Stack`,
119 * but this implied a heavy weight and misleadingly discouraged multiple constructions in the same program; all that
120 * ultimately caused the renaming to Node.)
121 *
122 * ### Essential properties of Flow network protocol (Flow ports, mux/demuxing) ###
123 * A single Node supports 0 or more (an arbitrary # of) peer-to-peer connections to other `Node`s.
124 * Moreover, given two `Node`s `n1` and `n2`, there can similarly be 0 or more peer-to-peer connections
125 * running between the two. In order to allow this, a port (and therefore multiplexing/demultiplexing) system is
126 * a feature of Flow protocol. (Whether this features is necessary or even desirable is slightly controversial and
127 * not a settled matter -- a to-do on this topic can be found below.)
128 *
129 * More specifically, think of a *given* `Node n1` as analogous (in terms of is multiplexing capabilities) to
130 * one TCP stack running on a one-interface machine. To recap the TCP port-addressing scheme (assuming only 1
131 * interface): The TCP stack has approximately 2^16 (~65k) ports available. One may create and "bind" a server "socket"
132 * to (more or less, for our discussion) any 1 of these ports. Let's say a server socket is bound to port P1.
133 * If a remote TCP stack successfully connects to such a server-bound port, this results in a passively-connected
134 * client "socket," which -- also -- is bound to P1 (bear with me as to how this is possible). Finally, the TCP
135 * stack's user may bind an *actively* connecting client "socket" to another port P2 (P2 =/= P1; as P1 is reserved
136 * to that server and passively connected clients from that server). Recall that we're contriving a situation where
137 * there is only one other remote stack, so suppose there is the remote, 1-interface TCP stack.
138 * Now, let's say a packet arrives along an established connection from this stack.
139 * How does our local TCP stack determine to which connection this belongs? This is
140 * "demultiplexing." If the packet contains the info "destination port: P2," then that clearly belongs to the
141 * actively-connected client we mentioned earlier... but what if it's "dest. port: P1"? This could belong to any
142 * number of connections originally passive-connected by incoming server connection requests to port P1.
143 * Answer: the packet also contains a "source TCP port" field. So the *connection ID*
144 * (a/k/a *socket ID*) consists of BOTH pieces of data: (1) destination (local) port; (2) source (remote) port.
145 * (Recall that, symmetrically, the remote TCP stack had to have a client bind to some port,
146 * and that means no more stuff can bind to that port; so it is unique and can't clash with anything else -- inside that
147 * remote stack.) So this tuple uniquely identifies the connection in this scenario of a single-interface local TCP
148 * that can have both active client sockets and passive-client-socket-spawning server sockets; and talk to other stacks
149 * like it. Of course, there can be more than one remote TCP stack. So the 2-tuple (pair) becomes a 3-tuple (triplet)
150 * in our slightly simplified version of reality: (1) destination (local) TCP port; (2) source (remote) IP address;
151 * and (3) source (remote) TCP port. (In reality, the local TCP stack can bind
152 * to different interfaces, so it becomes a 4-tuple by adding in destination (local) IP address... but that's TCP and
153 * is of no interest to our analogy to Flow protocol.)
154 *
155 * What about Flow protocol? GIVEN `n1` and `n2`, it works just the same. We have a special, TCP-like, Flow port space
156 * WITHIN `n1` and similarly within `n2`. So if only `n1` and `n2` are involved, an `n1` Server_socket (class) object
157 * can listen() (<-- actual method) on a net_flow::flow_port_t (<-- alias to 2-byte unsigned as of this writing)
158 * port P1; Server_socket::accept() (another method) incoming connections, each still bound to port P1; and `n1` can
159 * also actively connect() (another method) to `n2` at some port over there. Then an incoming UDP packet's
160 * intended established connection is demuxed to by a 2-tuple: (1) destination (local) `flow_port_t`;
161 * (2) source (remote) `flow_port_t`.
162 *
163 * In reality, other remote `Node`s can of course be involved: `n3`, `n4`, whatever. As we've established, each Node
164 * lives at a UDP endpoint: util::Udp_endpoint (again, IP address + UDP port). Therefore, from the stand-point of
165 * a given local `Node n1`, each established peer-to-peer connection is identified fully by the 5-tuple (marked here
166 * with roman numerals):
167 * 1. Local `flow_port_t` within `n1`'s port-space (not dissimilar to TCP's port space in size and behavior). (I)
168 * 2. Remote endpoint identifying the remote Node: Remote_endpoint.
169 * 1. util::Udp_endpoint.
170 * 1. IP address. (II)
171 * 2. UDP port. (III)
172 * 3. Remote net_flow::flow_port_t. (IV)
173 *
174 * So, that is how it works. Of course, if this complexity is not really necessary for some application, then
175 * only really (II) and (III) are truly necessary. (I) and (IV) can just be chosen to be some agreed-upon
176 * constant port number. Only one connection can ever exist in this situation, and one would need to create
177 * more `Node`s one side or the other or both to achieve more connections between the same pair of IP addresses,
178 * but that's totally reasonable: it's no different from simply binding to more UDP ports. My point here is that
179 * the Flow-protocol-invented construct of "Flow ports" (given as `flow_port_t` values) can be used to conserve UDP
180 * ports; but they can also be NOT used, and one can just use more UDP ports, as a "regular" UDP-using pair of
181 * applications would, if more than one flow of information is necessary between those two apps. It is up to you.
182 * (Again, some arguments can be made for getting rid of (I) and (IV), after all. This possibility is discussed in
183 * a below to-do.)
184 *
185 * (Do note that, while we've emulated TCP's port scheme, there is no equivalent of IP's "interfaces." Each Node
186 * just has a bunch of ports; there is no port table belonging to each of N interfaces or any such thing.)
187 *
188 * ### flow::net_flow API overview ###
189 * This is a summary (and some of this is very briefly mentioned above); all the classes and APIs are much more
190 * deeply documented in their own right. Also, see above pointer to asio::Node whose doc header may be immediately
191 * helpful to experienced users. Meanwhile, to summarize:
192 *
193 * The Node hands out sockets as Peer_socket objects; it acts as a factory for them (directly) via its connect() and
194 * (indirectly) Server_socket::accept() families of methods. It is not possible to construct a Peer_socket
195 * independently of a Node, due to tight coordination between the Node and each Peer_socket. Moreover each Peer_socket
196 * is handed out via `boost::shared_ptr` smart pointer. While not strictly necessary, this is a situation where both
197 * the user and a central registry (Node) can own the Peer_socket at a given time, which is an ideal application for
198 * `shared_ptr<>` that can greatly simplify questions of object ownership and providing auto-`delete` to boot.
199 *
200 * Thus: `Node::listen(flow_port_t P)` yields a Server_socket::Ptr, which will listen for incoming connections on `P`.
201 * Server_socket::accept() (and similar) yields a Peer_socket::Ptr, one side of a peer-to-peer connection.
202 * On the other side, `Node::connect(Remote_endpoint R)` (where `R` contains `Udp_endpoint U`, where
203 * value equal to `U` had been earlier passed to constructor of the `listen()`ing `Node`; and `R` also contains
204 * `flow_port_t P`, passed to `Node::listen()`). connect(), too, yields a Peer_socket::Ptr. And thus, if all went
205 * well, each side now has a Peer_socket::Ptr `S1` and `S2`, which -- while originating quite differently --
206 * are now completely equal in capabilities: they are indeed *peer* sockets. They have methods like Peer_socket::send()
207 * and Peer_socket::receive().
208 *
209 * Further nuances can be explored in the various other doc headers, but I'll mention that both non-blocking behavior
210 * (meaning the call always returns immediately, even if unable to immediately perform the desired task such as
211 * accept a connection or receive 1 or more bytes) and blocking behavior as supported, as in (for example) a BSD
212 * sockets API. However, there is no "blocking" or "non-blocking" mode as in BSD or WinSock (personally I, Yuri, see it
213 * as an annoying anachronism). Instead you simply call a method named according to whether it will never block or
214 * (possibly -- if appropriate) block. The nomenclature convention is as follows: if the action is `X` (e.g.,
215 * `X` is `receive` or `accept`), then `->X()` is the non-blocking version; and `->sync_X()` is the blocking one.
216 * A non-blocking version always exists for any possible action; and a blocking version exists if it makes sense for it
217 * to exist. (Exception: Event_set::async_wait() explicitly includes `async_` prefix contrary to this convention.
218 * Partially it's because just calling it `wait()` -- convention or not -- makes it sound like it's going to block,
219 * whereas it emphatically will never do so. ALSO it's because it's a "special" method with unique properties
220 * including letting user execute their own code in a Node's internal worker thread. So rules go out the window a
221 * little bit for that method; hence the slight naming exception.)
222 *
223 * ### Nomenclature: "low-level" instead of "UDP" ###
224 * Side note: You will sometimes see the phrase `low_lvl` in various identifiers among `net_flow` APIs.
225 * `low_lvl` (low-level) really means "UDP" -- but theoretically some other packet-based transport could be used
226 * instead in the future; or it could even be an option to chooose between possible underlying protocols.
227 * For example, if `net_flow` moved to kernel-space, the transport could become IP, as it is for TCP.
228 * So this nomenclature is a hedge; and also it argubly is nicer/more generic: the fact it's UDP is immaterial; that
229 * it's the low-level (from our perspective) protocol is the salient fact. However, util::Udp_endpoint is thus named
230 * because it is very specifically a gosh-darned UDP port (plus IP address), so hiding from that by naming it
231 * `Low_Lvl_endpoint` (or something) seemed absurd.
232 *
233 * ### Event, readability, writability, etc. ###
234 * Any experienced use of BSD sockets, WinSock, or similar is probably wondering by now, "That sounds reasonable, but
235 * how does the API allow me to wait until I can connect/accept/read/write, letting me do other stuff in the meantime?"
236 * Again, one can use a blocking version of basically every operation; but then the wait for
237 * readability/writability/etc. may block the thread. One can work around this by creating multiple threads, but
238 * multi-threaded coding introduced various difficulties. So, the experienced socketeer will want to use non-blocking
239 * operations + an event loop + something that allow one to wait of various states (again, readability, writability,
240 * etc.) with various modes of operation (blocking, asynchronous, with or without a timeout, etc.).
241 * The most advanced and best way to get these capabilities is to use boost.asio integration (see asio::Node).
242 * As explained elsewhere (see Event_set doc header) this is sometimes not usable in practice. In that case:
243 * These capabilities are supplied in the class Event_set. See that class's doc header for further information.
244 * Event_set is the `select()` of this socket API. However it is significantly more convenient AND indeed supports
245 * a model that will allow one to use Flow-protocol sockets in a `select()`- or equivalent-based event loop, making
246 * `net_flow` module usable in a true server, such as a web server. That is, you don't just have to write a separate
247 * Flow event loop operating independently of your other sockets, file handles, etc. This is an important property in
248 * practice. (Again: Ideally you wouldn't need Event_set for this; asio::Node/etc. might be better to use.)
249 *
250 * ### Error reporting ###
251 * Like all Flow modules, `net_flow` uses error reporting conventions/semantics introduced in `namespace` ::flow
252 * doc header Error Reporting section.
253 *
254 * In particular, this module does add its own error code set. See `namespace` net_flow::error doc header which
255 * should point you to error::Code `enum`. All error-emitting `net_flow` APIs emit `Error_code`s assigned from
256 * error::Code `enum` values.
257 *
258 * ### Configurability, statistics, logging ###
259 * Great care is taken to provide visibility into the "black box" that is Flow-protocol. That is, while the API follows
260 * good practices wherein implementation is shielded away from the user, at the same time the *human* user has powerful
261 * tools to both examine the insides of the library/protocol's performance AND to tweak the parameters of its
262 * behavior. Picture a modern automobile: while you're driving at least, it's not going to let you look at or mess with
263 * its engine or transmission -- nor do you need to understand how they work; BUT, the onboard monitor
264 * will feature screens that tell you about its fuel economy performance, the engine's inner workings, and perhaps a
265 * few knobs to control the transmission's performance (for example). Same principles are followed here.
266 *
267 * More specifically:
268 * - *Configuration* Socket options are supported via Node_options and Peer_socket_options. These control many
269 * aspects of the library's behavior, for example which congestion control algorithm to use.
270 * These options can be set programmatically, through a config file, or through command line options.
271 * Particular care was taken to make the latter two features seamlessly available by
272 * leveraging boost.program_options.
273 * - *Statistics* Very detailed stats are kept in Peer_socket_receive_stats and Peer_socket_send_stats, combined
274 * with more data in Peer_socket_info. These can be accessed programmatically; their individual stats can also
275 * be accessed programmatically; or they can be logged to any `ostream`. Plus, the logging system periodically logs
276 * them (assuming this logging level is enabled).
277 * - *Logging* Like all Flow modules, `net_flow` uses logging conventions/semantics introduced in `namespace` ::flow
278 * doc header Logging section.
279 *
280 * ### Multiple Node objects ###
281 * As mentioned already many times, multiple Node objects can exist and function simultaneously (as long as they
282 * are not bound to the same conceptual util::Udp_endpoint, or to the same UDP port of at least one IP interface).
283 * However, it is worth emphasizing that -- practically speaking -- class Node is implemented in such a way as to make
284 * a given Node 100% independent of any other Node in the same process. They don't share working thread(s), data
285 * (except `static` data, probably just constants), any namespaces, port spaces, address spaces, anything. Each Node
286 * is independent both API-wise and in terms of internal implementation.
287 *
288 * ### Thread safety ###
289 * All operations safe for simultaneous execution on 2+ separate Node objects *or on the same Node*,
290 * or on any objects (e.g., Peer_socket) returned by Node. (Please note the *emphasized* phrase.)
291 * "Operations" are any Node or Node-returned-object method calls after construction and before destruction of the
292 * Node. (In particular, for example, one thread can listen() while another connect()s.) The same guarantee may or
293 * may not apply to other classes; see their documentation headers for thread safety information.
294 *
295 * ### Thread safety of destructor ###
296 * Generally it is not safe to destruct a Node (i.e., let Node::~Node() get called) while a Node operation is in
297 * progress on that Node (obviously, in another thread). There is one exception to this: if a blocking operation
298 * (any operation with name starting with `sync_`) has entered its blocking (sleep) phase, it is safe to delete the
299 * underlying Node. In practice this means simply that, while you need not lock a given Node with an external
300 * mutex while calling its various methods from different threads (if you really must use multiple threads this way),
301 * you should take care to probably join the various threads before letting a Node go away.
302 *
303 * Historical note re. FastTCP, Google BBR
304 * ---------------------------------------
305 *
306 * ### Historical note re. FastTCP ###
307 * One notable change in this `net_flow` vs. the original libgiga is this
308 * one lacks the FastTCP congestion control strategy. I omit the historical reasons for this for now
309 * (see to-do regarding re-introducing licensing/history/location/author info, in common.hpp).
310 *
311 * Addendum to the topic of congestion control: I am not that interested in FastTCP, as I don't see it as cutting-edge
312 * any longer. I am interested in Google BBR. It is a goal to implement Google BBR in `net_flow`, as that congestion
313 * control algorithm is seen by many as simply the best one available; a bold conclusion given how much research
314 * and given-and-take and pros-and-cons discussions have tramspired ever since the original Reno TCP became ubiquitous.
315 * Google BBR is (modulo whatever proprietary improvements Google chooses to implement in their closed-source software)
316 * publicly documented in research paper(s) and, I believe, available as Google open source.
317 *
318 * @todo flow::net_flow should use flow::cfg for its socket-options mechanism. It is well suited for that purpose, and
319 * it uses some ideas from those socket-options in the first place but is generic and much more advanced. Currently
320 * `net_flow` socket-options are custom-coded from long before flow::cfg existed.
321 *
322 * @todo `ostream` output operators for Node and asio::Node should exist. Also scour through all types; possibly
323 * some others could use the same. (I have been pretty good at already implementing these as-needed for logging; but
324 * I may have "missed a spot.")
325 *
326 * @todo Some of the `ostream<<` operators we have take `X*` instead of `const X&`; this should be changed to the latter
327 * for various minor reasons and for consistency.
328 *
329 * @todo Actively support IPv6 and IPv4, particularly in dual-stack mode (wherein net_flow::Server_socket would bind to
330 * an IPv6 endpoint but accept incoming V4 and V6 connections alike). It already supports it nominally, in that one
331 * can indeed listen on either type of address and connect to either as well, but how well it works is untested, and
332 * from some outside experience it might involve some subtle provisions internally.
333 *
334 * @todo Based on some outside experience, there maybe be problems -- particularly considering the to-do regarding
335 * dual-stack IPv6/v4 support -- in servers listening in multiple-IP situations; make sure we support these seamlessly.
336 * For example consider a machine with a WAN IP address and a LAN (10.x.x.x) IP address (and possibly IPv6 versions
337 * of each also) that (as is typical) binds on all of them at ANY:P (where P is some port; and ANY is the IPv6 version,
338 * with dual-stack mode ensuring V4 datagrams are also received). If a client connects to a LAN IP, while in our
339 * return datagrams we set the source IP to the default, does it work? Outside experience shows it might not,
340 * depending, plus even if in our protocol it does, it might be defeated by some firewall... the point is it requires
341 * investigation (e.g., mimic TCP itself; or look into what IETF or Google QUIC does; and so on).
342 *
343 * @internal
344 *
345 * Implementation notes
346 * --------------------
347 *
348 * In this section, and within implementation, I may simply say "Flow" instead of "Flow [network] protocol," for
349 * brevity. This is not meant to represent all of the containing Flow project nor refer to any module other
350 * that `net_flow`.
351 *
352 * ### Note on general design philosophy ###
353 * The protocol is TCP-like. However, it is not TCP. Moreover, it is not
354 * TCP even if you remove the fact that it sends UDP datagrams instead of IP packets. It follows the basic
355 * goals of TCP (stream of bytes, reliability, congestion control, etc.), and it borrows many of its internal
356 * techniques (congestion window, receive window ACKs, drop timeout, etc.), but it specifically will not
357 * inherit those limitations of TCP that are essentially historic and/or a result of having to be backwards-
358 * compatible with the other side which may be behind. For example, all ACKs in Flow are selective; whereas in
359 * TCP ACKs are generally cumulative, while selective ACKs (SACKs) are an advanced option that the other side
360 * may or may not support. Making certain modern decisions in Flow that TCP implementations simply cannot
361 * make means (1) a simpler protocol and implementation; and (2) potentially higher performance before we can
362 * try any advanced stuff like new congestion control strategies or FEC (forward error correction).
363 *
364 * Moreover, I tried to take care in not copying various classic TCP RFCs mindlessly. Instead, the idea was
365 * to evaluate the spirit, or intent, of each technical detail of a given RFC -- and then translate it into
366 * the (hopefully) more elegant and less historical baggage-encumbered world of Flow. This is particularly
367 * something I felt when translating the terse congestion control-related TCP RFCs into `net_flow` C++ code. (For
368 * a more specific explanation of what I mean, check out the "general design note" in the class doc header of
369 * the class Congestion_control_strategy.) Hopefully this means terse, concern-conflating TCP RFCs (and at
370 * times Linux kernel's somewhat less concern-conflating implementations of these RFCs) become clean, concern-
371 * separated Flow code. This means, also, a rather extremely high ratio of comments to code in many areas
372 * of this project. I wanted to clearly explain every non-trivial decision, even if it meant many long-winded
373 * English explanations.
374 *
375 * ### Basic implementation ###
376 * Constructor creates new thread, henceforth called thread W, which houses the Node's
377 * main loop and performs all blocking work (and generally most work) for the class; this exists until the
378 * destructor executes. In general, for simplicity (in a way) and consistency, even when a given call (e.g.,
379 * non-blocking connect()) could perform some preliminary work (e.g., argument checking and ephemeral port
380 * reservation) directly in the caller's thread and then place a callback (e.g., connect_worker()) on W for
381 * the real work (e.g., filling out packet, sending SYN, etc.), we instead choose to put all the work (even
382 * the aforementioned preliminary kind) onto W and (non-blockingly!) wait for that callback to finish, via a
383 * boost.thread `future`. While a bit slower, I find this simplifies the code by breaking it up less and
384 * keeping it in one place in the code base for a given operation (in this example, connect()). It can also
385 * reduce mutex usage in our code. Also it enables a slightly better user experience, as errors are reported
386 * earlier in the user code and state changes are not asynchronous, when they don't need to be.
387 *
388 * The above can apply to such things as non-blocking connect(), listen(). It probably wouldn't apply
389 * to Peer_socket::send(), Peer_socket::receive(), Server_socket::accept() for performance reasons.
390 *
391 * ### Note on threads W and U ###
392 * In classic Berkeley sockets, one often thinks of "the kernel" performing certain
393 * work in "the background," the results of which the user level code might access. For example, the kernel
394 * might receive IP packets on some port and deserialize them into a socket's receive buffer -- while the
395 * user's program is busy doing something completely unrelated like showing graphics in a video game -- then
396 * the `recv()` call would later transfer the deserialized stream into the user's own user-level buffer (e.g., a
397 * `char[]` array). In our terminology, "thread W" is the equivalent of "the kernel" doing stuff "in the
398 * background"; while "thread U" refers to the user's own thread where they do other stuff and at times access
399 * the results of the "kernel background" stuff. However, Flow is a user-level tool, so thread W is not
400 * actually running in the kernel... but conceptually it is like it.
401 *
402 * ### boost.asio ###
403 * The implementation heavily uses the boost.asio library, as recommended by ::flow doc
404 * header boost.asio section. This implements the main loop's flow
405 * (with a callback-based model), all UDP traffic, timers.
406 * Why use boost.asio and not home-made networking and event loop code? There are a few good reasons. For
407 * UDP networking, boost.asio provides a pain-free, portable, object-oriented wrapper around BSD
408 * sockets/WinSock (that nevertheless exposes the native resources like sockets/FDs *if* needed). For a high-
409 * performance event loop, boost.asio gives a flexible *proactor pattern* implementation, supporting arbitrary
410 * callbacks with lambdas or `bind()`, which is much easier and prettier than C-style function pointers with `void*`
411 * arguments a-la old-school HTTP servers written in C. For a complex event-driven loop like this, we'd have
412 * to implement some callback system anyway, and it would likely be far worse than boost.asio's, which is
413 * full-featured and proven through the years.
414 *
415 * To-dos and future features
416 * --------------------------
417 *
418 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow):
419 * The OS UDP net-stack buffers arriving datagrams until they're `recv()`d by the application
420 * layer. This buffer is limited; on my Linux test machine the default appears to buffer ~80 1k datagrams.
421 * With a high sender CWND and high throughput (e.g., over loopback), thread W -- which both reads off UDP
422 * datagrams and handles them, synchronously -- cannot always keep up, and the buffer fills up. This
423 * introduces Flow loss despite the fact that the datagram actually safely got to the destination; and this is
424 * with just ONE sender; in a server situation there could be thousands. In Linux I was able to raise, via
425 * `setsockopt()`, the buffer size to somewhere between 1000 and 2000 1k datagrams. This helps quite a bit.
426 * However, we may still overflow the buffer in busy situations (I have seen it, still with only one
427 * connection). So, the to-do is to solve this problem. See below to-dos for ideas.
428 * WARNING: UDP buffer overflow can be hard to detect and may just look like loss; plus the user not
429 * reading off the Receive buffer quickly enough will also incur similar-looking loss. If there
430 * were a way to detect the total # of bytes or datagrams pending on a socket, that would be cool,
431 * but `sock.available()` (where `sock` is a UDP socket) just gives the size of the first queued datagram.
432 * Congestion control (if effective) should prevent this problem, if it is a steady-state situation (i.e.,
433 * loss or queueing delay resulting from not keeping up with incoming datagrams should decrease CWND).
434 * However, if it happens in bursts due to high CPU use in the environment, then that may not help.
435 * NOTE 1: In practice a Node with many connections is running on a server and thus doesn't
436 * receive that much data but rather sends a lot. This mitigates the UDP-receive-buffer-overflow
437 * problem, as the Node receiving tons of data is more likely to be a client and thus have only one
438 * or two connections. Hence if we can handle a handful of really fast flows without such loss,
439 * we're good. (On other hand, ACKs are also traffic, and server may get a substantial amount of
440 * them. Much testing is needed.) Packet pacing on the sender side may also avoid this loss
441 * problem; on the other hand it may also decrease throughput.
442 * NOTE 2: Queue delay-based congestion control algorithms, such as FastTCP and Vegas, are highly
443 * sensitive to accurate RTT (round trip time) readings. Heavy CPU load can delay the recording of the
444 * "received" time stamp, because we call UDP `recv()` and handle the results all in one thread. Any solution,
445 * such as the dedicated thread proposed below, would _allow_ one to record the time stamp immediately upon receipt
446 * of the packet by the dedicated thread; while W would independently handle everything else. Is that a good
447 * idea though? Maybe not. If the CPU load is such that ACK-receiver-side can't get to the time-stamp-saving,
448 * RTT-measuring step without tricks like doing it immediately upon some low-level datagram receipt hook, then
449 * that CPU-pegged jungle is, in a way, part of the network and should probably be fairly counted as part of
450 * the RTT. So perhaps we should continue to take the RTT time stamp while actually handling the individual
451 * acknowledgments. Instead we should strive to use multi-core resources efficiently, so that the gap between
452 * receipt (on whatever thread) and acknowledgment processing (on whatever thread) is as small as possible.
453 * Then RTT is RTT, but we make it smaller via improved performance. Meanwhile, we hopefully also solve the
454 * original problem (internal kernel buffer overflowing and dropping datagrams).
455 *
456 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow): APPROACH 1 (CO-WINNER!):
457 * One approach is to note that, as of this writing, we call `m_low_lvl_sock.async_wait(wait_read)`;
458 * our handler is called once there is at least 1 message TO read;
459 * and then indeed our handler does read it (and any more messages that may also have arrived).
460 * Well, if we use actual `async_receive()` and an actual buffer instead,
461 * then boost.asio will read 1 (and no more, even if there are more)
462 * message into that buffer and have it ready in the handler. Assuming the mainstream case involves only 1
463 * message being ready, and/or assuming that reading at least 1 message each time ASAP would help significantly,
464 * this may be a good step toward relieving the problem, when it exists. The code becomes a tiny bit less
465 * elegant, but that's negligible. This seems like a no-brainer that should be included in any solution, but it
466 * by itself may not be sufficient, since more than 1 datagram may be waiting, and datagrams 2, 3, ... would
467 * still have to be read off by our code in the handler. So other approaches should still be considered.
468 *
469 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow): APPROACH 2:
470 * To eliminate the problem to the maximum extent possible, we can dedicate its own thread --
471 * call it W2 -- to reading #m_low_lvl_sock. We could also write to #m_low_lvl_sock on W2 (which would also
472 * allow us to use a different util::Task_engine from #m_task_engine to exclusively deal with W2 and
473 * the UDP socket #m_low_lvl_sock -- simpler in some ways that the strand-based solution described below).
474 * There are a couple of problems with this. One, it may delay receive ops due to send ops, which compromises
475 * the goals of this project in the first place. Two, send pacing code is in thread W (and moving it to W2
476 * would be complex and unhelpful); and once the decision to REALLY send a given datagram has been made,
477 * this send should occur right then and there -- queuing that task on W2 may delay it, compromising the
478 * quality of send pacing (whose entire nature is about being precise down to the microsecond or even better).
479 * Therefore, we'd like to keep `m_low_lvl_sock.async_send()` in thread W along with all other work (which
480 * allows vast majority of internal state to be accessed without locking, basically just the Send/Receive
481 * buffers excluded); except the chain of [`m_low_lvl_sock.async_receive()` -> post handler of received datagram
482 * onto thread W; and immediately `m_low_lvl_sock.async_receive()` again] would be on W2.
483 * AS WRITTEN, this is actually hard or impossible to do with boost.asio because of its design: #m_low_lvl_sock
484 * must belong to exactly one `Task_engine` (here, #m_task_engine), whereas to directly schedule a specific
485 * task onto a specific thread (as above design requires) would require separate `Task_engine` objects (1 per
486 * thread): boost.asio guarantees a task will run on *a* thread which is currently executing `run()` --
487 * if 2 threads are executing `run()` on the same service, it is unknown which thread a given task will land
488 * upon, which makes the above design (AS WRITTEN) impossible. (Side note: I'm not sure it's possible in plain C
489 * with BSD sockets either. A naive design, at least, might have W `select()` on `m_low_lvl_sock.native()` for
490 * writability as well other stuff like timers, while W2 `select()`s on same for readability; then the two
491 * threads perform UDP `send()` and `recv()`, respectively, when so instructed by `select()`s. Is it allowed
492 * to use `select()` on the same socket concurrently like that? StackOverflow.com answers are not clear cut, and
493 * to me, at least, it seems somewhat dodgy.) However, an equivalent design IS possible (and supported cleanly by
494 * boost.asio): In addition to the 2 threads, set up 2 strands, S and S2. All work except the #m_low_lvl_sock
495 * reads and posting of the handler onto S will be scheduled with a strand S. All work regarding
496 * #m_low_lvl_sock reads and posting of its handler onto S will be scheduled with a strand S2.
497 * Recall that executing tasks in 1 strand, with 2+ threads executing `run()`, guarantees that no 2+ of
498 * those tasks will execute simultaneously -- always in series. This is actually -- in terms of efficiency
499 * and thread safety -- equivalent to the above W/W2 design. Since the S tasks will always execute serially,
500 * no locking is necessary to prevent concurrent execution; thus what we know today as thread W tasks (which
501 * need no locking against each other) will be equally thread safe; and same holds for the new S2 tasks
502 * (which are considerably simpler and fewer in number). So that's the thread safety aspect; but why is
503 * efficiency guaranteed? The answer becomes clear if one pictures the original thread W/W2 design;
504 * basically little task blocks serially pepper thread W timeline; and same for W2. By doing it with strands
505 * S and S2 (running on top of threads W and W2), the only thing that changes is that the little blocks
506 * might at random be swapped between threads. So the series of tasks T1 -> T2 -> T3 -> T4 meant for
507 * for S might actually jump between W and W2 randomly; but whenever thread W2 is chosen instead of
508 * thread W, that leaves an empty "space" in thread W, which will be used by the S2 task queue if it
509 * needs to do work at the same time. So tasks may go on either thread, but the strands ensure
510 * that both are used with maximum efficiency while following the expected concurrency constraints
511 * (that each strand's tasks are to be executed in series). Note, however, that #m_low_lvl_sock (the
512 * socket object) is not itself safe for concurrent access, so we WILL need a lock to protect the tiny/short
513 * calls `m_low_lvl_sock.async_receive()` and `m_low_lvl_sock.async_send()`: we specifically allow that
514 * a read and write may be scheduled to happen simultaneously, since the two are independent of each
515 * other and supposed to occur as soon as humanly possible (once the desire to perform either one
516 * is expressed by the code -- either in the pacing module in strand S or the read handler in S2).
517 * In terms of nomenclature, if we do this, it'd be more fair to call the threads W1 and W2 (as they
518 * become completely equal in this design). (In general, any further extensions of this nature (if
519 * we want still more mostly-independent task queues to use available processor cores efficiently),
520 * we would add 1 strand and 1 worker thread per each such new queue. So basically there's a thread pool
521 * of N threads for N mostly-independent queues, and N strands are used to use that pool efficiently
522 * without needing to lock any data that are accessed exclusively by at most 1 queue's tasks only.
523 * Resources accessed by 2 or more task queues concurrently would need explicit locking (e.g.,
524 * #m_low_lvl_sock in this design).) So then where we start thread W today, we'd start the thread
525 * pool of 2 threads W1, W2, with each executing `m_task_engine.run()`. Before the run()s execute,
526 * the initial tasks (each wrapped in strand S or S2, as appropriate) need to be posted onto
527 * #m_task_engine; this can even occur in the user thread U in the constructor, before W1 and W2
528 * are created. The destructor would `m_task_engine.stop()` (like today), ending each thread's
529 * `run()` and trigger the imminent end of that thread; at which point destructor can `W1.join()` and `W2.join()`
530 * (today it's just `W.join()`).
531 *
532 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow): APPROACH 3:
533 * Suppose we take APPROACH 1 (no-brainer) plus APPROACH 2. Certain decisions in the latter were made for
534 * certain stated reasons, but let's examine those more closely. First note that the APPROACH 1 part will
535 * ensure that, given a burst of incoming datagrams, the first UDP `recv()` will occur somewhere inside boost.asio,
536 * so that's definitely a success. Furthermore, strand S will invoke `m_low_lvl_sock.async_send()` as soon as
537 * the pacing module decides to do so; if I recall correctly, boost.asio will invoke the UDP `send()` right then
538 * and there, synchronously (at least I wrote that unequivocally in a Node::async_low_lvl_packet_send_impl() comment).
539 * Again, that's as good as we can possibly want. Finally, for messages 2, 3, ... in that incoming datagram burst,
540 * our handler will (indirectly but synchronously) perform the UDP `recv()`s in strand S2. Here we're somewhat
541 * at boost.asio's mercy, but assuming its strand task scheduling is as efficient as possible, it should occur
542 * on the thread that's free, and either W1 or W2 should be free given the rest of the design. Still, boost.asio
543 * docs even say that different strands' tasks are NOT guaranteed to be invoked concurrently (though common
544 * sense implies they will be when possible... but WHAT IF WE'RE WRONG!!!?). Also, we don't know how much
545 * computational overhead is involved in making strands work so nicely (again, hopefully it's well written...
546 * but WHAT IF!!!?). A negative is the small mutex section around the two #m_low_lvl_sock calls; not complex
547 * and probably hardly a performance concern, but still, it's a small cost. Finally, using strands -- while
548 * not ugly -- does involve a bit more code, and one has to be careful not to forget to wrap each handler with
549 * the appropriate strand (there is no compile- or runtime error if we forget!) So what can we change about
550 * APPROACH 2 to avoid those negatives? As stated in that approach's description, we could have thread W
551 * not deal with #m_low_lvl_sock at all; thread W2 would have a separate `Task_engine` handling only
552 * #m_low_lvl_sock (so no mutex needed). W2 would do both sends and receives on the socket; and absolutely
553 * nothing else (to ensure it's as efficient as possible at getting datagrams off the kernel buffer, solving
554 * the original problem). Yes, the receiving would have to share time with the sends, but assuming nothing
555 * else interferes, this feels like not much of a cost (at least compared with all the heavy lifting thread W
556 * does today anyway). Each receive would read off all available messages into raw buffers and pass those
557 * (sans any copying) on to thread W via `post(W)`. The other negative, also already mentioned, is that
558 * once pacing module (in thread W) decides that a datagram should be sent, the `post(W2)` for the task that
559 * would peform the send introduces a delay between the decision and the actual UDP `send()` done by boost.asio.
560 * Thinking about it now, it is questionable to me how much of a cost that really is. Without CPU contention,
561 * we can measure it; I expect it to be quite cheap, but I coudl be wrong. With CPU contention -- which would
562 * have to come from many datagrams arriving at the same time -- I'm not sure. It wouldn't be overly hard to
563 * test; basically flood with UDP traffic over loopback and log the delay between W deciding to send datagram
564 * and W2 calling `m_low_lvl_sock.async_send_to()` (obviously use #Fine_clock!). All in all, if we name
565 * the dedicated thread approach described here as APPROACH 3, then APPROACH 3 is appealingly simpler than
566 * APPROACH 2; and in most ways appears like it'd be at least as efficient and good at solving the original
567 * problem as APPROACH 2. The only danger that worries me is this business with messing up pacing (and no,
568 * moving pacing into W2 just endangers the receiving efficiency and introduces thread safety problems and
569 * complexity) by having it compete with receiving during incoming-traffic-heavy times. Ultimately, I'd
570 * recommend timing this "danger zone" as described a bit earlier (also time delay introduced by `post(W2)`
571 * without any traffic coming in); and if it looks good, do APPROACH 3. Otherwise spring for APPROACH 2.
572 *
573 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow): APPROACH 4:
574 * It now occurs to me how to solve the main questionable part about APPROACH 3. If we fear that the reads and
575 * writes in thread W2 may compete for CPU, especially the reads delaying timing-sensitive paced writes,
576 * then we can eliminate the problem by taking W2's own util::Task_engine (which would be separate from
577 * #m_task_engine) and have two equal threads W2' and W2'' start up and then each call `Task_engine::run()`.
578 * Without having to use any strand, this will essentially (as documented in boost.asio's doc overview)
579 * establish a thread pool of 2 threads and then perform the receive and send tasks at random on whichever
580 * thread(s) is/are available at a particular time. Since there's no strand(s) to worry about, the underlying
581 * overhead in boost.asio is probably small, so there's nothing to fear about efficiency. In fact, in this
582 * case we've now created 3 separate threads W, W2', W2'', all doing things independently of each other, which
583 * is an excellent feature in terms of using multiple cores. Do note we will now need a mutex and very short
584 * critical sections around the calls to `m_low_lvl_sock::async_receive()` and `m_low_lvl_sock::async_send()`,
585 * but as noted before this seems extremely unlikely to have any real cost due to the shortess of critical
586 * sections in both threads. If this is APPROACH 4, then I'd say just time how much absolute delay is
587 * introduced by a `post(W2')` or `post(W2'')` of the async send call compared to directly making such a
588 * call on thread W, as is done today. I suspect it's small, in which case the action is go for
589 * APPROACH 4... finally.
590 *
591 * @todo Receive UDP datagrams as soon as possible (avoid internal buffer overflow): APPROACH 5 (CO-WINNER!):
592 * Actually, the thing I've been dismissing in approaches 2-4, which was to combine the pacing logic with the
593 * actual `m_low_lvl_sock.async_send()` (like today) but in their own dedicated thread, now seems like the best
594 * way to solve the one questionable aspect of APPROACH 4. So, APPROACH 4, but: Move the pacing stuff into
595 * the task queue associated with threads W2' and W2''. So then these 2 threads/cores will be available for
596 * 2 task queues: one for pacing timers + datagram sending over #m_low_lvl_sock (with mutex); the other for
597 * receiving over #m_low_lvl_sock (with same mutex). Now, the only "delay" is moved to before pacing occurs:
598 * whatever minimal time cost exists of adding a queue from thread W to thread W2' or W2'' occurs just before
599 * the pacing logic, after which chances are the datagram will be placed on a pacing queue anyway and sent
600 * off somewhat later; intuitively this is better than the delay occurring between pacing logic and the
601 * actual UDP send. Note, also, that the timing-sensitive pacing logic now gets its own thread/core and should
602 * thus work better vs. today in situations when thread W is doing a lot of work. This is even more logical
603 * than APPROACH 4 in that sense; the pacing and sending are concern 1 and get their own thread (essentially;
604 * really they get either W2' or W2'' for each given task); the receiving is concern 2 and gets its own thread
605 * (same deal); and all the rest is concern 3 and remains in its own thread W (until we can think of ways to
606 * split that into concerns; but that is another to-do). Only one mutex with 2 very small critical sections,
607 * as in APPROACH 4, is used. The only subtlety regarding concurrent data access is in
608 * Node::mark_data_packet_sent(), which is called just before `m_low_lvl_sock.async_send()`, and which
609 * finalizes certain aspects of Peer_socket::Sent_packet::Sent_when; most notably
610 * Peer_socket::Sent_packet::Sent_when::m_sent_time (used in RTT calculation upon ACK receipt later).
611 * This is stored in Peer_socket::m_snd_flying_pkts_by_sent_when, which today is not protected by mutex due
612 * to only being accessed from thread W; and which is extremely frequently accessed. So either we protect
613 * the latter with a mutex (out of the question: it is too frequently accessed and would quite possibly
614 * reduce performance) or something else. Currently I think Node::mark_data_packet_sent() should just
615 * be placed onto #m_task_engine (thread W) via `post()` but perhaps take all or most of the items to
616 * update Sent_when with as arguments, so that they (especially `Sent_when::m_sent_time`) could be determined
617 * in thread W2' or W2'' but written thread-safely in W. (There is no way some other thread W task would mess
618 * with this area of Peer_socket::m_snd_flying_pkts_by_sent_when before the proposed mark_data_packet_sent()
619 * was able to run; thread W had just decided to send that packet over wire in the first place; so there's no
620 * reason to access it until ACK -- much later -- or some kind of socket-wide catastrophe.) All that put
621 * together I dub APPROACH 5. Thus, APPROACH 1 + APPROACH 5 seems like the best idea of all, distilling all
622 * the trade-offs into the the fastest yet close to simplest approach.
623 *
624 * @todo More uniform diagnostic logging: There is much diagnostic logging in the
625 * implementation (FLOW_ERROR*(), etc.), but some of it lacks useful info like `sock` or `serv` (i.e., the
626 * `ostream` representations of Peer_socket and Server_socket objects, which include the UDP/Flow endpoints
627 * involved in each socket). The messages that do include these have to do so explicitly. Provide some
628 * macros to automatically insert this info, then convert the code to use the macros in most places. Note that
629 * existing logging is rather exhaustive, so this is not the biggest of deals but would be nice for ease of coding
630 * (and detailed logging).
631 *
632 * @todo It may be desirable to use not boost.asio's out-of-the-box UDP receive routines but rather extensions
633 * capable of some advanced features, such as `recvmsg()` -- which can obtain kernel receipt time stamps and
634 * destination IP address via the `cmsg` feature. This would tie into various other to-dos listed around here.
635 * There is, as of this writing, also a to-do in the top-level `flow` namespace doc header about bringing some code
636 * into a new `io` namespace/Flow module; this includes the aforementioned `recvmsg()` wrapper.
637 *
638 * @todo It may be desirable to further use `recvmmsg()` for UDP input; this allows to read multiple UDP datagrams
639 * with one call for performance.
640 *
641 * @todo By the same token, wrapping `sendmsg()` and `sendmmsg()` may allow for futher perf and feature
642 * improvements -- in some ways potentially symmetrically to `recvmsg()` and `recvmmsg()` respectively.
643 * However, as of this writing, I (ygoldfel) see this more of an opportunistic "look into it" thing and not something
644 * of active value; whereas `recv[m]msg()` bring actual features we actively desire for practical reasons.
645 *
646 * @todo Send and Receive buffer max sizes: These are set to some constants right now. That's not
647 * optimal. There are two competing factors: throughput and RAM. If buffer is too small, throughput can
648 * suffer in practice, if the Receiver can't read the data fast enough (there are web pages that show this).
649 * Possibly with today's CPUs it's no longer true, but I don't know. If buffer is too large and with a lot of
650 * users, a LOT of RAM can be eaten up (however note that a server will likely be mostly sending, not
651 * receiving, therefore it may need smaller Receive buffers). Therefore, as in Linux 2.6.17+, the buffer
652 * sizes should be adaptively sized. It may be non-trivial to come up with a good heuristic, but we may be
653 * able to copy Linux. The basic idea would probably be to use some total RAM budget and divide it up among
654 * the # of sockets (itself a constant estimate, or adaptive based on the number of sockets at a given time?).
655 * Also, buffer size should be determined on the Receive side; the Send side should make its buffer to be of
656 * equal size. Until we implement some sensible buffer sizing, it might be a good idea (for demo purposes with
657 * few users) to keep the buffers quite large. However, flow control (receive window) is now implemented and
658 * should cope well with momentary Receive buffer exhaustion.
659 * Related facts found on the web: In Linux, since a long time ago, Send buffer size is determined by other
660 * side's Receive buffer size (probably sent over in the SYN or SYN-ACK as the receive window). Also, in
661 * older Linuxes, Receive buffer defaults to 128k but can be manually set. Supposedly the default can lead to
662 * low throughput in high-speed (gigabit+) situations. Thus Linux 2.6.17+ apparently made the Receive buffer
663 * size adaptive.
664 *
665 * @todo Drop Acknowledgments: DCCP, a somewhat similar UDP-based protocol, uses the concept of
666 * Data-Dropped acknowledgments. If a packet gets to the receiver, but the receiver is forced to drop it (for
667 * example, no Receive buffer space; not sure if there are other reasons in Flow protocol), then the sender will only
668 * find out about this by inferring the drop via Drop Timeout or getting acknowledgments for later data. That
669 * may take a while, and since receiver-side drops can be quite large, it would be more constructive for the
670 * receive to send an un-ACK of sorts: a Data-Dropped packet informing the sender that specific data were
671 * dropped. The sender can then update his congestion state (and retransmit if we enable that). See RFC 4340
672 * and 4341.
673 *
674 * @todo Add extra-thread-safe convention for setting options: We can provide a thread-safe (against other user
675 * threads doing the same thing) macro to set a given option. set_options() has a documented, albeit in
676 * practice not usually truly problematic, thread safety flaw if one calls options(), modifies the result,
677 * then calls set_options(). Since another thread may modify the Node's options between the two calls, the
678 * latter call may unintentionally revert an option's value. Macro would take an option "name" (identifier
679 * for the Node_options member), a Node, and a target value for the option and would work together with a
680 * helper method template to obtain the necessary lock, make the assignment to the internal option, and give
681 * up the lock. The implementation would probably require Node to expose its internal stored Node_options
682 * for the benefit of this macro only. Anyway, this feature is not super-important, as long as the user is
683 * aware that modifying options from multiple threads simultaneously may result in incorrect settings being
684 * applied.
685 *
686 * @todo The preceding to-do regarding Node_options applies to Peer_socket_options stored in Peer_socket in
687 * in an analogous way.
688 *
689 * @todo Consider removing Flow ports and even Server_socket:
690 * As explained above, we add the concept of a large set of available Flow ports within each
691 * Node, and each Node itself has a UDP port all to itself. So, for example, I could bind a Node to UDP
692 * port 1010, and within that listen() on Flow ports 1010 (unrelated to UDP port 1010!) and 1011. In
693 * retrospect, though, is that complexity necessary? We could save quite a few lines of code, particularly in
694 * the implementation (class Port_space, for example) and the protocol (extra bytes for Flow source and target
695 * ports, for example). (They're fun and pretty lines, but the absence of lines is arguably even prettier
696 * albeit less fun. On the other hand, bugs aren't fun, and more code implies a higher probability of bugs,
697 * maintenance errors, etc.) The interface would also be a bit simpler; and not just due to fewer items in
698 * Remote_endpoint (which would in fact reduce to util::Udp_endpoint and cease to exist). Consider Server_socket;
699 * currently listen() takes a #flow_port_t argument and returns a Server_socket which is listening; calling
700 * accept() (etc.) on the latter yields Peer_socket, as the other side connects. Without Flow ports, there is
701 * no argument to listen(); in fact, Server_socket itself is not strictly necessary and could be folded into
702 * Node, with listen() becoming essentially something that turns on the "are we listening?" Boolean state,
703 * while stop_listening() would turn it off (instead of something like `Server_socket::close()`). (Important
704 * note: That was not an endorsement of removing Server_socket. Arguably it is still a nice abstraction.
705 * Removing it would certainly remove some boiler-plate machinery to do with Server_socket's life cycle, on
706 * the other hand. Perhaps it's best to take a two-step appraoch; remove Flow ports first; then after a long
707 * time, assuming it becomes clear that nothing like them is going to come back, remove Server_socket as
708 * well.) A key question is, of course what would we lose? At first glance, Flow port allows multiple
709 * connections on a single UDP-port-taking Flow server, including multiple connections from one client (e.g.,
710 * with differing connection parameters such as reliability levels among the different connections, or
711 * "channels")... but actually that'd still work without Flow ports, assuming the "one client's" multiple
712 * connections can bind to different (presumably ephemeral) UDP ports; since the tuple (source host, source
713 * UDP port) is still enough to distinguish from the 2+ "channels" of the same "client" connecting to the one
714 * Flow Node (vs. today's tuple: source host, source UDP port, source Flow port, destination Flow port; see
715 * `struct` Socket_id). However, without Flow ports, it is not possible for one Node to connect to another
716 * Node twice, as each Node by definition is on one port. Is this important? Maybe, maybe not; for NAT
717 * purposes it can be important to use only 1 port; but that typically applies only to the server, while the
718 * client can send packets from anywhere. However, gaming applications can be very demanding and for the most
719 * restrictive NAT types might desire only a single port used on both sides. So whether to remove Flow ports
720 * is somewhat questionable, now that they exist; but arguably they didn't need to be added in the first
721 * place, until they were truly needed. I'd probably leave them alone, since they do exist.
722 *
723 * @todo Multi-core/multi-threading: The existing implementation already has a nice multi-threaded property,
724 * namely that each Node (object that binds to a single UDP endpoint/port) is entirely independent of any other
725 * such object -- they have entirely separate data, and each one does all its work on a separate thread.
726 * So to make use of multiple cores/processors, one can set up multiple Node objects. (Obviously this only makes
727 * sense for apps where using multiple ports is acceptable or even desired. E.g., a server could listen
728 * on N different UDP ports, where N=# of cores.) However, it would be nice for a single Node to be as
729 * multi-core/processor-friendly as possible. This is partially addressed by the "Dedicated thread to receive
730 * UDP datagrams ASAP" to-do item elsewhere. We could go further. boost.asio lets one easily go from
731 * 1 thread to multiple threads by simply starting more worker threads like W (W1, W2, ...) and executing
732 * `m_task_engine::run()` in each one -- note that #m_task_engine is shared (sans lock). Then subsequent
733 * handlers (timer-fired handlers, ack-received handlers, data-received handlers, and many more) would be
734 * assigned evenly to available threads currently executing run(). However, then all data these handlers
735 * access would need to be protected by a mutex or mutexes, which would be a significant increase in
736 * complexity and maintenance headaches compared to existing code, which features mutexes for data accessed
737 * both by W and the user thread(s) U -- which excludes most Node, Server_socket, Peer_socket state --
738 * essentially the user-visible "state" enums, and the Receive and Send buffers; but hugely complex things
739 * like the scoreboards, etc. etc., needed no mutex protection, but with this change they would need it.
740 * Actually, if the implementation essentially uses one mutex M, and every handler locks it for the entirety
741 * of its execution, then one isn't really making use of multi-cores/etc. anyway. One could make use of
742 * boost.asio "strands" to avoid the need for the mutex -- just wrap every handler in a shared strand S,
743 * and no locking is needed; boost.asio will never execute two handlers simultaneously in different threads.
744 * While this would arguably make the code simpler, but in terms of performance it wouldn't make any
745 * difference anyway, as it is functionally equivalent to the lock-M-around-every-operation solution (in
746 * fact, internally, it might even amount to exactly that anyway). So that's probably not worth it.
747 * We need to have more mutexes or strands, based on some other criterion/criteria. After a datagram is demuxed,
748 * vast majority of work is done on a particular socket independently of all others. Therefore we could
749 * add a mutex (or use an equivalent) into the socket object and then lock on that mutex. Multiple
750 * threads could then concurrently handle multiple sockets. However, for this to be used, one would have to
751 * use a single Node (UDP endpoint) with multiple sockets at the same time. Without any changes at all, one can
752 * get the same concurrency by instead setting up multiple Node objects. Other than a bit of lost syntactic sugar
753 * (arguably) -- multiple Node objects needed, each one having to initialize with its own set of options,
754 * for example -- this is particularly cost-free on the client side, as each Node can just use its own ephemeral
755 * UDP port. On the server side the network architecture has to allow for multiple non-ephemeral ports, and
756 * the client must know to (perhaps randomly) connect to one of N UDP ports/endpoints on the server, which is
757 * more restrictive than on the client. So perhaps there are some reasons to add the per-socket concurrency -- but
758 * I would not put a high priority on it. IMPORTANT UPDATE: Long after the preceding text was written, flow::async
759 * Flow module was created containing flow::async::Concurrent_task_loop interface. That looks at the general
760 * problem of multi-tasking thread pools and what's the user-friendliest-yet-most-powerful way of doing it.
761 * While the preceding discussion in this to-do has been left unchanged, one must first familiarize self with
762 * flow::async; and *then* read the above, both because some of those older ideas might need reevaluation; and because
763 * some of those ideas may have been implemented by flow::async and are now available easily.
764 *
765 * @todo In Node::low_lvl_packet_sent(), the UDP `async_send()` handler, there is an inline to-do about specially
766 * treating the corner case of the `would_block` and `try_again` boost.asio errors being reported (a/k/a POSIX
767 * `EAGAIN`/`EWOULDBLOCK`). Please see that inline comment for details.
768 *
769 * @todo Class Node `private` section is very large. It's so large that the implementations of the methods therein
770 * are split up among different files such as `flow_peer_socket.cpp`, `flow_event_set.cpp`, `flow_low_lvl_io.cpp`, etc.
771 * Consider the following scheme to both better express this separation as well as enforce which of a given method
772 * group's method(s) are meant to be called by outside code vs. being helpers thereof: Introduce `static`-method-only
773 * inner classes (and, conceivably, even classes within those classes) to enforce this grouping (`public` methods
774 * and `private` methods enforcing what is a "public" helper vs. a helper's helper).
775 *
776 * @todo Make use of flow::async::Concurrent_task_loop or flow::async::Single_thread_task_loop, instead of manually
777 * setting up a thread and util::Task_engine, for #m_worker. I, Yuri, wrote the constructor, worker_run(), destructor,
778 * and related setup/teardown code as my very first boost.asio activity ever. It's solid, but flow::async just makes
779 * it easier and more elegant to read/maintain; plus this would increase Flow-wide consistency. It would almost
780 * certainly reduce the number of methods and, even nicely, state (such as #m_event_loop_ready).
781 *
782 * Misc. topic: Doxygen markup within a Doxygen command
783 * ----------------------------------------------------
784 * This section may seem random: Indeed, it is the meat of a similarly named ("Doxygen markup," etc.) subsection of
785 * the doc header on the very-general namespace ::flow. As noted in that subsection, this Node class is a more
786 * convenient place to explain this, because it is a large class with many examples available. Without further ado:
787 *
788 * ::flow doc header discusses which items should be accompanied by Doxygen comment snippets. More specifically, each
789 * item is accompanied by a Doxygen "command". `"@param param_name Explain parameter here."`, for example, documents
790 * the parameter `param_name` with the text following that. (Some commands are implicit; namely without
791 * an explicit `"@brief"`, the first sentence is the brief description of the class/function/whatever.)
792 *
793 * However, all that doesn't talk about formatting the *insides* of paragraphs in these commands. Essentially
794 * we are saying just use English. However, Doxygen uses certain markup language conventions when interpreting
795 * those paragraphs. For example `backticks` will turn the thing inside the ticks into an inline code snippet,
796 * in fixed-width font. There are a few things to watch out for with this:
797 *
798 * - Don't accidentally enable markup when you don't mean to. E.g., an * asterisk as the first character
799 * in a paragraph will cause a bullet point to appear. Also, sometimes you do want to
800 * use some character for semantical reason X which Doxygen shares with you, but automatic markup
801 * handling might make it come out a little wrong. Just learn these through practice
802 * and check over the generated web page(s) before checking in the code.
803 * - DO use markup within reason for certain COMMON items. Do not overdo it: mostly it should be things
804 * you're keen to do even if there were NO Doxygen or Javadoc involved. Bullet point lists are an example.
805 * Basically: If you were going to do something anyway, why not have it come out nicely in the doc page(s)?
806 * - Make use of auto-linking (a/k/a automatic liwnk generation) liberally. This is when Doxygen sees a certain
807 * pattern within a Doxygen comment and understands it a reference to some other object, like a class or
808 * method; so in the output this will come out as a link to that thing. The nice thing is that, usually,
809 * within raw code it looks fine/normal; AND the generated page has the convenient linking functionality.
810 * However, if enabling linking in a certain case is too tedious, feel free to not.
811 *
812 * That said, I will now list all of the pieces of markup that are allowed within comments inside Flow code.
813 * Try not to add to this list without a very good reason. Simplicity is a virtue.
814 *
815 * - Bullet points: Just a dash after one indent level: `" - Item text."`. Nesting allowed.
816 * - Numbered points: just type out the numbers explicitly instead of auto-numbering: `" 2. Item text."`; not
817 * `" -# Item text."`. Yes, it leaves the numbering to you, but more importantly the raw comment remains
818 * readable, and you can refer to the numbers (e.g., "according to the condition in item 3" makes more sense
819 * when you can see a `3.` nearby).
820 * - Emphasis: Just one asterisk before and after the emphasized stuff: *word*, *multiple words*. No "_underscores_"
821 * please. In general try to avoid too much emphasis, as asterisks are common characters and can confuse
822 * Doxygen. Plus, you shouldn't need to emphasize stuff THAT much. Plus, feel free to use CAPITALS to emphasize
823 * instead.
824 * - Inline code snippets: Backticks. `single_word`, `an_expression != other_expression * 2`. Definitely use
825 * this liberally: it helps readability of BOTH raw code and looks delightful in the generated web page(s).
826 * However, note explanation below regarding how this relates to auto-linking.
827 * - Syntax-highlighted code spans: Use three tildes `"~~~"` to begin and end a code snippet. This MUST be
828 * used for multi-line code snippets; and CAN be used instead of `backticks` for single-line code snippets.
829 * The output will be a separate paragraph, just like the raw code should be. More precisely, the tildes
830 * and code should follow a single absolute indentation level:
831 *
832 * ~~~
833 * if (some_condition) // Generated output will also be syntax-highlighted.
834 * {
835 * obj.call_it(arg1, "quote");
836 * return false;
837 * }
838 * ~~~
839 *
840 * - Large heading in a long doc header: Use the format seen in this comment itself: Words, underlined by a
841 * row of dashes ("----", etc.) on the next line. This results in a fairly large-fonted title.
842 * - Small heading is a long doc header: IF AND ONLY IF you need a sub-heading under a large heading
843 * (which would probably be in only quite long doc headers indeed), use the ### format. Again use the
844 * format seen in this very doc header. This results in a slightly large-fonted title (pretty close to
845 * normal).
846 * - Avoid any other levels of heading. At that point things have become too complex.
847 * - Escape from Doxygen formatting: To ensure Doxygen interprets a bunch of characters literally, when you
848 * know there is danger of it applying unwanted formatting, surround it in quotes. The quotes will
849 * be included in the output just like in the raw code; but anything inside quotes will be output verbatim
850 * even if full of Doxygen markup or commands. For example, if I don't want a to-do item to begin in
851 * the middle of this paragraph, but I do want to refer to how a to-do is declared is Doxygen comments,
852 * I will surround it in quotes: To declare a to-do, use the `"@todo"` command. Note that in that example
853 * I put `backticks` around the text to format the whole thing a certain way; any formatting already in
854 * effect will continue through the "quoted" text; but no formatting inside the "quotes" will go in effect.
855 * Plus, it looks appropriate in raw code. Best of all worlds.
856 *
857 * The most tricky yet powerful technique to learn here is the interplay between auto-linking and `inline code
858 * snippets`. Before discussing that in some detail, note the auto-linking which is allowed in this source
859 * code:
860 *
861 * - Class/`struct`/union names are auto-linked: for example, just Peer_socket. That's because every
862 * class/`struct`/union for us starts with a Capital letter. Easy!
863 * - A method local to the class/`struct` being documented, like running() in this class Node which we are
864 * documenting right now, is auto-linked as written.
865 * - ANY member explicitly specified as belonging to a class/`struct`/union or namespace is
866 * auto-linked as written. It can be a member function, variable, alias, nested class, or *anything* else.
867 * The presence of `"::"` will auto-link whatever it is. Note this is a very powerful auto-linking technique;
868 * the vast majority of things are members of something, even if's merely a namespace, so if you absolutely must
869 * auto-link something, there is always at least one straightforward way: a *fully or partially qualified name*.
870 * It will be simple/readable as raw source and equally simple/readable AND linked in the doc output. The only
871 * *possible* (not really probable, but it certainly happens) down-side is it can be too verbose.
872 * - Example: alias-type: Drop_timer::timer_wait_id_t; method: Port_space::return_port().
873 * - Macros are not members of anything and thus cannot be auto-linked by qualifying them. However, read below
874 * on how to auto-link them too.
875 * - A free (non-member) function or functional macro will generally auto-link, even if it's in some other namespace.
876 * - This can result in name collisions, if some function `f()` is in two namespaces meaning two entirely
877 * different things. And not just functions but anything else, like classes, can thus collide.
878 * That is, after all, why namespaces exist! Just be careful and qualify things with namespace paths
879 * when needed (or even just for clarity).
880 * - For non-functions/methods: Things like variables/constants, type aliases, `enum` values will not auto-link
881 * if seen "naked." For example S_PORT_ANY is, to Doxygen, just a word. We use either `"#"` or `"::"` to force
882 * auto-linking. Here is how to decide which one to use:
883 * - `"#id"`: To refer to a *member* of anything (compound type, namespace) member named `id`,
884 * such that the *currently documented item* is also a member of that [anything] -- either at the same depth
885 * (e.g., in the same class) or deeper (e.g., `id` is in namespace `flow`, while we are in namespace
886 * `flow::net_flow`, or in class `flow::Some_class`, or both -- `flow::net_flow::Some_class`).
887 * Example: #Udp_socket (member alias), #S_NUM_PORTS (member constant), #m_low_lvl_sock (member variable).
888 * - `"::id"`: To refer to an item in the global namespace. Almost always, this will be an (outer) namespace.
889 * Global-namespace members that are not themselves namespaces are strongly discouraged elsewhere.
890 * Example: ::flow (namespace).
891 * - A functional macro is formatted the same as a free function in global namespace: e.g., FLOW_LOG_WARNING().
892 * - If a non-functional macro needs to be documented (VERY rare or non-existent given our coding style), use
893 * this special format: `"#MACRO_NAME"`. `"::"` is inappropriate, since a macro does not belong to a namespace
894 * (global or otherwise), and that would look confusing in raw code.
895 *
896 * Now finally here are the guidelines about what to use: `backticks`, an auto-linked symbol, or both.
897 * Suppose there is some comment *snippet* X that you are considering how to format.
898 *
899 * - If X is just a piece of English language and not referring to or quoting code per se, then do not format
900 * it. Type it verbatim: "The socket's state is ESTABLISHED here." Even though ESTABLISHED may be thought
901 * of as code, here it's referring more to a concept (the state "ESTABLISHED") rather than code snippet.
902 * `S_ESTABLISHED` is a different story, on the other hand, and that one you must either backtick (as I just
903 * did there) or auto-link; read on for guidelines on that.
904 * - If the *entirety* of X is an identifier:
905 * - Auto-link it, WITHOUT backticks, if to auto-link it you would just write X verbatim anyway.
906 * For example, mentioning Peer_socket just like that will auto-link it. So, that's great. Do NOT
907 * add backticks, as that increases code verbosity and adds very little (making the auto-linked `Peer_socket`
908 * also use a fixed-width font; meh).
909 * - Auto-link it, WITHOUT backticks, if you would like the convenience of it being auto-linked in the output.
910 * - Do NOT auto-link it, but DO add `backticks`, if you do not need the convenience of the auto-linked output.
911 * Backticks are easy: auto-linking can be a little tricky/verbose. So in that case just `backtick` it
912 * for readable raw source AND pretty output; without worrying about subtleties of proper auto-linking.
913 * - If X consists of some identifiers but also contains non-identifiers:
914 * - The non-code parts should be verbatim.
915 * - ALL code parts should be in `backticks`.
916 * - IF you want the convenience of some parts of the output being auto-linked, auto-link those parts.
917 * - IF you'd prefer shorter and clearer raw code, then don't auto-link where doing so would require extra
918 * raw code characters.
919 * - Example: Suppose X is: "The allowed range is [S_FIRST_SERVICE_PORT + 1, S_FIRST_EPHEMERAL_PORT + 2)."
920 * Variant 1 will auto-link but a bit longer and less readable as raw code. Variant 2 will forego auto-linking
921 * but is short and readable as raw code.
922 * - *Variant 1*: The allowed range is [`#S_FIRST_SERVICE_PORT + 1`, `#S_FIRST_EPHEMERAL_PORT + 2`).
923 * - *Variant 2*: The allowed range is [`S_FIRST_SERVICE_PORT + 1`, `S_FIRST_EPHEMERAL_PORT + 2`).
924 * - Example: Suppose X is: "The condition holds if sock->m_local_port != 2223." Variant 1 is brief and readable.
925 * Variant 2 is readable enough but much longer. However, it will very conveniently auto-link to
926 * that obscure data member for the web page reader's pleasure, the convenience of which shouldn't be dismissed.
927 * - *Variant 1*: The condition holds if `sock->m_local_port != 2223`.
928 * - *Variant 2*: The condition holds if `Peer_socket::m_local_port != 2223` (for `sock`).
929 */
930class Node :
932 public log::Log_context,
933 private boost::noncopyable
934{
935public:
936 // Constants.
937
938 /// Total number of Flow ports in the port space, including #S_PORT_ANY.
939 static const size_t& S_NUM_PORTS;
940
941 /// Total number of Flow "service" ports (ones that can be reserved by number with Node::listen()).
942 static const size_t& S_NUM_SERVICE_PORTS;
943
944 /**
945 * Total number of Flow "ephemeral" ports (ones reserved locally at random with
946 * `Node::listen(S_PORT_ANY)` or Node::connect()).
947 */
948 static const size_t& S_NUM_EPHEMERAL_PORTS;
949
950 /**
951 * The port number of the lowest service port, making the range of service ports
952 * [#S_FIRST_SERVICE_PORT, #S_FIRST_SERVICE_PORT + #S_NUM_SERVICE_PORTS - 1].
953 */
955
956 /**
957 * The port number of the lowest ephemeral Flow port, making the range of ephemeral ports
958 * [#S_FIRST_EPHEMERAL_PORT, #S_FIRST_EPHEMERAL_PORT + #S_NUM_EPHEMERAL_PORTS - 1].
959 */
961
962 // Constructors/destructor.
963
964 /**
965 * Constructs Node.
966 * Post-condition: Node ready for arbitrary use. (Internally this includes asynchronously
967 * waiting for any incoming UDP packets on the given endpoint.)
968 *
969 * Does not block. After exiting this constructor, running() can be used to determine whether
970 * Node initialized or failed to do so; or one can get this from `*err_code`.
971 *
972 * ### Potential shared use of `Logger *logger` ###
973 * All logging, both in this thread (from which the constructor executes) and any potential internally
974 * spawned threads, by this Node and all objects created through it (directly
975 * or otherwise) will be through this Logger. `*logger` may have been used or not used
976 * for any purpose whatsoever prior to this constructor call. However, from now on,
977 * Node will assume that `*logger` will be in exclusive use by this Node and no other code until
978 * destruction. It is strongly recommended that all code refrains from further use of
979 * `*logger` until the destructor ~Node() exits. Otherwise, quality of this Node's logging (until destruction)
980 * may be lowered in undefined fashion except for the following formal guarantees: the output will not
981 * be corrupted from unsafe concurrent logging; and the current thread's nickname (for logging purposes only) will
982 * not be changed at any point. Less formally, interleaved or concurrent use of the same Logger might
983 * result in such things as formatters from Node log calls affecting output of your log calls or vice versa.
984 * Just don't, and it'll look good.
985 *
986 * @param low_lvl_endpoint
987 * The UDP endpoint (IP address and UDP port) which will be used for receiving incoming and
988 * sending outgoing Flow traffic in this Node.
989 * E.g.: `Udp_endpoint(Ip_address_v4::any(), 1234)` // UDP port 1234 on all IPv4 interfaces.
990 * @param logger
991 * The Logger implementation through which all logging from this Node will run.
992 * See notes on logger ownership above.
993 * @param net_env_sim
994 * Network environment simulator to use to simulate (fake) external network conditions
995 * inside the code, e.g., for testing. If 0, no such simulation will occur. Otherwise the
996 * code will add conditions such as loss and latency (in addition to any present naturally)
997 * and will take ownership of the the passed in pointer (meaning, we will `delete` as we see fit;
998 * and you must never do so from now on).
999 * @param err_code
1000 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1001 * error::Code::S_NODE_NOT_RUNNING (Node failed to initialize),
1002 * error::Code::S_OPTION_CHECK_FAILED.
1003 * @param opts
1004 * The low-level per-Node options to use. The default uses reasonable values that
1005 * normally need not be changed. No reference to opts is saved; it is only copied.
1006 * See also Node::set_options(), Node::options(), Node::listen(), Node::connect(),
1007 * Peer_socket::set_options(), Peer_socket::options().
1008 */
1009 explicit Node(log::Logger* logger, const util::Udp_endpoint& low_lvl_endpoint,
1010 Net_env_simulator* net_env_sim = 0, Error_code* err_code = 0,
1011 const Node_options& opts = Node_options());
1012
1013 /**
1014 * Destroys Node. Closes all Peer_socket objects as if by `sock->close_abruptly()`. Then closes all
1015 * Server_socket objects Then closes all Event_set objects as if by `event_set->close()`.
1016 * @todo Server_socket objects closed as if by what?
1017 *
1018 * Frees all resources except the objects still shared by `shared_ptr<>`s returned to the Node
1019 * user. All `shared_ptr<>` instances inside Node sharing the latter objects are, however,
1020 * eliminated. Therefore any such object will be deleted the moment the user also eliminates all
1021 * her `shared_ptr<>` instances sharing that same object; any object for which that is already the
1022 * case is deleted immediately.
1023 *
1024 * Does not block.
1025 *
1026 * Note: as a corollary of the fact this acts as if `{Peer|Server_}socket::close_abruptly()` and
1027 * Event_set::close(), in that order, were called, all event waits on the closed
1028 * sockets (`sync_send()`, `sync_receive()`, `sync_accept()`, Event_set::sync_wait(),
1029 * Event_set::async_wait()) will execute their on-event behavior (`sync_send()` return,
1030 * `sync_receive()` return, `sync_accept()` return, `sync_wait()` return and invoke handler, respectively).
1031 * Since Event_set::close() is executed soon after the sockets close, those Event_set objects are
1032 * cleared. Therefore, the user on-event behavior handling may find that, despite a given
1033 * event firing, the containing Event_set is empty; or they may win the race and see an Event_set
1034 * with a bunch of `S_CLOSED` sockets. Either way, no work is possible on these sockets.
1035 *
1036 * Rationale for previous paragraph: We want to wake up any threads or event loops waiting on
1037 * these sockets, so they don't sit around while the underlying Node is long since destroyed. On
1038 * the other hand, we want to free any resources we own (including socket handles inside
1039 * Event_set). This solution satisfies both desires. It does add a bit of non-determinism
1040 * (easily handled by the user: any socket in the Event_set, even if user wins the race, will be
1041 * `S_CLOSED` anyway). However it introduces no actual thread safety problems (corruption, etc.).
1042 *
1043 * @todo Provide another method to shut down everything gracefully?
1044 */
1045 ~Node() override;
1046
1047 // Methods.
1048
1049 /**
1050 * Returns `true` if and only if the Node is operating. If not, all attempts to use this object or
1051 * any objects generated by this object (Peer_socket::Ptr, etc.) will result in error.
1052 * @return Ditto.
1053 */
1054 bool running() const;
1055
1056 /**
1057 * Return the UDP endpoint (IP address and UDP port) which will be used for receiving incoming and
1058 * sending outgoing Flow traffic in this Node. This is similar to to the value passed to the
1059 * Node constructor, except that it represents the actual bound address and port (e.g., if you
1060 * chose 0 as the port, the value returned here will contain the actual emphemeral port randomly chosen by
1061 * the OS).
1062 *
1063 * If `!running()`, this equals Udp_endpoint(). The logical value of the returned util::Udp_endpoint
1064 * never changes over the lifetime of the Node.
1065 *
1066 * @return See above. Note that it is a reference.
1067 */
1069
1070 /**
1071 * Initiates an active connect to the specified remote Flow server. Returns a safe pointer to a
1072 * new Peer_socket. The socket's state will be some substate of `S_OPEN` at least initially. The
1073 * connection operation, involving network traffic, will be performed asynchronously.
1074 *
1075 * One can treat the resulting socket as already connected; its Writable and Readable status can
1076 * be determined; once Readable or Writable one can receive or send, respectively.
1077 *
1078 * Port selection: An available local Flow port will be chosen and will be available for
1079 * information purposes via sock->local_port(), where `sock` is the returned socket. The port will
1080 * be in the range [Node::S_FIRST_EPHEMERAL_PORT, Node::S_FIRST_EPHEMERAL_PORT +
1081 * Node::S_NUM_EPHEMERAL_PORTS - 1]. Note that there is no overlap between that range and the
1082 * range [Node::S_FIRST_SERVICE_PORT, Node::S_FIRST_SERVICE_PORT + Node::S_NUM_SERVICE_PORTS - 1].
1083 *
1084 * @param to
1085 * The remote Flow port to which to connect.
1086 * @param err_code
1087 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1088 * error::Code::S_OUT_OF_PORTS, error::Code::S_INTERNAL_ERROR_PORT_COLLISION,
1089 * error::Code::S_OPTION_CHECK_FAILED.
1090 * @param opts
1091 * The low-level per-Peer_socket options to use in the new socket.
1092 * If null (typical), the per-socket options template in Node::options() is used.
1093 * If not null, the given per-socket options are first validated and, if valid, used.
1094 * If invalid, it is an error. See also Peer_socket::set_options(),
1095 * Peer_socket::options().
1096 * @return Shared pointer to Peer_socket, which is in the `S_OPEN` main state; or null pointer,
1097 * indicating an error.
1098 */
1099 Peer_socket::Ptr connect(const Remote_endpoint& to, Error_code* err_code = 0,
1100 const Peer_socket_options* opts = 0);
1101
1102 /**
1103 * Same as connect() but sends, as part of the connection handshake, the user-supplied metadata,
1104 * which the other side can access via Peer_socket::get_connect_metadata() after accepting the
1105 * connection.
1106 *
1107 * @note It is up to the user to serialize the metadata portably. One recommended convention is to
1108 * use `boost::endian::native_to_little()` (and similar) before connecting; and
1109 * on the other side use the reverse (`boost::endian::little_to_native()`) before using the value.
1110 * Packet dumps will show a flipped (little-endian) representation, while with most platforms the conversion
1111 * will be a no-op at compile time. Alternatively use `native_to_big()` and vice-versa.
1112 * @note Why provide this metadata facility? After all, they could just send the data upon connection via
1113 * send()/receive()/etc. Answers: Firstly, this is guaranteed to be delivered (assuming successful
1114 * connection), even if reliability (such as via retransmission) is disabled in socket options (opts
1115 * argument). For example, if a reliability mechanism (such as FEC) is built on top of the Flow layer,
1116 * parameters having to do with configuring that reliability mechanism can be bootstrapped reliably
1117 * using this mechanism. Secondly, it can be quite convenient (albeit not irreplaceably so) for
1118 * connection-authenticating techniques like security tokens known by both sides.
1119 * @param to
1120 * See connect().
1121 * @param serialized_metadata
1122 * Data copied and sent to the other side during the connection establishment. The other side can get
1123 * equal data using Peer_socket::get_connect_metadata(). The returned socket `sock` also stores it; it's
1124 * similarly accessible via sock->get_connect_metadata() on this side.
1125 * The metadata must fit into a single low-level packet; otherwise
1126 * error::Code::S_CONN_METADATA_TOO_LARGE error is returned.
1127 * @param err_code
1128 * See connect(). Added error: error::Code::S_CONN_METADATA_TOO_LARGE.
1129 * @param opts
1130 * See connect().
1131 * @return See connect().
1132 */
1134 const boost::asio::const_buffer& serialized_metadata,
1135 Error_code* err_code = 0,
1136 const Peer_socket_options* opts = 0);
1137
1138 /**
1139 * The blocking (synchronous) version of connect(). Acts just like connect() but instead of
1140 * returning a connecting socket immediately, waits until the initial handshake either succeeds or
1141 * fails, and then returns the socket or null, respectively. Additionally, you can specify a
1142 * timeout; if the connection is not successful by this time, the connection attempt is aborted
1143 * and null is returned.
1144 *
1145 * Note that there is always a built-in Flow protocol connect timeout that is mandatory
1146 * and will report an error if it expires; but it may be too long for your purposes, so you can
1147 * specify your own that may expire before it. The two timeouts should be thought of as fundamentally
1148 * independent (built-in one is in the lower level of Flow protocol; the one you provide is at the application
1149 * layer), so don't make assumptions about Flow's behavior and set a timeout if you know you need one -- even
1150 * if in practice it is longer than the Flow one (which as of this writing can be controlled via socket option).
1151 *
1152 * The following are the possible outcomes:
1153 * 1. Connection succeeds before the given timeout expires (or succeeds, if no timeout given).
1154 * Socket is at least Writable at time of return. The new socket is returned, no error is
1155 * returned via `*err_code`.
1156 * 2. Connection fails before the given timeout expires (or fails, if no timeout given). null
1157 * is returned, `*err_code` is set to reason for connection failure. (Note that a built-in
1158 * handshake timeout -- NOT the given user timeout, if any -- falls under this category.)
1159 * `*err_code == error::Code::S_WAIT_INTERRUPTED` means the wait was interrupted (similarly to POSIX's `EINTR`).
1160 * 3. A user timeout is given, and the connection does not succeed before it expires. null is
1161 * returned, and `*err_code` is set to error::Code::S_WAIT_USER_TIMEOUT.
1162 * (Rationale: consistent with Server_socket::sync_accept(),
1163 * Peer_socket::sync_receive(), Peer_socket::sync_send() behavior.)
1164 *
1165 * Tip: Typical types you might use for `max_wait`: `boost::chrono::milliseconds`,
1166 * `boost::chrono::seconds`, `boost::chrono::high_resolution_clock::duration`.
1167 *
1168 * @tparam Rep
1169 * See `boost::chrono::duration` documentation (and see above tip).
1170 * @tparam Period
1171 * See `boost::chrono::duration` documentation (and see above tip).
1172 * @param to
1173 * See connect().
1174 * @param max_wait
1175 * The maximum amount of time from now to wait before giving up on the wait and returning.
1176 * `"duration<Rep, Period>::max()"` will eliminate the time limit and cause indefinite wait
1177 * -- however, not really, as there is a built-in connection timeout that will expire.
1178 * @param err_code
1179 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1180 * error::Code::S_WAIT_INTERRUPTED, error::Code::S_WAIT_USER_TIMEOUT, error::Code::S_NODE_NOT_RUNNING,
1181 * error::Code::S_CANNOT_CONNECT_TO_IP_ANY, error::Code::S_OUT_OF_PORTS,
1182 * error::Code::S_INTERNAL_ERROR_PORT_COLLISION,
1183 * error::Code::S_CONN_TIMEOUT, error::Code::S_CONN_REFUSED,
1184 * error::Code::S_CONN_RESET_BY_OTHER_SIDE, error::Code::S_NODE_SHUTTING_DOWN,
1185 * error::Code::S_OPTION_CHECK_FAILED.
1186 * @param opts
1187 * See connect().
1188 * @return See connect().
1189 */
1190 template<typename Rep, typename Period>
1191 Peer_socket::Ptr sync_connect(const Remote_endpoint& to, const boost::chrono::duration<Rep, Period>& max_wait,
1192 Error_code* err_code = 0,
1193 const Peer_socket_options* opts = 0);
1194
1195 /**
1196 * A combination of sync_connect() and connect_with_metadata() (blocking connect, with supplied
1197 * metadata).
1198 *
1199 * @param to
1200 * See sync_connect().
1201 * @param max_wait
1202 * See sync_connect().
1203 * @param serialized_metadata
1204 * See connect_with_metadata().
1205 * @param err_code
1206 * See sync_connect(). Added error: error::Code::S_CONN_METADATA_TOO_LARGE.
1207 * @param opts
1208 * See sync_connect().
1209 * @return See sync_connect().
1210 */
1211 template<typename Rep, typename Period>
1213 const boost::chrono::duration<Rep, Period>& max_wait,
1214 const boost::asio::const_buffer& serialized_metadata,
1215 Error_code* err_code = 0,
1216 const Peer_socket_options* opts = 0);
1217
1218 /**
1219 * Equivalent to `sync_connect(to, duration::max(), err_code, opt)s`; i.e., sync_connect() with no user
1220 * timeout.
1221 *
1222 * @param to
1223 * See other sync_connect().
1224 * @param err_code
1225 * See other sync_connect().
1226 * @param opts
1227 * See sync_connect().
1228 * @return See other sync_connect().
1229 */
1231 const Peer_socket_options* opts = 0);
1232
1233 /**
1234 * Equivalent to `sync_connect_with_metadata(to, duration::max(), serialized_metadata, err_code, opts)`; i.e.,
1235 * sync_connect_with_metadata() with no user timeout.
1236 *
1237 * @param to
1238 * See sync_connect().
1239 * @param serialized_metadata
1240 * See connect_with_metadata().
1241 * @param err_code
1242 * See sync_connect(). Added error: error::Code::S_CONN_METADATA_TOO_LARGE.
1243 * @param opts
1244 * See sync_connect().
1245 * @return See sync_connect().
1246 */
1248 const boost::asio::const_buffer& serialized_metadata,
1249 Error_code* err_code = 0,
1250 const Peer_socket_options* opts = 0);
1251
1252 /**
1253 * Sets up a server on the given local Flow port and returns Server_socket which can be used to
1254 * accept subsequent incoming connections to this server. Any subsequent incoming connections
1255 * will be established asynchronously and, once established, can be claimed (as Peer_socket
1256 * objects) via Server_server::accept() and friends.
1257 *
1258 * Port specification: You must select a port in the range [Node::S_FIRST_SERVICE_PORT,
1259 * Node::S_FIRST_SERVICE_PORT + Node::S_NUM_SERVICE_PORTS - 1] or the special value #S_PORT_ANY.
1260 * In the latter case an available port in the range [Node::S_FIRST_EPHEMERAL_PORT,
1261 * Node::S_FIRST_EPHEMERAL_PORT + Node::S_NUM_EPHEMERAL_PORTS - 1] will be chosen for you.
1262 * Otherwise we will use the port you explicitly specified.
1263 *
1264 * Note that using #S_PORT_ANY in this context typically makes sense only if you somehow
1265 * communicate `serv->local_port()` (where `serv` is the returned socket) to the other side through
1266 * some other means (for example if both client and server are running in the same program, you
1267 * could just pass it via variable or function call). Note that there is no overlap between the
1268 * two aforementioned port ranges.
1269 *
1270 * @param local_port
1271 * The local Flow port to which to bind.
1272 * @param err_code
1273 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1274 * error::Code::S_NODE_NOT_RUNNING, error::Code::S_PORT_TAKEN,
1275 * error::Code::S_OUT_OF_PORTS, error::Code::S_INVALID_SERVICE_PORT_NUMBER,
1276 * error::Code::S_INTERNAL_ERROR_PORT_COLLISION.
1277 * @param child_sock_opts
1278 * If null, any Peer_sockets that `serv->accept()` may return (where `serv` is the returned
1279 * Server_socket) will be initialized with the options set equal to
1280 * `options().m_dyn_sock_opts`. If not null, they will be initialized with a copy of
1281 * `*child_sock_opts`. No reference to `*child_sock_opts` is saved.
1282 * @return Shared pointer to Server_socket, which is in the Server_socket::State::S_LISTENING state at least
1283 * initially; or null pointer, indicating an error.
1284 */
1285 Server_socket::Ptr listen(flow_port_t local_port, Error_code* err_code = 0,
1286 const Peer_socket_options* child_sock_opts = 0);
1287
1288 /**
1289 * Creates a new Event_set in Event_set::State::S_INACTIVE state with no sockets/events stored; returns this
1290 * Event_set.
1291 *
1292 * @param err_code
1293 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1294 * error::Code::S_NODE_NOT_RUNNING.
1295 * @return Shared pointer to Event_set; or null pointer, indicating an error.
1296 */
1298
1299 /**
1300 * Interrupts any blocking operation, a/k/a wait, and informs the invoker of that operation that the
1301 * blocking operation's outcome was being interrupted. Conceptually, this causes a similar fate as a POSIX
1302 * blocking function exiting with -1/`EINTR`, for all such functions currently executing. This may be called
1303 * from any thread whatsoever and, particularly, from signal handlers as well.
1304 *
1305 * Before deciding to call this explicitly from signal handler(s), consider using the simpler
1306 * Node_options::m_st_capture_interrupt_signals_internally instead.
1307 *
1308 * The above is vague about how an interrupted "wait" exhibits itself. More specifically, then:
1309 * Any operation with name `sync_...()` will return with an error, that error being
1310 * #Error_code error::Code::S_WAIT_INTERRUPTED. Event_set::async_wait()-initiated wait will end, with the handler
1311 * function being called, passing the Boolean value `true` to that function. `true` indicates the wait was
1312 * interrupted rather than successfully finishing with 1 or more active events (`false` would've indicated th
1313 * latter, more typical situation).
1314 *
1315 * Note that various calsses have `sync_...()` operations, including Node (Node::sync_connect()),
1316 * Server_socket (Server_socket::sync_accept()), and Peer_socket (Peer_socket::sync_receive()).
1317 *
1318 * @param err_code
1319 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1320 * error::Code::S_NODE_NOT_RUNNING.
1321 */
1322 void interrupt_all_waits(Error_code* err_code = 0);
1323
1324 /**
1325 * Dynamically replaces the current options set (options()) with the given options set.
1326 * Only those members of `opts` designated as dynamic (as opposed to static) may be different
1327 * between options() and `opts`. If this is violated, it is an error, and no options are changed.
1328 *
1329 * Typically one would acquire a copy of the existing options set via options(), modify the
1330 * desired dynamic data members of that copy, and then apply that copy back by calling
1331 * set_options(). Warning: this technique is only safe if other (user) threads do not call
1332 * set_options() simultaneously. There is a to-do to provide a thread-safe maneuver for when this is
1333 * a problem (see class Node doc header).
1334 *
1335 * @param opts
1336 * The new options to apply to this socket. It is copied; no reference is saved.
1337 * @param err_code
1338 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1339 * error::Code::S_OPTION_CHECK_FAILED, error::Code::S_NODE_NOT_RUNNING.
1340 * @return `true` on success, `false` on error.
1341 */
1342 bool set_options(const Node_options& opts, Error_code* err_code = 0);
1343
1344 /**
1345 * Copies this Node's option set and returns that copy. If you intend to use set_options() to
1346 * modify a Node's options, we recommend you make the modifications on the copy returned by
1347 * options().
1348 *
1349 * @return Ditto.
1350 */
1351 Node_options options() const;
1352
1353 /**
1354 * The maximum number of bytes of user data per received or sent block on connections generated
1355 * from this Node, unless this value is overridden in the Peer_socket_options argument to
1356 * listen() or connect() (or friend). See Peer_socket_options::m_st_max_block_size.
1357 *
1358 * @return Ditto.
1359 */
1360 size_t max_block_size() const;
1361
1362protected:
1363
1364 // Methods.
1365
1366 // Basic setup/teardown/driver/general methods.
1367
1368 /**
1369 * Returns a raw pointer to newly created Peer_socket or sub-instance like asio::Peer_socket, depending on
1370 * the template parameter.
1371 *
1372 * @tparam Peer_socket_impl_type
1373 * Either net_flow::Peer_socket or net_flow::asio::Peer_socket, as of this writing.
1374 * @param opts
1375 * See, for example, `Peer_socket::connect(..., const Peer_socket_options&)`.
1376 * @return Pointer to new object of type Peer_socket or of a subclass.
1377 */
1378 template<typename Peer_socket_impl_type>
1380
1381 /**
1382 * Like sock_create_forward_plus_ctor_args() but for Server_sockets.
1383 *
1384 * @tparam Server_socket_impl_type
1385 * Either net_flow::Server_socket or net_flow::asio::Server_socket, as of this writing.
1386 * @param child_sock_opts
1387 * See, for example, `Peer_socket::accept(..., const Peer_socket_options* child_sock_opts)`
1388 * @return Pointer to new object of type Server_socket or of a subclass.
1389 */
1390 template<typename Server_socket_impl_type>
1392
1393 // Constants.
1394
1395 /**
1396 * Type and value to supply as user-supplied metadata in SYN, if user chooses to use
1397 * `[[a]sync_]connect()` instead of `[[a]sync_]connect_with_metadata()`. If you change this value, please
1398 * update Peer_socket::get_connect_metadata() doc header.
1399 */
1401
1402private:
1403 // Friends.
1404
1405 /**
1406 * Peer_socket must be able to forward `send()`, `receive()`, etc. to Node.
1407 * @see Peer_socket.
1408 */
1409 friend class Peer_socket;
1410 /**
1411 * Server_socket must be able to forward `accept()`, etc. to Node.
1412 * @see Server_socket.
1413 */
1414 friend class Server_socket;
1415 /**
1416 * Event_set must be able to forward `close()`, `event_set_async_wait()`, etc. to Node.
1417 * @see Event_set.
1418 */
1419 friend class Event_set;
1420
1421 // Types.
1422
1423 /// Short-hand for UDP socket.
1424 using Udp_socket = boost::asio::ip::udp::socket;
1425
1426 /// boost.asio timer wrapped in a ref-counted pointer.
1427 using Timer_ptr = boost::shared_ptr<util::Timer>;
1428
1429 /// Short-hand for a signal set.
1430 using Signal_set = boost::asio::signal_set;
1431
1432 /// Short-hand for high-performance, non-reentrant, exclusive mutex used to lock #m_opts.
1434
1435 /// Short-hand for lock that acquires exclusive access to an #Options_mutex.
1437
1438 struct Socket_id;
1439 // Friend of Node: For ability to reference private `struct` Node::Socket_id.
1440 friend size_t hash_value(const Socket_id& socket_id);
1441 friend bool operator==(const Socket_id& lhs, const Socket_id& rhs);
1442
1443 /**
1444 * A map from the connection ID (= remote-local socket pair) to the local Peer_socket that is
1445 * the local portion of the connection. Applies to peer-to-peer (not server) sockets.
1446 */
1447 using Socket_id_to_socket_map = boost::unordered_map<Socket_id, Peer_socket::Ptr>;
1448
1449 /// A map from the local Flow port to the local Server_socket listening on that port.
1450 using Port_to_server_map = boost::unordered_map<flow_port_t, Server_socket::Ptr>;
1451
1452 /// A set of Event_set objects.
1453 using Event_sets = boost::unordered_set<Event_set::Ptr>;
1454
1455 // Methods.
1456
1457 // Basic setup/teardown/driver/general methods.
1458
1459 /**
1460 * Worker thread W (main event loop) body. Does not exit unless told to do so by Node's
1461 * destruction (presumably from a non-W thread, as W is not exposed to Node user).
1462 *
1463 * @param low_lvl_endpoint
1464 * See that parameter on Node constructor. Intentionally passed by value, to
1465 * avoid race with user's Udp_endpoint object disappearing before worker_run() can
1466 * use it.
1467 */
1468 void worker_run(const util::Udp_endpoint low_lvl_endpoint);
1469
1470 /**
1471 * Helper to invoke for each thread in which this Node executes, whether or not it starts that thread,
1472 * that applies certain common settings to all subsequent logging from that thread.
1473 *
1474 * E.g., it might nickname the thread (w/r/t logging) and set a certain style of printing duration units (short
1475 * like "ms" or long like "milliseconds"): these probably won't change for the rest of the Node's logging.
1476 *
1477 * @param thread_type
1478 * Roughly 3-letter character sequence identifying the thread's purpose, to be included in the thread's logged
1479 * nickname in subsequent log message prefixes; or empty string to let the thread's nickname stay as-is.
1480 * @param logger
1481 * The Logger whose logging to configure(); or null to assume `this->get_logger()` (which is typical but may
1482 * not yet be available, say, during object construction).
1483 * @return Address of the Logger that was configured (either `logger` or `this->get_logger()`).
1484 */
1485 log::Logger* this_thread_init_logger_setup(const std::string& thread_type, log::Logger* logger = 0);
1486
1487 /**
1488 * Given a new set of Node_options intended to replace (or initialize) a Node's #m_opts, ensures
1489 * that these new option values are legal. In all cases, values are checked for individual and
1490 * mutual validity. Additionally, unless init is true, which means we're being called from
1491 * constructor, ensures that no `static` data member is different between #m_opts and opts. If any
1492 * validation fails, it is an error.
1493 *
1494 * Pre-condition: If `!init`, #m_opts_mutex is locked.
1495 *
1496 * @todo Is it necessary to return `opts` now that we've switched to C++11 or better?
1497 *
1498 * @param opts
1499 * New option values to validate.
1500 * @param init
1501 * True if called from constructor; false if called from set_options().
1502 * @param err_code
1503 * See flow::Error_code docs for error reporting semantics. error::Code generated:
1504 * error::Code::S_OPTION_CHECK_FAILED, error::Code::S_STATIC_OPTION_CHANGED.
1505 * @return `opts`. The only reason we return this is so that it can be called during the
1506 * construction's initializer section (go, C++03!).
1507 */
1508 const Node_options& validate_options(const Node_options& opts, bool init, Error_code* err_code) const;
1509
1510 /**
1511 * Helper that compares `new_val` to `old_val` and, if they are not equal, logs and returns an error;
1512 * used to ensure static options are not changed.
1513 *
1514 * @tparam Opt_type
1515 * Type of a Node_options, etc., data member.
1516 * @param new_val
1517 * Proposed new value for the option.
1518 * @param old_val
1519 * Current value of the option.
1520 * @param opt_id
1521 * The name of the option, suitable for logging; this is presumably obtained using the
1522 * macro `#` technique.
1523 * @param err_code
1524 * See Peer_socket::set_options().
1525 * @return `true` on success, `false` on validation error.
1526 */
1527 template<typename Opt_type>
1528 bool validate_static_option(const Opt_type& new_val, const Opt_type& old_val, const std::string& opt_id,
1529 Error_code* err_code) const;
1530
1531 /**
1532 * Helper that, if the given condition is false, logs and returns an error; used to check for
1533 * option value validity when setting options.
1534 *
1535 * @param check
1536 * `false` if and only if some validity check failed.
1537 * @param check_str
1538 * String describing which condition was checked; this is presumably obtained using the
1539 * macro # technique.
1540 * @param err_code
1541 * See Peer_socket::set_options().
1542 * @return `true` on success, `false` on validation error.
1543 */
1544 bool validate_option_check(bool check, const std::string& check_str, Error_code* err_code) const;
1545
1546 /**
1547 * Obtain a copy of the value of a given option in a thread-safe manner. Because #m_opts may be
1548 * modified at any time -- even if the desired option is static and not being modified, this is
1549 * still unsafe -- #m_opts must be locked, the desired value must be copied, and #m_opts must be
1550 * unlocked. This method does so.
1551 *
1552 * Do NOT read option values without opt().
1553 *
1554 * @tparam Opt_type
1555 * The type of the option data member.
1556 * @param opt_val_ref
1557 * A reference (important!) to the value you want; this may be either a data member of
1558 * `this->m_opts` or the entire `this->m_opts` itself.
1559 * @return A copy of the value at the given reference.
1560 */
1561 template<typename Opt_type>
1562 Opt_type opt(const Opt_type& opt_val_ref) const;
1563
1564 /**
1565 * Performs low-priority tasks that should be run on an infrequent, regular basis, such as stat
1566 * logging and schedules the next time this should happen. This is the timer handler for that
1567 * timer.
1568 *
1569 * @param reschedule
1570 * If `true`, after completing the tasks, the timer is scheduled to run again later;
1571 * otherwise it is not.
1572 */
1573 void perform_regular_infrequent_tasks(bool reschedule);
1574
1575 /* Methods dealing with low-level packet I/O. Implementations are in low_lvl_io.cpp. The
1576 * line between these methods and the ones further down (like handle_incoming())) is blurred, but basically the
1577 * latter methods deal with each incoming packet after it has been read off wire and gone through
1578 * the network simulator (if any is active). By contrast the methods just below
1579 * (low_lvl_io.cpp) deal with receiving and sending low-level packets (including packet
1580 * pacing) and network condition simulation (if active) -- basically leaving the core protocol
1581 * logic to the aforementioned core logic methods. */
1582
1583 // Input.
1584
1585 /**
1586 * Registers so that during the current or next `m_task_engine.run()`, the latter will wait for a receivable UDP
1587 * packet and, when one is available, will call low_lvl_recv_and_handle().
1588 *
1589 * Pre-condition: we are in thread W.
1590 */
1591 void async_low_lvl_recv();
1592
1593 /**
1594 * Handles the pre-condition that #m_low_lvl_sock has a UDP packet available for reading, or that there
1595 * was an error in waiting for this pre-condition. If no error (`!sys_err_code`) then the packet is read
1596 * (thus erased) from the OS UDP net-stack's packet queue. The packet is then properly handled (for
1597 * example it may result in more data decoded into an appropriate Peer_socket's stream buffer).
1598 *
1599 * @param sys_err_code
1600 * Error code of the operation.
1601 */
1602 void low_lvl_recv_and_handle(Error_code sys_err_code);
1603
1604 /**
1605 * Helper for low_lvl_recv_and_handle() that calls handle_incoming() on the not-yet-deserialized low-level
1606 * packet just read off the UDP socket, but first handles simulation of various network conditions
1607 * like latency, loss, and duplication. Pre-condition is that a UDP receive just successfully
1608 * got the data, or that a simulation thereof occurred.
1609 *
1610 * @param packet_data
1611 * See handle_incoming(). Note that, despite this method possibly acting asynchronously (e.g.,
1612 * if simulating latency), `*packet_data` ownership is retained by the immediate caller.
1613 * Caller must not assume anything about its contents upon return and is free to do anything else to it
1614 * (e.g., read another datagram into it).
1615 * @param low_lvl_remote_endpoint
1616 * See handle_incoming().
1617 * @param is_sim_duplicate_packet
1618 * `false` if `packet_data` contains data actually just read from UDP socket.
1619 * `true` if `packet_data` contains data placed there as a simulated duplicate packet.
1620 * The latter is used to prevent that simulated duplicated packet from itself getting
1621 * duplicated or dropped.
1622 * @return The number of times handle_incoming() was called *within* this call (before this call
1623 * returned); i.e., the number of packets (e.g., packet and/or its duplicate) handled
1624 * immediately as opposed to dropped or scheduled to be handled later.
1625 */
1626 unsigned int handle_incoming_with_simulation(util::Blob* packet_data,
1627 const util::Udp_endpoint& low_lvl_remote_endpoint,
1628 bool is_sim_duplicate_packet = false);
1629
1630 /**
1631 * Sets up `handle_incoming(packet_data, low_lvl_remote_endpoint)` to be called asynchronously after a
1632 * specified period of time. Used to simulate latency.
1633 *
1634 * @param latency
1635 * After how long to call handle_incoming().
1636 * @param packet_data
1637 * See handle_incoming_with_simulation().
1638 * @param low_lvl_remote_endpoint
1639 * See handle_incoming_with_simulation().
1640 */
1642 util::Blob* packet_data,
1643 const util::Udp_endpoint& low_lvl_remote_endpoint);
1644
1645 // Output.
1646
1647 /**
1648 * async_low_lvl_packet_send_impl() wrapper to call when `packet` is to be sent to the remote side of
1649 * the connection `sock`. In particular, this records certain per-socket stats accordingly.
1650 *
1651 * @param sock
1652 * Socket whose remote side to target when sending.
1653 * @param packet
1654 * See async_low_lvl_packet_send_impl().
1655 * @param delayed_by_pacing
1656 * See async_low_lvl_packet_send_impl().
1657 */
1659 bool delayed_by_pacing);
1660
1661 /**
1662 * async_low_lvl_packet_send_impl() wrapper to call when `packet` is to be sent to the remote side of
1663 * the connection `sock`. In particular, this records certain per-socket stats accordingly.
1664 *
1665 * @param low_lvl_remote_endpoint
1666 * UDP endpoint for the Node to which to send the packet.
1667 * @param packet
1668 * See async_low_lvl_packet_send_impl().
1669 */
1670 void async_no_sock_low_lvl_packet_send(const util::Udp_endpoint& low_lvl_remote_endpoint,
1672
1673 /**
1674 * Takes given low-level packet structure, serializes it, and initiates
1675 * asynchronous send of these data to the remote Node specified by the given UDP endpoint.
1676 * The local and target ports are assumed to be already filled out in `*packet`.
1677 * Once the send is possible (i.e., UDP net-stack is able to buffer it for sending; or there is an
1678 * error), low_lvl_packet_sent() is called (asynchronously).
1679 *
1680 * Takes ownership of `packet`; do not reference it in any way after this method returns.
1681 *
1682 * @note This method exiting in no way indicates the send succeeded (indeed,
1683 * the send cannot possibly initiate until this method exits).
1684 *
1685 * @param low_lvl_remote_endpoint
1686 * UDP endpoint for the Node to which to send the packet.
1687 * @param packet
1688 * Pointer to packet structure with everything filled out as desired.
1689 * @param delayed_by_pacing
1690 * `true` if there was a (pacing-related) delay between when higher-level code decided to send this packet
1691 * and the execution of this method; `false` if there was not, meaning said higher-level code executed us
1692 * immediately (synchronously), though not necessarily via a direct call (in fact that's unlikely; hence
1693 * `_impl` in the name).
1694 * @param sock
1695 * Peer_socket associated with this connection; null pointer if none is so associated.
1696 * If not null, behavior undefined unless `low_lvl_remote_endpoint == sock->remote_endpoint().m_udp_endpoint`.
1697 */
1698 void async_low_lvl_packet_send_impl(const util::Udp_endpoint& low_lvl_remote_endpoint,
1699 Low_lvl_packet::Const_ptr packet, bool delayed_by_pacing, Peer_socket::Ptr sock);
1700
1701 /**
1702 * Completion handler for async_low_lvl_packet_send_impl(); called when the packet is either
1703 * successfully fed to the UDP net-stack for sending, or when there is an error in doing so.
1704 *
1705 * @warning It is important to pass `packet` to this, because the serialization operation produces
1706 * a bunch of pointers into `*packet`; if one does not pass it here through the
1707 * boost.asio send call, `*packet` might get deleted, and then send op will try to access
1708 * pointer(s) to invalid memory.
1709 * @param packet
1710 * Ref-counted pointer to the packet that was hopefully sent.
1711 * Will be destroyed at the end of low_lvl_packet_sent() unless a copy of this pointer is
1712 * saved elsewhere before that point. (Usually you should indeed let it be destroyed.)
1713 * @param sock
1714 * See async_low_lvl_packet_send_impl(). Note the null pointer is allowed.
1715 * @param bytes_expected_transferred
1716 * Size of the serialization of `*packet`, that being the total # of bytes we want sent
1717 * over UDP.
1718 * @param sys_err_code
1719 * Result of UDP send operation.
1720 * @param bytes_transferred
1721 * Number of bytes transferred assuming `!err_code`.
1722 * Presumably that would equal `bytes_expected_transferred`, but we will see.
1723 */
1724 void low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet, size_t bytes_expected_transferred,
1725 const Error_code& sys_err_code, size_t bytes_transferred);
1726
1727 /**
1728 * Performs important book-keeping based on the event "DATA packet was sent to destination."
1729 * The affected data structures are: Sent_packet::m_sent_when (for the Sent_packet in question),
1730 * Peer_socket::m_snd_last_data_sent_when, Drop_timer Peer_socket::m_snd_drop_timer (in `*sock`).
1731 * sock->m_snd_drop_timer. More information is in the doc headers for
1732 * those data members.
1733 *
1734 * @param sock
1735 * Socket for which the given DATA packet is sent.
1736 * @param seq_num
1737 * The first sequence number for the sent DATA packet.
1738 * Sent_packet::m_sent_when for its Sent_packet should contain the time at which send_worker() removed
1739 * the data from Send buffer and packetized it; it's used to log the difference between
1740 * that time and now.
1741 */
1742 void mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number& seq_num);
1743
1744 /**
1745 * Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that
1746 * came from that endpoint, when there is no associated Peer_socket for that remote endpoint/local port combo.
1747 * An error is unlikely, but if it happens there is no reporting other than logging.
1748 *
1749 * You should use this to reply with an RST in situations where no Peer_socket is applicable; for
1750 * example if anything but a SYN or RST is sent to a server port. In situations where a
1751 * Peer_socket is applicable (which is most of the time an RST is needed), use
1752 * async_sock_low_lvl_rst_send().
1753 *
1754 * @param causing_packet
1755 * Packet we're responding to (used at least to set the source and destination Flow ports
1756 * of the sent packet).
1757 * @param low_lvl_remote_endpoint
1758 * Where `causing_packet` came from (the Node low-level endpoint).
1759 */
1761 const util::Udp_endpoint& low_lvl_remote_endpoint);
1762
1763 /**
1764 * Begins the process of asynchronously sending the given low-level packet to the remote Node
1765 * specified by the given Peer_socket. The method, if this feature is applicable and enabled,
1766 * applies packet pacing (which attempts to avoid burstiness by spreading out packets without
1767 * changing overall sending rate). Therefore the given packet may be sent as soon as a UDP send
1768 * is possible according to OS (which is typically immediate), or later, if pacing delays it. Once it is
1769 * time to send it, async_sock_low_lvl_packet_send() is used.
1770 *
1771 * Takes ownership of packet; do not reference it in any way after this method returns.
1772 *
1773 * Note that an error may occur in asynchronous operations triggered by this method; if this
1774 * happens the socket will be closed via close_connection_immediately().
1775 *
1776 * @param sock
1777 * Socket whose `remote_endpoint()` specifies to what Node and what Flow port within that
1778 * Node this socket will go.
1779 * @param packet
1780 * Pointer to packet structure with everything except the source, destination, and
1781 * retransmission mode fields (essentially, the public members of Low_lvl_packet proper but
1782 * not its derived types) filled out as desired.
1783 */
1785 Low_lvl_packet::Ptr&& packet);
1786
1787 /**
1788 * async_sock_low_lvl_packet_send_paced() pacing helper: Handles a DATA or ACK packet that was just
1789 * passed into async_sock_low_lvl_packet_send_paced(), i.e., is available for sending. That is, either
1790 * sends the packet via async_sock_low_lvl_packet_send() immediately or queues it for sending later.
1791 *
1792 * Pre-conditions: pacing is enabled for the socket in options; an SRTT value has been computed
1793 * (is not undefined); packet is DATA or ACK; packet is fully filled out; `sock` is in OPEN state;
1794 * invariants described for `struct` Send_pacing_data hold.
1795 *
1796 * Note that an error may occur in asynchronous operations triggered by this method; if this
1797 * happens the socket will be closed via close_connection_immediately().
1798 *
1799 * Takes ownership of packet; do not reference it in any way after this method returns.
1800 *
1801 * @param sock
1802 * Socket under consideration.
1803 * @param packet
1804 * Packet to send.
1805 */
1807
1808 /**
1809 * async_sock_low_lvl_packet_send_paced() pacing helper: Resets the socket's Send_pacing_data structure
1810 * to reflect that a new pacing time slice should begin right now. The slice start is set to now,
1811 * its period is set based on the current SRTT and congestion window (so that packets are evenly
1812 * spread out over the next SRTT); and the number of full packets allowed over this time slice are
1813 * computed.
1814 *
1815 * Pre-conditions: pacing is enabled for the socket in options; an SRTT value has been computed
1816 * (is not undefined); `sock` is in OPEN state; invariants described for `struct` Send_pacing_data
1817 * hold.
1818 *
1819 * @see `struct` Send_pacing_data doc header.
1820 * @param sock
1821 * Socket under consideration. Should be in OPEN state.
1822 * @param now
1823 * For performance (so that we don't need to acquire the current time again), this is the
1824 * very recent time point at which it was determined it is time for a new pacing time slice.
1825 */
1827
1828 /**
1829 * async_sock_low_lvl_packet_send_paced() pacing helper: Given that we are currently in the pacing time
1830 * slice in `sock->m_snd_pacing_data`, sends as many queued packets as possible given the time
1831 * slice's budget, and if any remain queued after this, schedules for them to be sent in the next
1832 * time slice.
1833 *
1834 * Pre-conditions: pacing is enabled for the socket in options; an SRTT value has been computed
1835 * (is not undefined); `sock` is in OPEN state; invariants described for `struct` Send_pacing_data
1836 * hold; the current time is roughly within the current pacing time slice.
1837 *
1838 * Note that an error may occur in asynchronous operations triggered by this method; if this
1839 * happens to socket will be closed via close_connection_immediately(). However if the error
1840 * happens IN this method (`false` is returned), it is up to the caller to handle the error as
1841 * desired.
1842 *
1843 * @param sock
1844 * Socket under consideration.
1845 * @param executing_after_delay
1846 * `true` if executing from a pacing-related timer handler; `false` otherwise (i.e.,
1847 * if sock_pacing_new_packet_ready() is in the call stack).
1848 */
1849 void sock_pacing_process_q(Peer_socket::Ptr sock, bool executing_after_delay);
1850
1851 /**
1852 * async_sock_low_lvl_packet_send_paced() pacing helper: If sock_pacing_process_q() ran out of the last
1853 * time slice's budget and still had packets to send, this is the handler that triggers when the
1854 * out-of-budget time slice ends. Sets up a new time slice starting now and tries to send as many
1855 * queud packets as possible with the new budget; if still more packets remain after this,
1856 * schedules yet another timer.
1857 *
1858 * This may also be called via `cancel()` of the timer. In this case, the pre-condition is that
1859 * `sock->state() == Peer_socket::State::S_CLOSED`; the method will do nothing.
1860 *
1861 * Otherwise, pre-conditions: Send_pacing_data::m_packet_q for `sock` is NOT empty; the byte budget for
1862 * the current time slice is less than the packet at the head `m_packet_q`; `sock` is in OPEN state;
1863 * invariants described for `struct` Send_pacing_data hold; the current time is roughly just past
1864 * the current pacing time slice.
1865 *
1866 * Note that an error may occur in asynchronous operations triggered by this method; if this
1867 * happens to socket will be closed via close_connection_immediately(). However if the error
1868 * happens IN this method (`false` is returned), it is up to the caller to handle the error as
1869 * desired.
1870 *
1871 * @param sock
1872 * Socket under consideration.
1873 * @param sys_err_code
1874 * boost.asio error code.
1875 */
1876 void sock_pacing_time_slice_end(Peer_socket::Ptr sock, const Error_code& sys_err_code);
1877
1878 /**
1879 * Sends an RST to the other side of the given socket asynchronously when possible. An error is
1880 * unlikely, but if it happens there is no reporting other than logging.
1881 *
1882 * @param sock
1883 * Socket the remote side of which will get the RST.
1884 */
1886
1887 /**
1888 * Sends an RST to the other side of the given socket, synchronously. An error is
1889 * unlikely, but if it happens there is no reporting other than logging. Will block (though
1890 * probably not for long, this being UDP) if #m_low_lvl_sock is in blocking mode.
1891 *
1892 * @param sock
1893 * Socket the remote side of which will get the RST.
1894 */
1896
1897 // Methods for core protocol logic dealing with deserialized packets before demuxing to Peer_socket or Server_socket.
1898
1899 /**
1900 * Handles a just-received, not-yet-deserialized low-level packet. A rather important method....
1901 *
1902 * @param packet_data
1903 * Packet to deserialize and handle. Upon return, the state of `*packet_data` is not known; and caller retains
1904 * ownership of it (e.g., can read another datagram into it if desired).
1905 * @param low_lvl_remote_endpoint
1906 * From where the packet came.
1907 */
1908 void handle_incoming(util::Blob* packet_data,
1909 const util::Udp_endpoint& low_lvl_remote_endpoint);
1910
1911 /**
1912 * Performs all tasks to be performed at the end of low_lvl_recv_and_handle() or
1913 * async part of async_wait_latency_then_handle_incoming(), as determined over the course of the execution
1914 * of either of those methods. This includes at least performing event_set_all_check_delta() for
1915 * anything in #m_sock_events, etc., and any accumulated ACK-related tasks stored in the Peer_sockets
1916 * in #m_socks_with_accumulated_pending_acks and similar. This is done for efficiency and to
1917 * reduce network overhead (for example, to combine several individual acknowledgments into one
1918 * ACK packet).
1919 */
1921
1922 // Methods dealing with individual Peer_sockets. Implementations are in peer_socket.cpp.
1923
1924 /**
1925 * Handles a just-deserialized, just-demultiplexed low-level SYN_ACK packet delivered to the given
1926 * peer socket in `S_SYN_SENT` state. So it will hopefully send back a SYN_ACK_ACK, etc.
1927 *
1928 * @param socket_id
1929 * Connection ID (socket pair) identifying the socket in #m_socks.
1930 * @param sock
1931 * Peer socket in Peer_socket::Int_state::S_SYN_SENT internal state.
1932 * @param syn_ack
1933 * Deserialized immutable SYN_ACK.
1934 */
1936 Peer_socket::Ptr sock,
1937 boost::shared_ptr<const Syn_ack_packet> syn_ack);
1938
1939 /**
1940 * Handles a just-deserialized, just-demultiplexed, duplicate (equal to already-received SYN_ACK)
1941 * low-level SYN_ACK packet delivered to the given peer socket in `S_ESTABLISHED` state. This will
1942 * hopefully reply with SYN_ACK_ACK again. Reasoning for this behavior is given in
1943 * handle_incoming() at the call to this method.
1944 *
1945 * @param sock
1946 * Peer socket in Peer_socket::Int_state::S_ESTABLISHED internal state with sock->m_active_connect.
1947 * @param syn_ack
1948 * Deserialized immutable SYN_ACK.
1949 */
1951 boost::shared_ptr<const Syn_ack_packet> syn_ack);
1952
1953 /**
1954 * Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given
1955 * peer socket in `S_ESTABLISHED` state. This will hopefully reply with ACK and deliver the data to
1956 * the Receive buffer, where the user can receive() them.
1957 *
1958 * Also similarly handles packets received and queued earlier while in `S_SYN_RCVD` state.
1959 *
1960 * @param socket_id
1961 * Connection ID (socket pair) identifying the socket in #m_socks.
1962 * @param sock
1963 * Peer socket in Peer_socket::Int_state::S_ESTABLISHED internal state.
1964 * @param packet
1965 * Deserialized DATA packet. (For performance when moving data to Receive
1966 * buffer, this is modifiable.)
1967 * @param syn_rcvd_qd_packet
1968 * If `true`, this packet was saved during Peer_socket::Int_state::S_SYN_RCVD by handle_data_to_syn_rcvd() and
1969 * is being handled now that socket is Peer_socket::Int_state::S_ESTABLISHED. If `false`, this packet was
1970 * received normally during `S_ESTABLISHED` state.
1971 */
1973 Peer_socket::Ptr sock,
1974 boost::shared_ptr<Data_packet> packet,
1975 bool syn_rcvd_qd_packet);
1976
1977 /**
1978 * Helper for handle_data_to_established() that categorizes the DATA packet received as either
1979 * illegal; legal but duplicate of a previously received DATA packet;
1980 * legal but out-of-order; and finally legal and in-order. Illegal means sender can never validly send
1981 * such sequence numbers in a DATA packet. Legal means it can, although network problems may still lead to
1982 * the received DATA being not-useful in some way. Out-of-order means that `packet` occupies seq. numbers
1983 * past the start of the first unreceived data, or "first gap," which starts at Peer_socket::m_rcv_next_seq_num.
1984 * In-order, therefore, means `packet` indeed begins exactly at Peer_socket::m_rcv_next_seq_num (which means typically
1985 * one should increment the latter by `packet->m_data.size()`).
1986 *
1987 * No statistics are marked down on `sock`; the caller should proceed depending on the output as described
1988 * just below.
1989 *
1990 * If a truthy value is returned, packet is illegal; other outputs are meaningless. Otherwise, falsy is returned;
1991 * and: If `*dupe`, then packet is a legal dupe; and other outputs are meaningless. Otherwise, `!*dupe`. and:
1992 * `*slide` if and only if the packet is in-order (hence receive window left edge should "slide" right).
1993 * `*slide_size` is the number of bytes by which Peer_socket::m_rcv_next_seq_num should increment ("slide");
1994 * it is meaningful if and only if `*slide`.
1995 *
1996 * (Aside: Every attempt to detect illegality is made, within reason, but NOT every illegal behavior can be detected
1997 * as such; but defensive coding strives that a failure to detect such leads to nothing worse than meaningless data
1998 * received by user.)
1999 *
2000 * @param sock
2001 * See handle_data_to_established().
2002 * @param packet
2003 * See handle_data_to_established(). Note it is read-only, however.
2004 *.@param dupe
2005 * Output for whether the packet is a dupe (true if so). Meaningless if truthy is returned.
2006 * @param slide
2007 * Output for whether the packet consists of the next data to be passed to Receive buffer.
2008 * Meaningless if truthy is returned, or else if `*dupe` is set to `true`.
2009 * @param slide_size
2010 * By how much to increment Peer_socket::m_rcv_next_seq_num due to this in-order packet.
2011 * Meaningless unless `*slide` is set to `true`.
2012 * @return Success if `packet` is legal; the recommended error to accompany the connection-breaking RST due
2013 * to the illegal `packet`, otherwise.
2014 */
2016 boost::shared_ptr<const Data_packet> packet,
2017 bool* dupe, bool* slide, size_t* slide_size);
2018
2019 /**
2020 * Helper for handle_data_to_established() that aims to pass the payload of the given DATA packet to
2021 * the given socket's Receive buffer for user consumption; but detects and reports overflow if appropriate,
2022 * instead. Certain relevant stats are logged in all cases. `packet.m_data` is emptied due to moving it
2023 * elsewhere -- for performance (recommend saving its `.size()` before-hand, if needed for later) --
2024 * and the implications on rcv_wnd recovery (if any) are handled. `true` is returned assuming no overflow.
2025 *
2026 * If overflow detected, only statistical observations and logs are made, and `false` is returned.
2027 *
2028 * @param sock
2029 * See handle_data_to_established().
2030 * @param packet
2031 * See handle_data_to_established().
2032 * @return `false` on overflow; `true` on success.
2033 */
2035 boost::shared_ptr<Data_packet> packet);
2036
2037 /**
2038 * Helper for handle_data_to_established() that assumes the given's socket Receive buffer is currently
2039 * readable and handles implications on the Event_set subsystem.
2040 *
2041 * @param sock
2042 * See handle_data_to_established().
2043 * @param syn_rcvd_qd_packet
2044 * See handle_data_to_established().
2045 */
2046 void sock_rcv_buf_now_readable(Peer_socket::Ptr sock, bool syn_rcvd_qd_packet);
2047
2048 /**
2049 * Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-order
2050 * packet in `sock->m_rcv_packets_with_gaps` -- in retransmission-off mode. The retransmission-on counterpart
2051 * is, roughly speaking, sock_data_to_reassembly_q_unless_overflow().
2052 *
2053 * This assumes that sock_categorize_data_to_established() returned
2054 * `*slide == false`. However, due to overflow considerations
2055 * this helper itself set its own `*slide` (and `*slide_size`) value. The `*slide` argument should be
2056 * interpereted the same way as from sock_categorize_data_to_established(); `*slide_size` (meaningful if
2057 * and only if `*slide = true` is set) specifies by how much Peer_socket::m_rcv_next_seq_num must now increment.
2058 * (Note, then, that in the caller this can only set `*slide` from `false` to `true`; or not touch it.)
2059 *
2060 * @param sock
2061 * See handle_data_to_established().
2062 * @param packet
2063 * See handle_data_to_established(). Note it is read-only, however.
2064 * @param data_size
2065 * Original `packet->m_data.size()` value; by now presumbly that value is 0, but we want the original.
2066 * @param slide
2067 * Same semantics as in sock_categorize_data_to_established() (except it is always set; no "illegal" case).
2068 * @param slide_size
2069 * By how much to increment Peer_socket::m_rcv_next_seq_num due certain overflow considerations.
2070 */
2072 boost::shared_ptr<const Data_packet> packet,
2073 size_t data_size,
2074 bool* slide, size_t* slide_size);
2075
2076 /**
2077 * Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-order
2078 * packet in the reassembly queue `sock->m_rcv_packets_with_gaps` -- in retransmission-on mode; but detects
2079 * and reports overflow if appropriate, instead. Certain relevant stats are logged in all cases.
2080 * `packet.m_data` is emptied due to moving it elsewhere -- for performance (recommend saving its `.size()`
2081 * before-hand, if needed for later) -- and the implications on rcv_wnd recovery (if any) are handled.
2082 * `true` is returned assuming no overflow. The retransmission-off counterpart
2083 * is, roughly speaking, sock_track_new_data_after_gap_rexmit_off().
2084 *
2085 * If overflow detected, only statistical observations and logs are made, and `false` is returned.
2086 *
2087 * This assumes that sock_categorize_data_to_established() returned `*slide == false`.
2088 *
2089 * @param sock
2090 * See handle_data_to_established().
2091 * @param packet
2092 * See handle_data_to_established().
2093 * @return `false` on overflow; `true` on success.
2094 */
2096 boost::shared_ptr<Data_packet> packet);
2097
2098 /**
2099 * Helper for handle_data_to_established() that aims to register a set of received DATA packet data as in-order
2100 * payload in the structures Peer_socket::m_rcv_packets_with_gaps and Peer_socket::m_rcv_next_seq_num
2101 * in `sock`. Both structures are updated given the precondition that a set of data had arrived with data
2102 * starting at `sock->m_rcv_next_seq_num`. If `reassembly_in_progress` (which should be `true` if and only
2103 * if retransmission is on), then the reassembly queue is popped into `sock->m_rcv_buf` to the appropriate
2104 * extent (as the just-arrived packet may have bridged the entire gap to the first packet in that queue).
2105 *
2106 * Certain relevant stats are logged in all cases. Note that it's possible to simulate DATA packets' receipt
2107 * without actually having received such a packet. This method will slide the window as directed regardless.
2108 *
2109 * @param sock
2110 * See handle_data_to_established().
2111 * @param slide_size
2112 * By how much to increment (slide right) Peer_socket::m_rcv_packets_with_gaps.
2113 * See handle_data_to_established().
2114 * @param reassembly_in_progress
2115 * Basically, `sock->rexmit_on()`.
2116 */
2117 void sock_slide_rcv_next_seq_num(Peer_socket::Ptr sock, size_t slide_size, bool reassembly_in_progress);
2118
2119 /**
2120 * Computes and returns the max size for Peer_socket::m_rcv_packets_with_gaps for `sock`.
2121 *
2122 * @param sock
2123 * An open socket.
2124 * @return See above.
2125 */
2127
2128 /**
2129 * Causes an acknowledgment of the given received packet to be included in a future Ack_packet
2130 * sent to the other side. That ACK low-level UDP packet is not sent in this handler, even if
2131 * the low-level UDP socket is currently writable. The sending of this packet is performed
2132 * asynchronously in the manner of `boost::asio::post(io_context&)`.
2133 *
2134 * Note that the Ack_packet may include other packets being acknowledged; and that ACK may be
2135 * artificially delayed for reasons like the desire to accumulate more acknowledgments before
2136 * sending ACK (to cut down on overhead).
2137 *
2138 * @param sock
2139 * Peer socket in Peer_socket::Int_state::S_ESTABLISHED.
2140 * @param seq_num
2141 * Sequence number of first datum in the packet to be acknowledged.
2142 * @param rexmit_id
2143 * Which attempt are we acknowledging (0 = initial send, 1 = first retransmission, 2 =
2144 * second retransmission, ...). Always 0 if retransmission is off.
2145 * @param data_size
2146 * Number of bytes in the user data in the packet to be acknowledged.
2147 */
2148 void async_acknowledge_packet(Peer_socket::Ptr sock, const Sequence_number& seq_num, unsigned int rexmit_id,
2149 size_t data_size);
2150
2151 /**
2152 * Helper of perform_accumulated_on_recv_tasks() that handles any additional individual outgoing
2153 * acknowledgments accumulated during the currently running receive handler. Pre-conditions:
2154 * executed from perform_accumulated_on_recv_tasks(); `!(Peer_socket::m_rcv_pending_acks).empty()`
2155 * for `sock`; Peer_socket::m_rcv_pending_acks_size_at_recv_handler_start (for `sock`) has been set;
2156 * `sock` is in #m_socks_with_accumulated_pending_acks.
2157 *
2158 * If state is not Peer_socket::Int_state::S_ESTABLISHED, method does nothing except possibly log.
2159 *
2160 * @param socket_id
2161 * Connection ID (socket pair) identifying the socket in #m_socks.
2162 * @param sock
2163 * Peer socket.
2164 */
2166
2167 /**
2168 * Helper for handle_data_to_established() that gets simple info about
2169 * Peer_socket::m_rcv_packets_with_gaps in `sock`.
2170 *
2171 * @param sock
2172 * Socket to examine.
2173 * @param first_gap_exists
2174 * Pointer to value to set to true if and only if !(Peer_socket::m_rcv_packets_with_gaps).empty()
2175 * in `sock`. If the Peer_socket::m_rcv_packets_with_gaps invariant fully holds, this means that
2176 * there is at least one gap of unreceived packets between some received packets and other received packets,
2177 * by sequence number order.
2178 * @param seq_num_after_first_gap
2179 * Pointer to value that will be set to the first sequence number of the first element of
2180 * `sock->m_rcv_packets_with_gaps`; untouched if `!*first_gap_exists` at return.
2181 */
2183 bool* first_gap_exists, Sequence_number* seq_num_after_first_gap);
2184
2185 /**
2186 * Logs TRACE or DATA messages that show the detailed state of the receiving sequence number
2187 * space. Quite slow if DATA log level is enabled or `force_verbose_info_logging` is `true`.
2188 *
2189 * @param sock
2190 * Socket whose data to log.
2191 * @param force_verbose_info_logging
2192 * If `true`, then the method acts as if DATA logging is enabled, i.e., the maximum amount of
2193 * information is logged (but with INFO verbosity). You should only do this if you know
2194 * for a fact that this is being called infrequently (such as from
2195 * perform_regular_infrequent_tasks()).
2196 */
2197 void log_rcv_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging = false) const;
2198
2199 /**
2200 * Handles a just-deserialized, just-demultiplexed, low-level ACK packet delivered to the given
2201 * peer socket in Peer_socket::Int_state::S_ESTABLISHED state. This will hopefully
2202 * update internal data structures and inform congestion control (or queue that to be done by the end of the
2203 * current receive handler, low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming().
2204 *
2205 * @param sock
2206 * Peer socket in Peer_socket::Int_state::S_ESTABLISHED.
2207 * @param ack
2208 * Deserialized immutable ACK.
2209 */
2211 boost::shared_ptr<const Ack_packet> ack);
2212
2213 /**
2214 * Helper of perform_accumulated_on_recv_tasks() that handles any incoming acknowledgments and
2215 * rcv_wnd updates accumulated during the currently running receive handler. Pre-conditions:
2216 * executed from perform_accumulated_on_recv_tasks(); Peer_socket::m_rcv_acked_packets and
2217 * Peer_socket::m_snd_pending_rcv_wnd (in `sock`) have been set; `sock` is in
2218 * #m_socks_with_accumulated_acks.
2219 *
2220 * If `sock` is not in Peer_socket::Int_state::S_ESTABLISHED, method does nothing except possibly log.
2221 *
2222 * @param socket_id
2223 * Connection ID (socket pair) identifying the socket in #m_socks.
2224 * @param sock
2225 * Peer socket.
2226 */
2228
2229 /**
2230 * Helper of perform_accumulated_on_recv_tasks() that categorizes the given accumulated individual acknowledgment
2231 * w/r/t legality and validity; determines the DATA packet being acked if possible; logs and record stats accordingly;
2232 * and closes underlying socket if ack is illegal.
2233 *
2234 * In all cases, all relevant (to the categorization of the given ack) information is logged and stats are recorded.
2235 *
2236 * Furthermore, if the ack is illegal, the socket is closed (while `false` is returned). Otherwise, `true` is
2237 * returned, and `*dupe_or_late` is set to indicate whether the ack is valid or not. If valid,
2238 * `*acked_pkt_it` is definitely set to indicate which DATA packet is being acked. If invalid, `*acked_pkt_it`
2239 * may or may not be set, as that information may or may not be available any longer (example of it being available:
2240 * the ack is for an earlier transmission attempt of packet P, but packet P is currently In-flight due to a
2241 * subsequent retransmission attempt).
2242 *
2243 * @param socket_id
2244 * Connection ID (socket pair) identifying the socket in #m_socks.
2245 * @param sock
2246 * Peer socket.
2247 * @param ack
2248 * Individual acknowledgment being categorized.
2249 * @param dupe_or_late
2250 * Set to false if ack refers to currently In-flight instance of a packet; true if no longer In-flight
2251 * (late = considered Dropped laready; duplicate = was acked before); untouched if `false` returned.
2252 * @param acked_pkt_it
2253 * Set to point into Peer_socket::m_snd_flying_pkts_by_sent_when that is being acked if `!*dupe_or_late`,
2254 * or if `*dupe_or_late` but the acked packet is still known; set to `end()` a/k/a `past_oldest()`
2255 * otherwise; untouched if `false`
2256 * returned.
2257 * @return `false` if and only if the ack is sufficiently invalid to have made this method close the socket.
2258 */
2261 bool* dupe_or_late, Peer_socket::Sent_pkt_ordered_by_when_iter* acked_pkt_it);
2262
2263 /**
2264 * Helper of perform_accumulated_on_recv_tasks() that computes the RTT implied by a given individual acknowledgment.
2265 * In addition to returning the RTT, note the convenience out-param.
2266 *
2267 * @param flying_pkt
2268 * The In-flight DATA packet to which the ack pertains.
2269 * @param time_now
2270 * The current time to use for the RTT computation (not using value within to allow for caller to simulate
2271 * simultaneity between nearby RTT computations).
2272 * @param ack
2273 * Individual acknowledgment being categorized.
2274 * @param sent_when
2275 * This out-param is set to point within Peer_socket::m_snd_flying_pkts_by_sent_when's `Sent_when`
2276 * structure pertaining to the DATA packet send attempt to which `ack` refers.
2277 * @return The RTT. May be zero.
2278 */
2280 const Fine_time_pt& time_now,
2282 const Peer_socket::Sent_packet::Sent_when** sent_when) const;
2283
2284 /**
2285 * Handles a just-computed new RTT (round trip time) measurement for an individual packet earlier
2286 * sent: updates smoothed RTT, DTO, and anything else relevant.
2287 *
2288 * @param sock
2289 * Peer socket in Peer_socket::Int_state::S_ESTABLISHED.
2290 * @param round_trip_time
2291 * The RTT just computed, with as much resolution as is available.
2292 */
2293 void new_round_trip_time_sample(Peer_socket::Ptr sock, Fine_duration round_trip_time);
2294
2295 /**
2296 * Helper of perform_accumulated_on_recv_tasks() that determines the range of In-flight packets that should be
2297 * Dropped due to given individual acks that have just been processed; and updates the relevant `m_acks_after_me`
2298 * members in the socket.
2299 *
2300 * Logging is minimal, and no stats are recorded. However, see associated drop_pkts_on_acks() method.
2301 *
2302 * Peer_socket::Sent_packet::m_acks_after_me data members, as documented, are incremented where relevant based
2303 * on the just-processed acks in `flying_now_acked_pkts`.
2304 *
2305 * Finally, the following In-flight packets must be considered Dropped due to acks:
2306 * - The packet referred to by the returned iterator into Peer_socket::m_snd_flying_pkts_by_sent_when.
2307 * - All packets contained in the same structure appearing later in it (i.e., sent out earlier), up to
2308 * `past_oldest()` (a/k/a `end()`).
2309 *
2310 * Note that this method does not actually perform the various tasks: it only updates `m_acks_after_me` and
2311 * computes/returns the start of the to-be-Dropped range. See drop_pkts_on_acks() for the actual dropping.
2312 *
2313 * @param sock
2314 * Peer socket.
2315 * @param flying_now_acked_pkts
2316 * The individual DATA packet send attempts acks of which have just been processed.
2317 * The Peer_socket::Sent_packet (and within it, the Peer_socket::Sent_packet::Sent_when) with the order ID
2318 * P, where P is in `flying_now_acked_pkts`, must be in Peer_socket::m_snd_flying_pkts_by_sent_when.
2319 * @return Iterator into `sock->m_snd_flying_pkts_by_sent_when` indicating the latest-sent packet that should
2320 * be Dropped due to acks; `past_oldest()` a/k/a `end()` if none should be so Dropped.
2321 */
2324 const boost::unordered_set<Peer_socket::order_num_t>& flying_now_acked_pkts);
2325
2326 /**
2327 * Helper of perform_accumulated_on_recv_tasks() that acts on the determination made by
2328 * categorize_pkts_as_dropped_on_acks().
2329 *
2330 * In all cases, all relevant (to the categorization of the In-flight packets as Dropped) information is logged
2331 * and stats are recorded.
2332 *
2333 * This acts, or gathers information necessary to act, on the determination by categorize_pkts_as_dropped_on_acks()
2334 * that a certain range of In-flight packets should be Dropped due to excess acks of packets sent before them.
2335 * Namely:
2336 * - `*cong_ctl_dropped_...` are set to the values to report congestion control as part of a new loss event.
2337 * - `*dropped_...` are set to values that indicate totals w/r/t the packets Dropped (regardless of whether it's
2338 * a new or existing loss event).
2339 * - `*pkts_marked_to_drop` are loaded with the Peer_socket::Sent_packet::Sent_when::m_order_num order IDs
2340 * specifying the Dropped packets.
2341 * - `sock` members `m_snd_flying_pkts*` and related are updated, meaning the newly Dropped packets are removed.
2342 * - On the other hand, if retransmission is on, Peer_socket::m_snd_rexmit_q is pushed onto, gaining the
2343 * just-Dropped packets to retransmit.
2344 * - `true` is returned.
2345 *
2346 * However, if it is determined that a retransmission placed onto `sock->m_snd_rexmit_q` would indicate one
2347 * retransmission too many, the socket is closed, and `false` is returned.
2348 *
2349 * @param sock
2350 * Peer socket.
2351 * @param last_dropped_pkt_it
2352 * Return value of of categorize_pkts_as_dropped_on_acks().
2353 * @param cong_ctl_dropped_pkts
2354 * Will be set to total # of packets marked as Dropped to report to congestion control as part of
2355 * a loss event (`<= *dropped_pkts`).
2356 * @param cong_ctl_dropped_bytes
2357 * Total data size corresponding to `cong_ctl_dropped_pkts` (`<= *dropped_bytes)`).
2358 * @param dropped_pkts
2359 * Will be set to total # of packets marked as Dropped by this method.
2360 * @param dropped_bytes
2361 * Total data size corresponding to `dropped_pkts`.
2362 * @param pkts_marked_to_drop
2363 * Will be filled with packet IDs (`sock->m_snd_flying_pkts_by_sent_when[...]->m_sent_when->m_order_num`)
2364 * of the packets marked dropped by this method. Results undefined unless empty at method start.
2365 * @return `true` normally; `false` if too many retransmissions detected, and thus `sock` was closed.
2366 */
2368 const Peer_socket::Sent_pkt_ordered_by_when_iter& last_dropped_pkt_it,
2369 size_t* cong_ctl_dropped_pkts, size_t* cong_ctl_dropped_bytes,
2370 size_t* dropped_pkts, size_t* dropped_bytes,
2371 std::vector<Peer_socket::order_num_t>* pkts_marked_to_drop);
2372
2373 /**
2374 * Helper of handle_accumulated_acks() that logs the about-to-be-handled accumulated individual acknowledgments.
2375 *
2376 * @param sock
2377 * Peer socket with 0 or more accumulated acks recorded.
2378 */
2380
2381 /**
2382 * Handles a Drop_timer (Peer_socket::m_snd_drop_timer) event in ESTABLISHED state by dropping the specified
2383 * packets. To be executed as a Drop_timer callback.
2384 *
2385 * @param sock
2386 * Peer socket is Peer_socket::Int_state::S_ESTABLISHED with at least one In-flight sent packet.
2387 * @param drop_all_packets
2388 * If `true`, will consider all packets Dropped. If `false`, will consider only the earliest
2389 * In-flight packet dropped.
2390 */
2391 void drop_timer_action(Peer_socket::Ptr sock, bool drop_all_packets);
2392
2393 /**
2394 * Logs TRACE or DATA messages thats show the detailed state of the sending sequence number space.
2395 * Quite slow if DATA log level is enabled or `force_verbose_info_logging` is `true`.
2396 *
2397 * @param sock
2398 * Socket whose data to log.
2399 * @param force_verbose_info_logging
2400 * Similar to same argument in log_rcv_window().
2401 */
2402 void log_snd_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging = false) const;
2403
2404 /**
2405 * Thread W implementation of connect(). Performs all the needed work up to waiting for network
2406 * traffic, gives the resulting Peer_socket to the user thread, and signals that user thread.
2407 *
2408 * Pre-condition: We're in thread W; thread U != W is waiting for us to return having set `*sock`. Post-condition:
2409 * `*sock` contains a Peer_socket::Ptr in an OPEN+CONNECTING state if `!(Peer_socket::m_disconnect_cause)`
2410 * for `*sock`; otherwise an error occurred, and that error is Peer_socket::m_disconnect_cause (in `*sock`).
2411 *
2412 * @param to
2413 * See connect().
2414 * @param serialized_metadata
2415 * Serialized metadata to provide to the peer when the connection is being established.
2416 * @param opts
2417 * See connect().
2418 * @param sock
2419 * `*sock` shall be set to the resulting new Peer_socket. Check `(*sock)->m_disconnect_cause`.
2420 */
2421 void connect_worker(const Remote_endpoint& to,
2422 const boost::asio::const_buffer& serialized_metadata,
2423 const Peer_socket_options* opts,
2424 Peer_socket::Ptr* sock);
2425
2426 /**
2427 * Implementation core of `sync_connect*()` that gets rid of templated or missing arguments thereof.
2428 *
2429 * E.g., the API would wrap this and supply a Fine_duration instead of generic `duration`; and supply
2430 * `Fine_duration::max()` if user omitted the timeout argument. Code bloat and possible circular definition issues
2431 * are among the reasons for this "de-templating" pattern.
2432 *
2433 * @param to
2434 * See connect().
2435 * @param max_wait
2436 * See the public `sync_connect(timeout)`. `"duration<Rep, Period>::max()"` maps to the value
2437 * `Fine_duration::max()` for this argument.
2438 * @param serialized_metadata
2439 * See connect_with_metadata().
2440 * @param err_code
2441 * See sync_connect().
2442 * @param opts
2443 * See connect().
2444 * @return See sync_connect().
2445 */
2447 const boost::asio::const_buffer& serialized_metadata,
2448 Error_code* err_code,
2449 const Peer_socket_options* opts);
2450
2451 /**
2452 * Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some
2453 * amount of time, so that we may try the SYN[_ACK] again if we don't get the acknowledgement by
2454 * then (or we may close socket after too many such retries). If `initial` is `true`, an overall
2455 * connection timeout scheduled task is also set up, to trigger the aforementioned close on timeout.
2456 *
2457 * @param socket_id
2458 * Connection ID (socket pair) identifying the socket in #m_socks.
2459 * @param sock
2460 * Peer socket in SYN_SENT or SYN_RCVD internal state.
2461 * @param initial
2462 * `true` if and only if the first SYN or SYN_ACK; otherwise it is a retry.
2463 */
2464 void setup_connection_timers(const Socket_id& socket_id, Peer_socket::Ptr sock, bool initial);
2465
2466 /**
2467 * Handles the triggering of the retransmit timer wait set up by
2468 * setup_connection_timers(); it will re-send the SYN or SYN_ACK.
2469 *
2470 * @param socket_id
2471 * Connection ID (socket pair) identifying the socket in #m_socks.
2472 * @param sock
2473 * Peer socket.
2474 */
2476
2477 /**
2478 * Cancel any timers and scheduled tasks active in the given socket. More precisely, causes for each handler
2479 * scheduled to happen in the future to be called as soon as possible with error code
2480 * `operation_aborted`. If, by the time the current handler has begun, the handler was about to be
2481 * called due the timer triggering, this method will not be able to induce `operation_aborted`.
2482 * Therefore the handler should be careful to check state and not rely on `operation_aborted`,
2483 * despite this method.
2484 *
2485 * Update: The caveats in previous paragraph do not apply to scheduled tasks (`util::schedule_task_*()`).
2486 * Canceling such tasks (which this method also does) prevents their handlers from running.
2487 *
2488 * @param sock
2489 * Socket whose timers/scheduled tasks to abort.
2490 */
2492
2493 /**
2494 * Creates a new Drop Timer and saves it to `sock->m_snd_drop_timer`. Pre-condition: `m_int_state ==
2495 * S_ESTABLISHED`, and `sock->m_snd_drop_timer` is null.
2496 *
2497 * @param socket_id
2498 * Connection ID (socket pair) identifying the socket in #m_socks.
2499 * @param sock
2500 * Socket that just entered ESTABLISHED state.
2501 */
2503
2504 /**
2505 * Implementation of non-blocking `sock->close_abruptly()` for socket `sock` in all cases except when
2506 * `sock->state() == State::S_CLOSED`. See Peer_socket::close_abruptly() doc
2507 * header; this method is the entirety of that method's implementation after CLOSED is
2508 * eliminated as a possibility.
2509 *
2510 * Pre-conditions:
2511 * - current thread is not W;
2512 * - `sock->m_mutex` is locked and just after entering `sock->close_abruptly()`;
2513 * - no changes to `*sock` have been made since `m_mutex` was locked;
2514 * - `sock->state() == Stated::S_OPEN` (so `sock` is in #m_socks);
2515 * - `sock` has been given to user via accept() or connect() or friends.
2516 *
2517 * Post-condition (not exhaustive): `sock->m_mutex` is unlocked.
2518 *
2519 * @param sock
2520 * Socket in OPEN state.
2521 * @param err_code
2522 * See Peer_socket::close_abruptly().
2523 */
2524 void close_abruptly(Peer_socket::Ptr sock, Error_code* err_code);
2525
2526 /**
2527 * A thread W method that handles the transition of the given socket from OPEN (any sub-state)
2528 * to CLOSED (including eliminating the given Peer_socket from our data structures). For
2529 * example, if an invalid packet comes in on the socket, and we send back an RST, then we're free
2530 * to then close our side immediately, as no further communication (with the other side or the
2531 * local user) is needed. As another example, if we there is a graceful close while Receive buffer
2532 * has data, user must Receive all of it, and the final handshake must finish, and then this is called.
2533 *
2534 * @todo Graceful close not yet implemented w/r/t close_connection_immediately().
2535 *
2536 * Pre-condition: if `err_code` is failure: `sock` is in #m_socks; `sock->state() == S_OPEN` (and any
2537 * `sock->m_int_state` that corresponds to it); `err_code` contains the reason for the close.
2538 *
2539 * Pre-condition: if `err_code` is success: `sock` is in #m_socks; `sock` state is
2540 * OPEN+DISCONNECTING; `m_int_state` is CLOSED; Send and Receive buffers are empty;
2541 * `m_disconnect_cause` is not success.
2542 *
2543 * Post-condition: `sock` Receive and Send buffers are empty; `sock->state() == S_CLOSED` (and `sock`
2544 * is no longer in #m_socks or any other Node structures, directly or indirectly) with
2545 * `sock->m_disconnect_cause` set to reason for closing. Other decently memory-consuming structures
2546 * are also cleared to conserve memory.
2547 *
2548 * Any socket that is in #m_socks MUST be eventually closed using this method. No
2549 * socket that is not in #m_socks must be passed to this method. In particular, do not call this
2550 * method during connect() or handle_syn_to_listening_server().
2551 *
2552 * @param socket_id
2553 * Connection ID (socket pair) identifying the socket in #m_socks.
2554 * @param sock
2555 * Socket to close.
2556 * @param err_code
2557 * If this is not success, then it is an abrupt close, and this is why `sock` is being
2558 * abruptly closed. `m_disconnect_cause` is set accordingly and logged.
2559 * If `err_code` is failure, then: `sock` is OPEN+DISCONNECTING (graceful close), and all
2560 * criteria required for it to move so CLOSED are satisfied: internal state is CLOSED
2561 * (goodbye handshake finished), and Receive and Send buffers are empty; `m_disconnect_cause`
2562 * is already set.
2563 * @param defer_delta_check
2564 * Same meaning as in event_set_all_check_delta().
2565 */
2567 const Error_code& err_code, bool defer_delta_check);
2568
2569 /**
2570 * Helper that creates a new SYN packet object to the extent that is suitable for immediately passing to
2571 * async_sock_low_lvl_packet_send_paced(). `sock` members that reflect any data in Syn_packet must already be
2572 * saved and are not used as the source for such data.
2573 *
2574 * @param sock
2575 * See async_sock_low_lvl_packet_send().
2576 * @return Pointer to new packet object suitable for async_sock_low_lvl_packet_send_paced() without having to fill
2577 * any further data members in the object.
2578 */
2580
2581 /**
2582 * Like create_syn() but for SYN_ACK.
2583 *
2584 * @param sock
2585 * See create_syn().
2586 * @return See create_syn().
2587 */
2589
2590 /**
2591 * Helper to create, fully fill out, and asynchronously send via async_sock_low_lvl_packet_send_paced()
2592 * a SYN_ACK_ACK packet. Since rcv_wnd is advertised, Peer_socket::m_rcv_last_sent_rcv_wnd is updated for `sock`.
2593 *
2594 * @param sock
2595 * See async_sock_low_lvl_packet_send().
2596 * @param syn_ack
2597 * SYN_ACK to which the resulting SYN_ACK_ACK is the reply.
2598 */
2600 boost::shared_ptr<const Syn_ack_packet>& syn_ack);
2601
2602 /**
2603 * Asynchronously send RST to the other side of the given socket and
2604 * close_connection_immediately().
2605 *
2606 * @param socket_id
2607 * See close_connection_immediately().
2608 * @param sock
2609 * See close_connection_immediately().
2610 * @param err_code
2611 * See close_connection_immediately().
2612 * @param defer_delta_check
2613 * Same meaning as in event_set_all_check_delta().
2614 */
2616 const Error_code& err_code, bool defer_delta_check);
2617
2618 /**
2619 * Implementation of non-blocking `sock->send()` for socket `sock` in all cases except when
2620 * `sock->state() == State::S_CLOSED`.
2621 *
2622 * Pre-conditions:
2623 * - current thread is not W;
2624 * - `sock->m_mutex` is locked and after entering `sock->[sync_]send()`;
2625 * - no changes to `*sock` have been made since `m_mutex` was locked;
2626 * - `sock->state() == State::S_OPEN` (so `sock` is in #m_socks);
2627 * - `snd_buf_feed_func is as described below.
2628 *
2629 * This method completes the functionality of `sock->send()`.
2630 *
2631 * @see Important: see giant comment inside Node::send() for overall design and how send_worker()
2632 * fits into it.
2633 * @param sock
2634 * Socket, which must be in #m_socks, on which `[sync_]send()` was called.
2635 * @param snd_buf_feed_func
2636 * Pointer to function with signature `size_t fn(size_t x)` that will perform
2637 * `sock->m_snd_buf.feed_bufs_copy(...)` call with `max_data_size == X`, which will feed the
2638 * data the user wants to `sock->send()` into `sock->m_snd_buf`, and return the return value
2639 * of that call (which indicates how many bytes the call was able to fit into `m_snd_buf`).
2640 * Doing it this way prevents this Node::send() from being a template, which prevents
2641 * circular dependency unpleasantness. See Peer_socket::send() for details.
2642 * @param err_code
2643 * See Peer_socket::send().
2644 * @return See Peer_socket::send().
2645 */
2646 size_t send(Peer_socket::Ptr sock,
2647 const Function<size_t (size_t max_data_size)>& snd_buf_feed_func,
2648 Error_code* err_code);
2649
2650 /**
2651 * Returns `true` if and only if calling `sock->send()` with at least some arguments would return
2652 * either non-zero (i.e., successfully enqueued data to send) or zero and an error (but not
2653 * zero and NO error). `sock` will be locked and unlocked; safe to call from any thread.
2654 *
2655 * @param sock_as_any
2656 * Socket to examine, as an `any` wrapping a Peer_socket::Ptr.
2657 * @return See above.
2658 */
2659 bool sock_is_writable(const boost::any& sock_as_any) const;
2660
2661 /**
2662 * Helper placed by send() onto W to invoke send_worker() but ensures that the socket has not
2663 * entered some state such that sending data is not possible and no longer going to be possible.
2664 *
2665 * Example: `send(sock)` runs while `sock` is in ESTABLISHED state; queues up
2666 * send_worker_check_state() on thread W; thread W detects a connection reset and moves `sock` to
2667 * CLOSED; send_worker_check_state() gets its turn on thread W; detects state is now CLOSED and
2668 * returns without doing anything.
2669 *
2670 * @see Important: see giant comment inside Node::send() for overall design and how send_worker()
2671 * fits into it.
2672 * @param sock
2673 * Socket on which to possibly send low-level packets.
2674 */
2676
2677 /**
2678 * Thread W implemention of send(): synchronously or asynchronously send the contents of
2679 * `sock->m_snd_buf` to the other side. This locks the socket and examines `m_snd_buf`. If a low-level
2680 * UDP packet cannot be produced from the front of `m_snd_buf` (i.e., not enough data in `m_snd_buf`),
2681 * then there is nothing to do. Otherwise, determines whether network conditions (e.g.,
2682 * congestion control) allow for 1 or more such packets to be sent. If not, then there is nothing
2683 * to do. Otherwise (if 1 or more packets can be sent), 1 or more packets are sent and removed
2684 * from `sock->m_snd_buf`. Finally, `m_snd_buf` is unlocked.
2685 *
2686 * Pre-condition: `sock->m_int_state == S_ESTABLISHED`. @todo Are there other states where sending
2687 * DATA packets is OK? If so it would be during graceful termination, if we implement it. See
2688 * send_worker() for contedt for this to-do.
2689 *
2690 * @see Important: see giant comment inside Node::send() for overall design and how send_worker()
2691 * fits into it.
2692 * @param sock
2693 * Socket on which to possibly send low-level packets.
2694 * @param defer_delta_check
2695 * Same meaning as in event_set_all_check_delta().
2696 */
2697 void send_worker(Peer_socket::Ptr sock, bool defer_delta_check);
2698
2699 /**
2700 * Answers the perennial question of congestion and flow control: assuming there is a DATA packet
2701 * to send to the other side on the given socket, should we do so at this moment? Over a perfect
2702 * link and with a perfect receiver, this would always return true, and we would always send every
2703 * packet as soon as we could make it. As it is, some congestion control algorithm is used here
2704 * to determine if the link should be able to handle a packet, and rcv_wnd is used to determine if
2705 * the receive would be able to buffer a packet if it did arrive.
2706 *
2707 * @param sock
2708 * Socket for which we answer the question.
2709 * @return `true` if should send; `false` if should wait until it becomes `true` and THEN send.
2710 */
2711 bool can_send(Peer_socket::Const_ptr sock) const;
2712
2713 /**
2714 * Implementation of non-blocking sock->receive() for socket `sock` in all cases except when
2715 * `sock->state() == State::S_CLOSED`.
2716 *
2717 * Pre-conditions:
2718 * - current thread is not W;
2719 * - `sock->m_mutex` is locked and just after entering `sock->receive()`;
2720 * - no changes to `*sock` have been made since `m_mutex` was locked;
2721 * - `sock->state() == Stated::S_OPEN` (so `sock` is in #m_socks);
2722 * - `rcv_buf_feed_func` is as described below.
2723 *
2724 * This method completes the functionality of `sock->receive()`.
2725 *
2726 * @param sock
2727 * Socket, which must be in #m_socks, on which `receive()` was called.
2728 * @param rcv_buf_consume_func
2729 * Pointer to function with signature `size_t fn()` that will perform
2730 * `sock->m_rcv_buf.consume_bufs_copy(...)` call, which will consume data from `m_rcv_buf`,
2731 * and return the return value of that call (which indicates how many bytes
2732 * Socket_buffer::consume_bufs_copy() was able to fit into the user's data structure). Doing it this way
2733 * prevents this Node::receive() from being a template, which prevents circular dependency
2734 * unpleasantness. See Peer_socket::receive() for details.
2735 * @param err_code
2736 * See Peer_socket::receive().
2737 * @return See Peer_socket::receive().
2738 */
2739 size_t receive(Peer_socket::Ptr sock,
2740 const Function<size_t ()>& rcv_buf_consume_func,
2741 Error_code* err_code);
2742
2743 /**
2744 * Returns `true` if and only if calling sock->receive() with at least some arguments would return
2745 * either non-zero (i.e., successfully dequeued received data) or zero and an error (but not
2746 * zero and NO error). `sock` will be locked and unlocked; safe to call from any thread.
2747 *
2748 * @param sock_as_any
2749 * Socket to examine, as an `any` wrapping a Peer_socket::Ptr.
2750 * @return See above.
2751 */
2752 bool sock_is_readable(const boost::any& sock_as_any) const;
2753
2754 /**
2755 * Placed by receive() onto W if it has dequeued data from Receive buffer and given it to the
2756 * user, which would free up space in the Receive buffer, which *possibly* should result in a
2757 * window update sent to the server, so that it knows it can now send more data.
2758 *
2759 * @see Node::receive().
2760 * @param sock
2761 * Socket (whose state is ESTABLISHED or later).
2762 */
2764
2765 /**
2766 * receive_wnd_updated() helper that continues rcv_wnd recovery: that is, sends unsolicited ACK
2767 * with a rcv_wnd advertisement only and schedules the next iteration of a timer to have this
2768 * occur again, unless that timer is canceled due to too long a recovery phase or DATA packets
2769 * arriving from the other side.
2770 *
2771 * @param sock
2772 * See receive_wnd_updated().
2773 * @param rcv_wnd
2774 * The rcv_wnd (free Receive buffer space) to advertise to the other side.
2775 */
2776 void async_rcv_wnd_recovery(Peer_socket::Ptr sock, size_t rcv_wnd);
2777
2778 /**
2779 * Pertaining to the async_rcv_wnd_recovery() mechanism, this handles the event that we have
2780 * received an acceptable (either into Receive buffer or reassembly queue) DATA packet from the
2781 * other side. If we are currently in rcv_wnd recovery, this signifies the recovery "worked" --
2782 * the sender is sending data again -- so we can now end this phase.
2783 *
2784 * @param sock
2785 * See receive_wnd_updated().
2786 */
2788
2789 /**
2790 * Computes and returns the currently correct rcv_wnd value; that is the amount of space free in
2791 * Receive buffer for the given socket. This may only be called from thread W.
2792 *
2793 * @param sock
2794 * A socket.
2795 * @return See above.
2796 */
2797 size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const;
2798
2799 /**
2800 * Placed by receive() onto W during a graceful close, after the Receive buffer had been emptied
2801 * by the user; determines whether the socket can now proceed to
2802 * `Peer_socket::m_state == Peer_socket::State::S_CLOSED`
2803 * and be removed from the Node.
2804 *
2805 * @see Node::receive().
2806 * @param sock
2807 * Socket which may possibly now move to `m_state == S_CLOSED`.
2808 */
2810
2811 /**
2812 * Sends a low-level ACK packet, with all accumulated in Peer_socket::m_rcv_pending_acks of `sock` individual packet
2813 * acknowledgments, to the other side's UDP endpoint. If the pending acknowledgments don't fit
2814 * into one ACK, more ACKs are generated and sent as necessary. If there is an error sending or
2815 * preparing to send, `sock` is closed abruptly (close_connection_immediately()).
2816 *
2817 * This may be called either directly or by boost.asio due to delayed ACK timer being triggered.
2818 * If `sock` is not in Peer_socket::Int_state::S_ESTABLISHED, this does nothing except possibly logging.
2819 *
2820 * @param sock
2821 * Socket the remote side of which will get the RST. Method is basically a NOOP unless
2822 * state is Peer_socket::Int_state::S_ESTABLISHED.
2823 * @param sys_err_code
2824 * If invoked via timer trigger, this is boost.asio's error code. If invoked directly,
2825 * this should be set to the default (success). Value is handled as follows: assuming
2826 * ESTABLISHED state: `operation_aborted` => NOOP; success or any other error => attempt to
2827 * send ACK(s).
2828 */
2829 void async_low_lvl_ack_send(Peer_socket::Ptr sock, const Error_code& sys_err_code = Error_code());
2830
2831 /**
2832 * Return `true` if and only if there are enough data either in Peer_socket::m_snd_rexmit_q of `sock` (if
2833 * retransmission is on) or in Peer_socket::m_snd_buf of `sock` to send a DATA packet to the other
2834 * side.
2835 *
2836 * Pre-condition: `sock->m_mutex` is locked.
2837 *
2838 * @param sock
2839 * Socket whose retransmission queue and Send buffer to examine.
2840 * @return See above.
2841 */
2842 bool snd_deqable(Peer_socket::Const_ptr sock) const;
2843
2844 /**
2845 * Return `true` if and only if there is enough free space in Peer_socket::m_snd_buf of `sock` to enqueue any given
2846 * atomic piece of user data.
2847 *
2848 * Pre-condition: `sock->m_mutex` is locked.
2849 *
2850 * Currently this simply means that there is space for at least max-block-size bytes (i.e., one
2851 * maximally large block) in `sock->m_snd_buf`.
2852 *
2853 * Design rationale for the latter: See code.
2854 *
2855 * @param sock
2856 * Socket whose Send buffer to examine.
2857 * @return See above.
2858 */
2859 bool snd_buf_enqable(Peer_socket::Const_ptr sock) const;
2860
2861 /**
2862 * Return true if and only if there are enough data in Peer_socket::m_rcv_buf of `sock` to give the user some
2863 * data in a Peer_socket::receive() call.
2864 *
2865 * Pre-condition: `sock->m_mutex` is locked.
2866 *
2867 * Currently this simply means that there is at least 1 block of data in `m_rcv_buf`.
2868 *
2869 * Design rationale: see snd_buf_deqable().
2870 *
2871 * @param sock
2872 * Socket whose Receive buffer to examine.
2873 * @return See above.
2874 */
2875 bool rcv_buf_deqable(Peer_socket::Const_ptr sock) const;
2876
2877 /**
2878 * Sets internal state of given socket to the given state and logs a TRACE message about it.
2879 * Should only be run from thread W; performs no locking.
2880 *
2881 * @param sock
2882 * Socket under consideration.
2883 * @param new_state
2884 * New state.
2885 */
2887
2888 /**
2889 * Sets Peer_socket::m_state and Peer_socket::m_open_sub_state. If moving to Peer_socket::State::S_CLOSED, resets
2890 * the required data to their "undefined" values (e.g., Peer_socket::m_local_port = #S_PORT_ANY). Thread-safe.
2891 *
2892 * @warning Only set `state` = `S_CLOSED` if no more data are in Receive buffer, so that the
2893 * user can get those data before `S_CLOSED` state. See Peer_socket::State::S_DISCONNECTING.
2894
2895 * @param sock
2896 * Socket under consideration.
2897 * @param state
2898 * New Peer_socket::m_state.
2899 * @param open_sub_state
2900 * Ignored if `state != S_OPEN`; otherwise the new value for Peer_socket::m_open_sub_state.
2901 */
2903 Peer_socket::State state,
2905
2906 /**
2907 * Records that thread W shows underlying connection is broken (graceful termination, or error)
2908 * and sets Peer_socket::m_disconnect_cause and Peer_socket::m_state, Peer_socket::m_open_sub_state accordingly.
2909 * Optionally also empties the Send and Receive buffers and any other decently memory-consuming structures.
2910 * Thread-safe.
2911 *
2912 * So the mutually exclusive closure scenarios are:
2913 * - `sock_disconnect_detected(sock, err_code, false); ...; sock_disconnect_completed(sock);`
2914 * Graceful close initiated; ...buffers emptied...; graceful close completed.
2915 * - `sock_disconnect_detected(sock, err_code, true);`
2916 * Abrupt close, or graceful close when the buffers already happen to be empty.
2917 *
2918 * @param sock
2919 * Socket under consideration.
2920 * @param disconnect_cause
2921 * The cause of the disconnect.
2922 * @param close
2923 * If `true`, the target public state should be the super-final `S_CLOSED`, and the Send and
2924 * Receive buffers are cleared; if `false`, the target public state should be the ominous
2925 * `S_OPEN`+`S_DISCONNECTING`, and the buffers are left alone. The caller's responsibility is
2926 * to decide which one it is, but `true` is typically either for an abrupt close (e.g.,
2927 * RST) or for a graceful close when buffers are empty; while `false` is typically for a
2928 * graceful close before buffers are empty, so that the user can get Receive buffer, and
2929 * the Node can send out Send buffer.
2930 */
2932 const Error_code& disconnect_cause, bool close);
2933
2934 /**
2935 * While in `S_OPEN`+`S_DISCONNECTING` state (i.e., after beginning a graceful close with
2936 * `sock_disconnect_detected(..., false)`, moves the socket to `S_CLOSED` state and clears Receive/Send
2937 * buffers and any other decently memory-consuming structures.
2938 *
2939 * Pre-conditions: state is `S_OPEN`+`S_DISCONNECTING`; Peer_socket::m_disconnect_cause is set to non-success
2940 * value.
2941 *
2942 * @param sock
2943 * Socket under consideration.
2944 */
2946
2947 /**
2948 * Helper that clears all non-O(1)-space data structures stored inside `sock`. Intended to be
2949 * called from `sock_disconnect_*()`, not anywhere else. Pre-condition: `sock->m_mutex` is
2950 * locked.
2951 *
2952 * @param sock
2953 * Socket under consideration.
2954 */
2956
2957 /**
2958 * Analogous to validate_options() but checks per-socket options instead of per-Node
2959 * options.
2960 *
2961 * `*prev_opts` is replaced with `opts`. Leave `prev_opts` as null unless an
2962 * existing Peer_socket's options are being changed via Peer_socket::set_options(). Otherwise a
2963 * Node_options::m_dyn_sock_opts Peer_socket_options is being changed, and that is
2964 * always allowed (since if a per-socket option were not dynamic in that way, it would simply be a
2965 * per-Node option instead).
2966 *
2967 * @param opts
2968 * New option values to validate.
2969 * @param prev_opts
2970 * null if called from constructor; `&sock->m_opts` if called from sock->set_options().
2971 * Used to ensure no static per-socket option is being changed.
2972 * @param err_code
2973 * After return, `*err_code` is success or: error::Code::S_OPTION_CHECK_FAILED,
2974 * error::Code::S_STATIC_OPTION_CHANGED.
2975 * If `!err_code`, error::Runtime_error() with that #Error_code is thrown instead.
2976 * @return `true` on success, `false` on validation error.
2977 */
2978 bool sock_validate_options(const Peer_socket_options& opts, const Peer_socket_options* prev_opts,
2979 Error_code* err_code) const;
2980
2981 /**
2982 * Thread W implementation of sock->set_options(). Performs all the needed work to complete
2983 * `sock->set_options()` call.
2984 *
2985 * Pre-condition: `sock->state()` is not Peer_socket::State::S_CLOSED.
2986 *
2987 * @param sock
2988 * See Peer_socket::set_options().
2989 * @param opts
2990 * See Peer_socket::set_options().
2991 * @param err_code
2992 * See Peer_socket::set_options().
2993 * @return See Peer_socket::set_options().
2994 */
2995 bool sock_set_options(Peer_socket::Ptr sock, const Peer_socket_options& opts, Error_code* err_code);
2996
2997 /**
2998 * Implementation of `sock->info()` for socket `sock` in all cases except when
2999 * `sock->state() == Peer_socket::State::S_CLOSED`. See Peer_socket::info() doc header; this method is the entirety
3000 * of that method's implementation after `S_CLOSED` is eliminated as a possibility.
3001 *
3002 * Pre-conditions:
3003 * - current thread is not W;
3004 * - `sock->m_mutex` is locked and just after entering `sock->info()`;
3005 * - no changes to *sock have been made since `m_mutex` was locked;
3006 * - `sock->state() == Peer_socket::State::S_OPEN`.
3007 *
3008 * Post-condition (not exhaustive): `sock->m_mutex` is unlocked.
3009 *
3010 * @param sock
3011 * Socket in consideration.
3012 * @return See Peer_socket::info().
3013 */
3015
3016 /**
3017 * Given a Peer_socket, copies all stats info (as available via Peer_socket::info()) from various
3018 * structures into the given stats `struct`. This can then be logged, given to the user, etc.
3019 *
3020 * This should be run from thread W only.
3021 *
3022 * @param sock
3023 * Socket in consideration. It can be in any state, but see above.
3024 * @param stats
3025 * All members (direct or indirect) of this `struct` will be filled.
3026 */
3028
3029 /**
3030 * Constructs the socket pair (connection ID) for the given socket. For performance, try not to
3031 * use this, as this is usually already available in most points in Node code and can be passed
3032 * around to places where it's not. However there are situations when one must reconstruct it
3033 * from a Peer_socket::Ptr alone.
3034 *
3035 * Call from thread W only.
3036 *
3037 * @todo Could make it a Socket_id constructor instead.
3038 * @param sock
3039 * Source socket.
3040 * @return Ditto.
3041 */
3043
3044 /**
3045 * Obtain the sequence number for the datum just past the last (latest) In-flight (i.e., sent but
3046 * neither Acknowledged nor Dropped) packet, for the given socket. If there are no In-flight
3047 * packets, returns the default Sequence_number -- which is < all other Sequence_numbers.
3048 *
3049 * Note that "last" in this case refers to position in the sequence number space, not time at which packets
3050 * are sent. (A packet with a given Sequence_number may be sent several times due to retransmission.)
3051 *
3052 * @param sock
3053 * Socket whose In-flight packets to examine.
3054 * @return See above.
3055 */
3057
3058 /**
3059 * Erases (for example if considered Acknowledged or Dropped) a packet `struct` from the
3060 * "scoreboard" (Peer_socket::m_snd_flying_pkts_by_sent_when) and adjusts all related structures.
3061 *
3062 * Note: It does NOT inform `sock->m_snd_drop_timer` (namely calling Drop_timer::on_packet_no_longer_in_flight()).
3063 * This is left to the caller; in particular because the timing may not be appropriate for what such a
3064 * call might trigger (e.g., on-Drop-Timeout actions such as massive retransmission).
3065 *
3066 * @param sock
3067 * Socket to modify.
3068 * @param pkt_it
3069 * Iterator into `m_snd_flying_pkts_by_sent_when` which will be deleted.
3070 */
3072
3073 /**
3074 * Adds a new packet `struct` (presumably representing packet to be sent shortly) to the
3075 * "scoreboard" (Peer_socket::m_snd_flying_pkts_by_sent_when) and adjusts all related structures as applicable. Note,
3076 * however, that mark_data_packet_sent() is NOT called, because we should do that when the DATA
3077 * packet is actually sent (after pacing, if any).
3078 *
3079 * @param sock
3080 * Socket to modify.
3081 * @param seq_num
3082 * The first sequence number of the DATA packet.
3083 * @param sent_pkt
3084 * Ref-counted pointer to new packet `struct`.
3085 */
3087 const Sequence_number& seq_num,
3089
3090 /**
3091 * Updates Peer_socket::m_snd_flying_bytes according to an operation (add packets, remove packets)
3092 * caller is about to undertake or has just undertaken on Peer_socket::m_snd_flying_pkts_by_sent_when (= the
3093 * scoreboard). Call this WHENEVER `m_snd_flying_pkts_by_sent_when` is about to be modified (if erasing) or
3094 * has just been modified (if adding) to ensure `m_snd_flying_bytes` is updated accordingly.
3095 *
3096 * @warning This has strong implications for congestion control! Do not forget.
3097 * @param sock
3098 * Socket to modify.
3099 * @param pkt_begin
3100 * Iterator to first packet that was added or will be removed.
3101 * @param pkt_end
3102 * Iterator one past the last packet that was added or will be removed.
3103 * @param added
3104 * If `true`, the given range of packets was just added (e.g., Sent); if `false`, the given
3105 * range of packets is about to be removed (e.g., Dropped or Acknowledged).
3106 */
3110 bool added);
3111
3112 /**
3113 * Checks whether the given sent packet has been retransmitted the maximum number of allowed
3114 * times; if so then performs rst_and_close_connection_immediately() and returns `false`; otherwise
3115 * returns `true`.
3116 *
3117 * @param sock
3118 * Socket to check and possibly close.
3119 * @param pkt_it
3120 * Iterator info Peer_socket::m_snd_flying_pkts_by_sent_when of `sock` for packet in question. Its
3121 * `m_rexmit_id` should not yet be incremented for the potential new retransmission.
3122 * @param defer_delta_check
3123 * Same meaning as in event_set_all_check_delta().
3124 * @return See above.
3125 */
3128 bool defer_delta_check);
3129
3130 /**
3131 * Logs a verbose state report for the given socket. This is suitable for calling from
3132 * perform_regular_infrequent_tasks() and other infrequently executed spots.
3133 *
3134 * @param sock
3135 * Socket whose state to log.
3136 */
3137 void sock_log_detail(Peer_socket::Const_ptr sock) const;
3138
3139 /**
3140 * Assuming `*seq_num` points to the start of data.m_data, increments `*seq_num` to point
3141 * to the datum just past `data->m_data`.
3142 *
3143 * @param seq_num
3144 * Pointer to sequence number to increment.
3145 * @param data
3146 * DATA packet whose `m_data` to examine.
3147 */
3148 static void advance_seq_num(Sequence_number* seq_num,
3149 boost::shared_ptr<const Data_packet> data);
3150
3151 /**
3152 * Assuming `*seq_num` points to the start of some data of the given size, increments
3153 * `*seq_num` to point to the datum just past that amount of data.
3154 *
3155 * @param seq_num
3156 * Pointer to sequence number to increment.
3157 * @param data_size
3158 * Data size.
3159 */
3160 static void advance_seq_num(Sequence_number* seq_num, size_t data_size);
3161
3162 /**
3163 * Given an iterator into a Peer_socket::Sent_pkt_by_sent_when_map or Peer_socket::Recv_pkt_map, gets the range of
3164 * sequence numbers in the packet represented thereby.
3165 *
3166 * @tparam Packet_map_iter
3167 * Iterator type (`const` or otherwise) into one of the above-mentioned maps.
3168 * @param packet_it
3169 * A valid, non-`end()` iterator into such a map.
3170 * @param seq_num_start
3171 * If 0, ignored; otherwise the sequence number of the first datum in that packet is placed
3172 * there.
3173 * @param seq_num_end
3174 * If 0, ignored; otherwise the sequence number just past the last datum in that packet is
3175 * placed there.
3176 */
3177 template<typename Packet_map_iter>
3178 static void get_seq_num_range(const Packet_map_iter& packet_it,
3179 Sequence_number* seq_num_start, Sequence_number* seq_num_end);
3180
3181 /**
3182 * Returns the "order number" to use for Peer_socket::Sent_packet::Sent_when structure corresponding to the next
3183 * packet to be sent. This will be higher than the last sent packet's number. Make sure you send packets
3184 * in exactly increasing numeric order of this order number.
3185 *
3186 * 0 is reserved and never returned by this.
3187 *
3188 * @param sock
3189 * Socket to consider.
3190 * @return See above.
3191 */
3193
3194 /**
3195 * Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
3196 *
3197 * @param opts
3198 * See Peer_socket::Peer_socket().
3199 * @return Pointer to newly constructed socket.
3200 */
3201 virtual Peer_socket* sock_create(const Peer_socket_options& opts);
3202
3203 // Methods dealing with individual Server_sockets. Implementations are in server_socket.cpp.
3204
3205 /**
3206 * Implementation of non-blocking `serv->accept()` for server socket `serv` in all cases except when
3207 * `serv->state() == Server_socket::State::S_CLOSED`.
3208 *
3209 * Pre-conditions:
3210 * - current thread is not W;
3211 * - `serv->m_mutex` is locked and just after entering `serv->accept()`;
3212 * - no changes to `*serv` have been made since `m_mutex` was locked;
3213 * - `serv->state() != Server_socket::State::S_CLOSED` (so `serv` is in `m_servs`).
3214 *
3215 * This method completes the functionality of `serv->accept()`.
3216 *
3217 * @param serv
3218 * Server socket, which must be in #m_servs, on which Server_socket::accept() was called.
3219 * @param err_code
3220 * See Server_socket::accept().
3221 * @return See Server_socket::accept().
3222 */
3224
3225 /**
3226 * Returns `true` if and only if calling `serv->accept()` with at least some arguments would return
3227 * either non-null (i.e., successfully dequeued a connected socket) or null and an error (but not
3228 * null and NO error). `serv` will be locked and unlocked; safe to call from any thread.
3229 *
3230 * @param serv_as_any
3231 * Socket to examine, as an `any` wrapping a Server_socket::Ptr.
3232 * @return See above.
3233 */
3234 bool serv_is_acceptable(const boost::any& serv_as_any) const;
3235
3236 /**
3237 * Thread W implementation of listen(). Performs all the needed work, gives the resulting
3238 * Server_socket to the user thread, and signals that user thread.
3239 *
3240 * Pre-condition: We're in thread W; thread U != W is waiting for us to return having set `*serv`. Post-condition:
3241 * `*serv` contains a `Server_socket::Ptr` in a Server_socket::State::S_LISTENING state if
3242 * `!(*serv)->m_disconnect_cause`; otherwise an error occurred, and that error is `(*serv)->m_disconnect_cause`.
3243 *
3244 * @param local_port
3245 * See listen().
3246 * @param child_sock_opts
3247 * See listen().
3248 * @param serv
3249 * `*serv` shall be set to the resulting Server_socket. Check `(*serv)->m_disconnect_cause`.
3250 */
3251 void listen_worker(flow_port_t local_port, const Peer_socket_options* child_sock_opts,
3252 Server_socket::Ptr* serv);
3253
3254 /**
3255 * Handles a just-deserialized, just-demultiplexed low-level SYN packet delivered to the given
3256 * server socket. So it will hopefully create a #m_socks entry, send back a SYN_ACK, etc.
3257 *
3258 * @param serv
3259 * Server socket in LISTENING state to which this SYN was demuxed.
3260 * @param syn
3261 * Deserialized immutable SYN.
3262 * @param low_lvl_remote_endpoint
3263 * The remote Node address.
3264 * @return New socket placed into Node socket table; or `Ptr()` on error, wherein no socket was saved.
3265 */
3267 boost::shared_ptr<const Syn_packet> syn,
3268 const util::Udp_endpoint& low_lvl_remote_endpoint);
3269
3270 /**
3271 * Handles a just-deserialized, just-demultiplexed low-level SYN_ACK_ACK packet delivered to the
3272 * given peer socket in Peer_socket::Int_state::S_SYN_RCVD state. So it will hopefully finish up establishing
3273 * connection on our side.
3274 *
3275 * @param socket_id
3276 * Connection ID (socket pair) identifying the socket in #m_socks.
3277 * @param sock
3278 * Peer socket in Peer_socket::Int_state::S_SYN_RCVD.
3279 * @param syn_ack_ack
3280 * Deserialized immutable SYN_ACK_ACK.
3281 */
3283 Peer_socket::Ptr sock,
3284 boost::shared_ptr<const Syn_ack_ack_packet> syn_ack_ack);
3285
3286 /**
3287 * Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given
3288 * peer socket in SYN_RCVD state. This is legitimate under loss and re-ordering conditions.
3289 * This will hopefully save the packet for later handling once we have entered ESTABLISHED state.
3290 *
3291 * @param sock
3292 * Peer socket in Peer_socket::Int_state::S_SYN_RCVD.
3293 * @param packet
3294 * Deserialized packet of type DATA.
3295 * (For performance when moving data to Receive buffer, this is modifiable.)
3296 */
3298 boost::shared_ptr<Data_packet> packet);
3299
3300 /**
3301 * Handles the transition of the given server socket from `S_LISTENING`/`S_CLOSING` to `S_CLOSED`
3302 * (including eliminating the given Peer_socket from our data structures).
3303 *
3304 * Pre-condition: there is no socket `sock` such that `sock->m_originating_serv == serv`; i.e., there
3305 * are no sockets having to do with this server that have not yet been `accept()`ed.
3306 *
3307 * Pre-condition: `serv` is in `m_servs`; `serv->state() != S_OPEN`.
3308 *
3309 * Post-condition: `serv->state() == Server_socket::State::S_CLOSED` (and `serv` is no longer in `m_servs` or any
3310 * other Node structures, directly or indirectly) with `serv->m_disconnect_cause` set to `err_code` (or see
3311 * below).
3312 *
3313 * Any server socket that is in #m_servs MUST be eventually closed using this method. No
3314 * socket that is not in #m_servs must be passed to this method. In particular, do not call this
3315 * method during listen().
3316 *
3317 * @param local_port
3318 * Flow port of the server to delete.
3319 * @param serv
3320 * Socket to close.
3321 * @param err_code
3322 * Why is it being closed? Server_socket::m_disconnect_cause is set accordingly and logged.
3323 * @param defer_delta_check
3324 * Same meaning as in event_set_all_check_delta().
3325 */
3327 const Error_code& err_code, bool defer_delta_check);
3328
3329 /**
3330 * Sets Server_socket::m_state. If moving to `S_CLOSED`, resets the required data to their "undefined" values
3331 * (e.g., `Server_socket::m_local_port = #S_PORT_ANY`). Thread-safe.
3332 *
3333 * @param serv
3334 * Server socket under consideration.
3335 * @param state
3336 * New `m_state`.
3337 */
3339
3340 /**
3341 * Records that thread W shows this socket is not to listen to incoming connections and is to
3342 * abort any not-yet-established (i.e., not yet queued) and established-but-unclaimed (i.e.,
3343 * queued) connections; and sets Server_socket::m_disconnect_cause and Server_socket::m_state in `serv` accordingly.
3344 * Thread-safe.
3345 *
3346 * @param serv
3347 * Server socket under consideration.
3348 * @param disconnect_cause
3349 * The cause of the disconnect.
3350 * @param close
3351 * If `true`, the target public state should be the super-final `S_CLOSED`; if false, the target public state
3352 * should be the ominous `S_CLOSING`. The caller's responsibility is to decide which one it
3353 * is.
3354 */
3355 void serv_close_detected(Server_socket::Ptr serv, const Error_code& disconnect_cause, bool close);
3356
3357 /**
3358 * Records that an unestablished socket `sock` (Peer_socket::Int_state::S_SYN_RCVD) has just become established
3359 * and can be `accept()`ed (Peer_socket::Int_state::S_ESTABLISHED). Moves `sock` from
3360 * Server_socket::m_connecting_socks to Server_socket::m_unaccepted_socks (in `serv`).
3361 * To be called from thread W only. Thread-safe.
3362 *
3363 * @param serv
3364 * Server socket under consideration.
3365 * @param sock
3366 * Socket that was just moved to Peer_socket::Int_state::S_ESTABLISHED.
3367 */
3369
3370 /**
3371 * Records a new (just received SYN) peer socket from the given server socket. Adds `sock` to
3372 * Server_socket::m_connecting_socks (in `serv`) and maintains the Peer_socket::m_originating_serv (in `sock`)
3373 * invariant. To be called from thread W only. Thread-safe.
3374 *
3375 * @param serv
3376 * Server that originated `sock`.
3377 * @param sock
3378 * Socket that was just moved to Peer_socket::Int_state::S_SYN_RCVD.
3379 */
3381
3382 /**
3383 * Records that a `Server_socket`-contained (i.e., currently un-established, or established but not yet accepted
3384 * by user) Peer_socket is being closed and should be removed from the given Server_socket. To be
3385 * called from thread W only. Thread-safe.
3386 *
3387 * If `sock` is not contained in `*serv`, method does nothing.
3388 *
3389 * @param serv
3390 * Server socket under consideration.
3391 * @param sock
3392 * Socket to remove (moving from `S_SYN_RCVD` or `S_ESTABLISHED` to `S_CLOSED`).
3393 */
3395
3396 /**
3397 * Internal factory used for ALL Server_socket objects created by this Node (including subclasses).
3398 *
3399 * @param child_sock_opts
3400 * See Server_socket::Server_socket().
3401 * @return Pointer to newly constructed socket.
3402 */
3403 virtual Server_socket* serv_create(const Peer_socket_options* child_sock_opts);
3404
3405 // Methods dealing with individual Peer_sockets OR Server_sockets (determined via template at compile time).
3406
3407 /**
3408 * Implementation of core *blocking* transfer methods, namely Peer_socket::sync_send(), Peer_socket::sync_receive(),
3409 * and Server_socket::sync_accept() for all cases except when `sock->state() == Peer_socket::State::S_CLOSED`.
3410 * It is heavily templated and shared among those three implementations to avoid massive
3411 * copy/pasting, since the basic pattern of the blocking wrapper around Event_set::sync_wait() and
3412 * a non-blocking operation (Peer_socket::receive(), Peer_socket::send(), Server_socket::accept(), respectively)
3413 * is the same in all cases.
3414 *
3415 * Pre-conditions:
3416 * - current thread is not W;
3417 * - `sock->m_mutex` is locked;
3418 * - no changes to `*sock` have been made since `sock->m_mutex` was locked;
3419 * - `sock->state()` is OPEN (so `sock` is in #m_socks or #m_servs, depending on socket type at compile time);
3420 * - other arguments are as described below.
3421 *
3422 * This method completes the functionality of `sock->sync_send()`, `sock->sync_receive()`, and
3423 * `sock->sync_accept()`.
3424 *
3425 * @tparam Socket
3426 * Underlying object of the transfer operation (Peer_socket or Server_socket).
3427 * @tparam Non_blocking_func_ret_type
3428 * The return type of the calling transfer operation (`size_t` or Peer_socket::Ptr).
3429 * @param sock
3430 * Socket on which user called `sync_*()`.
3431 * @param non_blocking_func
3432 * When this method believes it should attempt a non-blocking transfer op, it will execute
3433 * `non_blocking_func()`.
3434 * If `non_blocking_func.empty()`, do not call `non_blocking_func()` --
3435 * return indicating no error so far, and let them do actual operation, if they want; we just tell them it
3436 * should be ready for them. This is known
3437 * as reactor pattern mode. Otherwise, do the successful operation and then
3438 * return. This is arguably more typical.
3439 * @param would_block_ret_val
3440 * The value that `non_blocking_func()` returns to indicate it was unable to perform the
3441 * non-blocking operation (i.e., no data/sockets available).
3442 * @param ev_type
3443 * Event type applicable to the type of operation this is. See Event_set::Event_type doc header.
3444 * @param wait_until
3445 * See `max_wait` argument on the originating `sync_*()` method. This is absolute timeout time point
3446 * derived from it; zero-valued if no timeout.
3447 * @param err_code
3448 * See this argument on the originating `sync_*()` method.
3449 * However, unlike that calling method's user-facing API, the present sync_op() method
3450 * does NOT allow null `err_code` (behavior undefined if `err_code` is null).
3451 * Corollary: we will NOT throw Runtime_error().
3452 * @return The value that the calling `sync_*()` method should return to its caller.
3453 * Corner/special case: If `non_blocking_func.empty()` (a/k/a "reactor pattern" mode), then
3454 * this will always return `would_block_ret_val`; the caller shall interpret
3455 * `bool(*err_code) == false` as meaning the socket has reached the desired state in time and without
3456 * error. In that special case, as of this writing, you can't just return this return value, since it's
3457 * always a zero/null/whatever.
3458 */
3459 template<typename Socket, typename Non_blocking_func_ret_type>
3460 Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock,
3461 const Function<Non_blocking_func_ret_type ()>& non_blocking_func,
3462 Non_blocking_func_ret_type would_block_ret_val,
3463 Event_set::Event_type ev_type,
3464 const Fine_time_pt& wait_until,
3465 Error_code* err_code);
3466
3467 /**
3468 * Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so, it
3469 * sets `*err_code` to the reason it was closed (which is in `sock->m_disconnect`) and returns `false`;
3470 * otherwise it returns `true` and leaves `*err_code` untouched. This exists to improve code reuse, as
3471 * this is a frequent operation for both socket types.
3472 *
3473 * Pre- and post-conditions: `sock->m_mutex` is locked.
3474 *
3475 * @tparam Socket_ptr
3476 * Peer_socket::Ptr or Server_socket::Ptr.
3477 * @param sock
3478 * The socket in question.
3479 * @param err_code
3480 * `*err_code` is set to `sock->m_disconnect_cause` if socket is closed.
3481 * @return `true` if state is not CLOSED; otherwise `false`.
3482 */
3483 template<typename Socket_ptr>
3484 static bool ensure_sock_open(Socket_ptr sock, Error_code* err_code);
3485
3486 // Methods dealing with individual Event_sets. Implementations are in event_set.cpp.
3487
3488 /**
3489 * Implementation of Event_set::async_wait() when `Event_set::state() == Event_set::State::S_INACTIVE`.
3490 *
3491 * Pre-conditions:
3492 * - current thread is not W;
3493 * - `event_set->m_mutex` is locked and just after entering async_wait();
3494 * - no changes to `*event_set` have been made since `m_mutex` was locked;
3495 * - `event_set->state() == Event_set::State::S_INACTIVE` (so `event_set` is in #m_event_sets);
3496 * - on_event is as originally passed into async_wait().
3497 *
3498 * This method completes the functionality of `event_set->async_wait()`.
3499 *
3500 * @param event_set
3501 * Event_set in question.
3502 * @param on_event
3503 * See Event_set::async_wait().
3504 * @param err_code
3505 * See Event_set::async_wait().
3506 * @return See Event_set::async_wait().
3507 */
3508 bool event_set_async_wait(Event_set::Ptr event_set, const Event_set::Event_handler& on_event,
3509 Error_code* err_code);
3510
3511 /**
3512 * Helper placed by event_set_async_wait() onto thread W to invoke event_set_check_baseline() but first ensure
3513 * that the `Event_set event_set` has not exited Event_set::State::S_WAITING (which would make any checking for
3514 * active events nonsense). If it has exited that state, does nothing. (That situation is possible due to
3515 * concurrently deleting the overarching Node (IIRC) and maybe other similar races.)
3516 *
3517 * @param event_set
3518 * Event_set in question.
3519 */
3521 /**
3522 * Checks each desired (Event_set::m_want) event in `event_set`; any that holds true is saved into `event_set`
3523 * (Event_set::m_can). This is the exhaustive, or "baseline," check. This should only be performed when
3524 * necessary, as it is typically slower than checking individual active sockets against the
3525 * Event_set ("delta" check).
3526 *
3527 * This check is skipped if `Event_set::m_baseline_check_pending == false` (for `event_set`).
3528 *
3529 * See Event_set::async_wait() giant internal comment for context on all of the above.
3530 *
3531 * Pre-conditions: `event_set` state is Event_set::State::S_WAITING; `event_set->m_mutex` is locked.
3532 *
3533 * This method, unlike most, is intended to be called from either W or U != W. All actions it
3534 * takes are on non-W-exclusive data (namely, actions on: `event_set`; and non-W-exclusive data in
3535 * Peer_socket and Server_socket, namely their state() and Receive/Send/Accept structures).
3536 *
3537 * @param event_set
3538 * Event_set in question.
3539 * @return `true` if and only if the check was performed; `false` returned if
3540 * `!event_set->m_baseline_check_pending`.
3541 */
3543
3544 /**
3545 * Check whether given Event_set contains any active sockets (Event_set::m_can); if so, signals the user (who
3546 * previously called `async_wait()` to set all this in motion): set state back to Event_set::State::S_INACTIVE from
3547 * Event_set::State::S_WAITING; calls the handler passed to `async_wait()`; forgets handler. If no active sockets,
3548 * does nothing.
3549 *
3550 * Pre-conditions: same as event_set_check_baseline().
3551 *
3552 * @param event_set
3553 * Event_set in question.
3554 */
3556
3557 /**
3558 * For each WAITING Event_set within the Node: checks for any events that hold, and if any do
3559 * hold, signals the user (calls handler, goes to INACTIVE, etc.). The logic for how it does so
3560 * is complex. For background, please see Event_set::async_wait() giant internal comment first.
3561 * Then read on here.
3562 *
3563 * For each WAITING Event_set: If baseline check (event_set_check_baseline()) is still required
3564 * and hasn't been performed, perform it. Otherwise, for efficiency perform a "delta" check,
3565 * wherein EVERY active (for all definitions of active: Readable, Writable, Acceptable) socket
3566 * detected since the last baseline check is checked against the desired event/socket pairs in the
3567 * Event_set. Any socket in both sets (active + desired) is saved in `event_set->m_can`. If
3568 * either the baseline or delta check yields at least one active event, signal user (call handler,
3569 * go INACTIVE, etc.).
3570 *
3571 * For the delta check just described, how does it know which sockets have been active since the
3572 * last check? Answer: `Node::m_sock_events` members (NOTE: not the same as `Event_set::m_can`, though
3573 * they are related). See #m_sock_events doc header for details.
3574 *
3575 * @param defer_delta_check
3576 * Set to `true` if and only if you know, for a FACT, that within a non-blocking amount of
3577 * time `event_set_all_check_delta(false)` will be called. For example, you may know
3578 * `event_set_all_check_delta(false)` will be called within the present boost.asio handler.
3579 * Then this method will only log and not perform the actual check, deferring to the
3580 * promised `event_set_all_check_delta(false)` call, by which point more events may have been
3581 * detected in #m_sock_events.
3582 */
3583 void event_set_all_check_delta(bool defer_delta_check);
3584
3585 /**
3586 * Implementation of Event_set::close() when `Event_set::state() != Event_set::State::S_CLOSED` for `event_set`.
3587 *
3588 * Pre-conditions:
3589 * - current thread is not W;
3590 * - `event_set->m_mutex` is locked and just after entering async_wait();
3591 * - no changes to `*event_set` have been made since `m_mutex` was locked;
3592 * - `event_set->state() != Event_set::State::S_CLOSED` (so `event_set` is in #m_event_sets).
3593 *
3594 * This method completes the functionality of `event_set->close()`.
3595 *
3596 * @param event_set
3597 * Event_set in question.
3598 * @param err_code
3599 * See Event_set::close().
3600 */
3601 void event_set_close(Event_set::Ptr event_set, Error_code* err_code);
3602
3603 /**
3604 * The guts of event_set_close_worker_check_state(): same thing, but assumes
3605 * `Event_set::state() == Event_set::State::S_CLOSED`, and Event_set::m_mutex is locked (for `event_set`).
3606 * May be called directly from thread W assuming those pre-conditions holds.
3607 *
3608 * @param event_set
3609 * Event_set in question.
3610 */
3611 void event_set_close_worker(Event_set::Ptr event_set);
3612
3613 /**
3614 * Thread W implementation of interrupt_all_waits(). Performs all the needed work, which is to
3615 * trigger any WAITING Event_set objects to fire their on-event callbacks, with the Boolean argument set
3616 * to `true`, indicating interrupted wait.
3617 *
3618 * Pre-condition: We're in thread W.
3619 */
3621
3622 /**
3623 * `signal_set` handler, executed on SIGINT and SIGTERM, if user has enabled this feature:
3624 * causes interrupt_all_waits_worker() to occur on thread W.
3625 *
3626 * Pre-condition: We're in thread W [sic].
3627 *
3628 * @param sys_err_code
3629 * boost.asio error code indicating the circumstances of the callback executing.
3630 * It is unusual for this to be truthy.
3631 * @param sig_number
3632 * Signal number of the signal that was detected.
3633 */
3634 void interrupt_all_waits_internal_sig_handler(const Error_code& sys_err_code, int sig_number);
3635
3636 // Constants.
3637
3638 /**
3639 * For a given unacknowledged sent packet P, the maximum number of times any individual packet
3640 * with higher sequence numbers than P may be acknowledged before P is considered Dropped (i.e.,
3641 * we give up on it). If we enable retransmission, that would trigger Fast Retransmit, using TCP's
3642 * terminology.
3643 */
3645
3646 /**
3647 * Time interval between performing "infrequent periodic tasks," such as stat logging. This
3648 * should be large enough to ensure that the tasks being performed incur no significant processor
3649 * use.
3650 */
3652
3653 // Data.
3654
3655 /**
3656 * This Node's global set of options. Initialized at construction; can be subsequently
3657 * modified by set_options(), although only the dynamic members of this may be modified.
3658 *
3659 * Accessed from thread W and user thread U != W. Protected by #m_opts_mutex. When reading, do
3660 * NOT access without locking (which is encapsulated in opt()).
3661 */
3663
3664 /// The mutex protecting #m_opts.
3666
3667 /**
3668 * The object used to simulate stuff like packet loss and latency via local means directly in the
3669 * code. If 0, no such simulation is performed. `shared_ptr<>` used for basic auto-`delete` convenience.
3670 */
3671 boost::shared_ptr<Net_env_simulator> m_net_env_sim;
3672
3673 /**
3674 * The main loop engine, functioning in the single-threaded-but-asynchronous callback-based
3675 * "reactor" style (or is it "proactor"?). The Node constructor creates a single new thread W, which then places
3676 * some callbacks onto this guy and invoke `m_task_engine.run()`, at which point the main loop
3677 * begins in thread W.
3678 *
3679 * Thus, per boost.asio's model, any work items (functions) placed
3680 * onto #m_task_engine (e.g.: `post(m_task_engine, do_something_fn);`) will execute in thread W,
3681 * as it's the one invoking `run()` at the time -- even if the placing itself is done on some
3682 * other thread, such as a user thread U. An example of the latter is a Peer_socket::send() implementation
3683 * might write to the socket's internal Send buffer in thread U, check whether it's currently possible
3684 * to send over the wire, and if and only if the answer is yes, `post(m_task_engine, S)`, where S
3685 * is a function/functor (created via lambdas usually) that will perform the hairy needed Node/socket
3686 * work on thread W.
3687 *
3688 * All threads may access this (no mutex required, as explicitly announced in boost.asio docs).
3689 *
3690 * Adding more threads that would call `m_task_engine.run()` would create a thread pool. With "strands" one
3691 * can avoid concurrency in this situation. An intelligent combination of those two concepts can lead to efficient
3692 * multi-core use without complex and/or inefficient locking. This is non-trivial.
3693 *
3694 * @see Class Node doc header for to-do items regarding efficient multi-core use and how that relates to
3695 * using an #m_task_engine thread pool and/or strands.
3696 */
3698
3699 /**
3700 * The UDP socket used to receive low-level packets (to assemble into application layer data) and send them
3701 * (vice versa).
3702 *
3703 * Only thread W can access this.
3704 *
3705 * Access to this may be highly contentious in high-traffic situations. Since only thread W accesses this, and that
3706 * thread does the vast bulk of the work of the entire Node, at least one known problem is that the internal OS
3707 * UDP receive buffer may be exceeded, as we may not read datagrams off this socket quickly enough.
3708 *
3709 * @see Class Node doc header for to-do items regarding the aforementioned UDP receive buffer overflow problem.
3710 */
3712
3713 /**
3714 * After we bind #m_low_lvl_sock to a UDP endpoint, this is a copy of that endpoint. Thus it
3715 * should contain the actual local address and port (even if user specified 0 for the latter,
3716 * say).
3717 *
3718 * This is equal to `Udp_endpoint()` until the constructor exits. After the constructor exits, its
3719 * value never changes, therefore all threads can access it without mutex. If the constructor
3720 * fails to bind, this remains equal to `Udp_endpoint()` forever.
3721 */
3723
3724 /**
3725 * OS-reported #m_low_lvl_sock UDP receive buffer maximum size, obtained right after we
3726 * OS-set that setting and never changed subsequently. Note the OS may not respect whatever value we
3727 * passed into the OS socket option setting call, or it may respect it but only approximately.
3728 */
3730
3731 /// Stores incoming raw packet data; re-used repeatedly for possible performance gains.
3733
3734 /// Flow port space for both client and server sockets. All threads may access this.
3736
3737 /// Sequence number generator (at least to generate ISNs). Only thread W can access this.
3739
3740 /**
3741 * Random number generator for picking security tokens; seeded on time at Node construction and generates
3742 * integers from the entire range. (Not thread-safe. Use only in thread W.)
3743 */
3745
3746 /**
3747 * The peer-to-peer connections this Node is currently tracking. Their states are not Peer_socket::State::S_CLOSED.
3748 * Only thread W can access this.
3749 */
3751
3752 /**
3753 * The server sockets this Node is currently tracking. Their states are not Server_socket::State::S_CLOSED.
3754 * Only thread W can access this.
3755 */
3757
3758 /**
3759 * Every Event_set to have been returned by event_set_create() and not subsequently reached
3760 * Event_set::State::S_CLOSED. Only thread W can access this.
3761 */
3763
3764 /**
3765 * All sockets that have been detected to be "ready" (by the Event_set doc header definition) at
3766 * any point since the last time #m_sock_events's contained sets were cleared (which happens initially and after each
3767 * event_set_all_check_delta() call). EVERY piece of code in thread W to potentially set a
3768 * socket's status to "ready" (e.g.: DATA received, error detected) MUST add that socket's handle
3769 * to this data structure. This enables the Event_set machinery to efficiently but thoroughly
3770 * detect every event in which the Event_set user is interested. The theory behind this is
3771 * described in the giant comment inside Event_set::async_wait().
3772 *
3773 * This maps Event_set::Event_type `enum` members to Event_set::Sockets socket sets, exactly the same way
3774 * Event_set::m_can and Event_set::m_want are set up.
3775 *
3776 * A question arises: why use this set to store such active sockets? Why not just call
3777 * event_set_all_check_delta() EVERY time we see a socket is now Readable, etc., thus handling it right
3778 * away and not needing to store it? Answer: we could. However, we want to collect as many
3779 * possibly active events as possible, without blocking, before performing the check. That way
3780 * the user is informed of as many events as possible, instead of the very first one (when there
3781 * could be hundreds more; for example if hundreds of DATA packets have arrived simultaneously).
3782 * The theory behind this is also discussed in Event_set::async_wait() giant comment. So we
3783 * insert into #m_sock_events and defer `event_set_all_check_delta(false)` to the end of the current
3784 * boost.asio handler, since we know we won't block (sleep) until the handler exits.
3785 *
3786 * Only thread W can access this.
3787 */
3789
3790 /**
3791 * Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() (async part)
3792 * call, by the time perform_accumulated_on_recv_tasks() is called, this stores exactly those sockets for which
3793 * possible ACK sending tasks have been accumulated during the low_lvl_recv_and_handle()/etc.
3794 * call. The idea is that, for efficiency and reduced overhead,
3795 * all simultaneously available incoming data are examined first, and some tasks are accumulated
3796 * to perform at the end. For example, all DATA packets to be acknowledged at the same time are
3797 * collected and then sent in as few ACKs as possible.
3798 *
3799 * Details on the acks to potentially send are stored within that Peer_socket itself (e.g.,
3800 * Peer_socket::m_rcv_pending_acks).
3801 *
3802 * This should be added to throughout the method, used in perform_accumulated_on_recv_tasks(), and
3803 * then cleared for the next run.
3804 *
3805 * Only thread W can access this.
3806 */
3807 boost::unordered_set<Peer_socket::Ptr> m_socks_with_accumulated_pending_acks;
3808
3809 /**
3810 * Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() call,
3811 * by the time perform_accumulated_on_recv_tasks() is called, this stores exactly those sockets for which
3812 * possible incoming-ACK handling tasks have been accumulated during the low_lvl_recv_and_handle()/etc.
3813 * call. The idea is that, for congestion control robustness,
3814 * all simultaneously available acknowledgments and rcv_wnd updates are collected first, and then
3815 * they're all handled together at the end.
3816 *
3817 * Details on the acks to potentially send are stored within that Peer_socket
3818 * itself (Peer_socket::m_rcv_acked_packets scan all).
3819 *
3820 * This should be added to throughout the method, used in perform_accumulated_on_recv_tasks(), and
3821 * then cleared for the next run.
3822 *
3823 * Only thread W can access this.
3824 */
3825 boost::unordered_set<Peer_socket::Ptr> m_socks_with_accumulated_acks;
3826
3827 /**
3828 * For debugging, when we detect loss of data we'd sent, we log the corresponding socket's state;
3829 * this is the last time this was done for any socket (or epoch if never). It's used to
3830 * throttle such messages, since they are CPU-intensive and disk-intensive (when logging to disk).
3831 */
3833
3834 /**
3835 * Promise that thread W sets to truthy `Error_code` if it fails to initialize or falsy once event loop is running.
3836 * The truthy payload can be returned or thrown inside an error::Runtime_exception if desired.
3837 */
3838 boost::promise<Error_code> m_event_loop_ready;
3839
3840 /// The future object through which the non-W thread waits for #m_event_loop_ready to be set to success/failure.
3841 boost::unique_future<Error_code> m_event_loop_ready_result;
3842
3843 /**
3844 * Signal set which we may or may not be using to trap SIGINT and SIGTERM in order to auto-fire
3845 * interrupt_all_waits(). `add()` is called on it at initialization if and only if that feature is enabled
3846 * by the user via `Node_options`. Otherwise this object just does nothing for the Node's lifetime.
3847 */
3849
3850 /// Worker thread (= thread W). Other members should be initialized before this to avoid race condition.
3852}; // class Node
3853
3854/**
3855 * @private
3856 *
3857 * The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to
3858 * a port in this Node. Its (unmodifiable after initialization) fields are to be constructed via direct
3859 * initialization (assuming the defaults are unacceptable).
3860 */
3862{
3863 // Data.
3864
3865 /// The other side of the connection.
3867 /// This side of the connection (within this Node).
3869
3870 // Methods.
3871
3872 /**
3873 * Hash value of this Socket_id for `unordered<>`.
3874 * @return Ditto.
3875 */
3876 size_t hash() const;
3877};
3878
3879// Free functions: in *_fwd.hpp.
3880
3881// However the following refer to inner type(s) and hence must be declared here and not _fwd.hpp.
3882
3883/**
3884 * @internal
3885 *
3886 * Free function that returns socket_id.hash(); has to be a free function named `hash_value()` for
3887 * boost.hash to pick it up.
3888 *
3889 * @relatesalso Node::Socket_id
3890 *
3891 * @param socket_id
3892 * Socket ID to hash.
3893 * @return socket_id.hash().
3894 */
3895size_t hash_value(const Node::Socket_id& socket_id);
3896
3897/**
3898 * @internal
3899 *
3900 * Whether `lhs` is equal to `rhs`.
3901 *
3902 * @relatesalso Node::Socket_id
3903 * @param lhs
3904 * Object to compare.
3905 * @param rhs
3906 * Object to compare.
3907 * @return See above.
3908 */
3909bool operator==(const Node::Socket_id& lhs, const Node::Socket_id& rhs);
3910
3911// Template implementations.
3912
3913template<typename Rep, typename Period>
3915 const boost::chrono::duration<Rep, Period>& max_wait,
3916 const boost::asio::const_buffer& serialized_metadata,
3917 Error_code* err_code,
3918 const Peer_socket_options* opts)
3919{
3920 assert(max_wait.count() > 0);
3921 return sync_connect_impl(to, util::chrono_duration_to_fine_duration(max_wait), serialized_metadata, err_code, opts);
3922}
3923
3924template<typename Rep, typename Period>
3926 const boost::chrono::duration<Rep, Period>& max_wait,
3927 Error_code* err_code, const Peer_socket_options* opts)
3928{
3929 return sync_connect_with_metadata(to, max_wait,
3930 boost::asio::buffer(&S_DEFAULT_CONN_METADATA, sizeof(S_DEFAULT_CONN_METADATA)),
3931 err_code, opts);
3932}
3933
3934template<typename Socket, typename Non_blocking_func_ret_type>
3935Non_blocking_func_ret_type Node::sync_op(typename Socket::Ptr sock,
3936 const Function<Non_blocking_func_ret_type ()>& non_blocking_func,
3937 Non_blocking_func_ret_type would_block_ret_val,
3938 Event_set::Event_type ev_type,
3939 const Fine_time_pt& wait_until,
3940 Error_code* err_code)
3941{
3942 using boost::adopt_lock;
3943 using boost::chrono::milliseconds;
3944 using boost::chrono::round;
3945
3946 // We are in user thread U != W.
3947
3948 {
3949 /* WARNING!!! sock->m_mutex is locked, but WE must unlock it before returning! Can't leave that
3950 * to the caller, because we must unlock at a specific point below, right before sync_wait()ing.
3951 * Use a Lock_guard that adopts an already-locked mutex. */
3952 typename Socket::Lock_guard lock(sock->m_mutex, adopt_lock);
3953
3954 if (!running())
3955 {
3957 return would_block_ret_val;
3958 }
3959 // else
3960
3961 /* Unlock. Why? Simply because we can't forbid other threads from accessing sock while we
3962 * shamelessly block (in sync_wait()). */
3963 } // Lock.
3964
3965 /* This is actually pretty simple. We create an Event_set with just the one event we care
3966 * about (e.g., sock is Writable) and sync_wait() for it. Then we invoke non_blocking_func()
3967 * (e.g., Node::send()) once that event holds. So, create Event_set. */
3968
3969 /* Note that we assume "this" remains valid throughout this method until we start sleeping
3970 * (sync_wait()). This is explicitly guaranteed by the "Thread safety" note in class Node doc
3971 * header (which says that Node deletion is allowed only once a blocking operation's sleep has
3972 * been entered). */
3973
3974 const Event_set::Ptr event_set = event_set_create(err_code);
3975 if (!event_set)
3976 {
3977 return would_block_ret_val; // *err_code is set. This is pretty weird but nothing we can do.
3978 }
3979 // else event_set ready.
3980
3981 // We must clean up event_set at any return point below.
3983 {
3984 // Eat any error when closing Event_set, as it's unlikely and not interesting to user.
3985 Error_code dummy_prevents_throw;
3986 event_set->close(&dummy_prevents_throw);
3987 });
3988
3989 // We care about just this event, ev_type.
3990 if (!(event_set->add_wanted_socket<Socket>(sock, ev_type, err_code)))
3991 {
3992 return would_block_ret_val; // *err_code is set. Node must have shut down or something.
3993 }
3994 // else go ahead and wait.
3995
3996 Non_blocking_func_ret_type op_result;
3997 const bool timeout_given = wait_until != Fine_time_pt();
3998 do
3999 {
4000 // We may have to call sync_wait() repeatedly; if timeout is given we must give less and less time each time.
4001 bool wait_result;
4002 if (timeout_given)
4003 {
4004 // Negative is OK but cleaner to clamp it to 0.
4005 const Fine_duration time_remaining = std::max(Fine_duration::zero(), wait_until - Fine_clock::now());
4006
4007 /* Do NOT log. We have waited already, so `this` Node may have been deleted, so get_logger() may be undefined!
4008 * @todo I don't like this. Want to log it. Maybe get rid of the allowing for `this` deletion during wait.
4009 * We don't allow it in async_*() case for instance. */
4010 /* FLOW_LOG_TRACE("Waiting again; timeout reduced "
4011 * "to [" << round<milliseconds>(time_remaining) << "] = [" << time_remaining << "]."); */
4012
4013 // Perform the wait until event detected, time_remaining expires, or wait is interrupted (a-la EINTR).
4014 wait_result = event_set->sync_wait(time_remaining, err_code);
4015 }
4016 else
4017 {
4018 // No timeout given. Perform the wait until event detected, or wait is interrupted (a-la EINTR).
4019 wait_result = event_set->sync_wait(err_code);
4020 }
4021
4022 if (!wait_result)
4023 {
4024 /* *err_code is set. Node must have shut down or something; or, maybe more likely, S_WAIT_INTERRUPTED
4025 * or S_WAIT_USER_TIMEOUT occurred. In all cases, it's correct to pass it on to our caller. */
4026 return would_block_ret_val;
4027 }
4028 // else sync_wait() has returned success.
4029
4030 // Warning: "this" may be have been deleted by any point below this line, unless specifically guaranteed.
4031
4032#ifndef NDEBUG
4033 const bool active = event_set->events_detected(err_code);
4034#endif
4035 assert(active); // Inactive but no error, so it must have been a timeout -- but that should've been error.
4036
4037 /* OK. sync_wait() reports event is ready (sock is active, e.g., Writable). Try to perform
4038 * non-blocking operation (e.g., Node::send()). We must lock again (to access m_node again,
4039 * plus it's a pre-condition of the non-blocking operation (e.g., Node::send()). In the
4040 * meantime sock may have gotten closed. Ensure that's not so (another pre-condition).
4041 *
4042 * Alternatively, in reactor-pattern mode, they want us to basically do a glorified sync_wait() for
4043 * one of the 3 events, depending on ev_type, and just return without performing any non_blocking_func();
4044 * in fact this mode is indicated by non_blocking_func.empty(). */
4045
4046 {
4047 typename Socket::Lock_guard lock(sock->m_mutex);
4048 if (sock->m_state == Socket::State::S_CLOSED) // As in the invoker of this method....
4049 {
4050 assert(sock->m_disconnect_cause);
4051 *err_code = sock->m_disconnect_cause;
4052 // Do NOT log. "this" Node may have been deleted, so get_logger() may be undefined!
4053
4054 return would_block_ret_val;
4055 }
4056 // else do it. Note that `this` is guaranteed to still exist if sock->m_state is not CLOSED.
4057
4058 if (non_blocking_func.empty())
4059 {
4060 FLOW_LOG_TRACE("Sync op of type [" << ev_type << "] with Event_set [" << event_set << "] in reactor pattern "
4061 "mode on object [" << sock << "] successful; returning without non-blocking op.");
4062 assert(!*err_code); // In reactor pattern mode: No error <=> yes, socket is in desired state now.
4063 return would_block_ret_val;
4064 }
4065
4066 op_result = non_blocking_func(); // send(), receive(), accept(), etc.
4067 } // Lock.
4068 // Cannot log below this line for aforementioned reasons. Also cannot log in subsequent iterations!
4069
4070 if (*err_code)
4071 {
4072 // *err_code is set. Any number of errors possible here; error on socket => socket is active.
4073 return would_block_ret_val;
4074 }
4075 // else no error.
4076
4077 /* If op_result > 0, then data was transferred (enqueued to Send buffer, dequeued from Receive
4078 * buffer or Accept queue, etc.); cool. If op_result == 0, sock is still not active. How
4079 * is that possible if sync_wait() returned non-zero events? Because some jerk in another
4080 * thread may also be non_blocking_func()ing at the same time. In that case we must try again
4081 * (must not return would_block_ret_val and no error). (And give less time, if timeout was
4082 * provided.) */
4083
4084 // Do NOT log as explained. @todo I don't like this. See similar @todo above.
4085 /* if (op_result == would_block_ret_val)
4086 * {
4087 * // Rare/interesting enough for INFO.
4088 * FLOW_LOG_INFO('[' << sock << "] got Active status "
4089 * "but the non-blocking operation still returned would-block. Another thread is interfering?");
4090 * } */
4091 }
4092 while (op_result == would_block_ret_val);
4093
4094 // Excellent. At least some data was transferred (e.g., enqueued on Send buffer). *err_code is success.
4095 return op_result;
4096} // Node::sync_op()
4097
4098template<typename Socket_ptr>
4099bool Node::ensure_sock_open(Socket_ptr sock, Error_code* err_code) // Static.
4100{
4101 // Pre-condition: sock is suitably locked. We are in thread U != W or W.
4102
4103 if (sock->m_state == Socket_ptr::element_type::State::S_CLOSED)
4104 {
4105 // CLOSED socket -- Node has disowned us.
4106 assert(!sock->m_node);
4107 assert(sock->m_disconnect_cause);
4108
4109 // Mark (already-determined) error in *err_code and log.
4110 FLOW_LOG_SET_CONTEXT(sock->get_logger(), sock->get_log_component()); // Static, so must do <--that to log this--v.
4111 FLOW_ERROR_EMIT_ERROR_LOG_INFO(sock->m_disconnect_cause);
4112 return false;
4113 }
4114 // else
4115 return true;
4116} // Node::ensure_sock_open()
4117
4118template<typename Opt_type>
4119bool Node::validate_static_option(const Opt_type& new_val, const Opt_type& old_val, const std::string& opt_id,
4120 Error_code* err_code) const
4121{
4122 using std::string;
4123
4124 if (new_val != old_val)
4125 {
4126 const string& opt_id_nice = Node_options::opt_id_to_str(opt_id);
4127 FLOW_LOG_WARNING("Option [" << opt_id_nice << "] is static, but attempted to change "
4128 "from [" << old_val << "] to [" << new_val << "]. Ignoring entire option set.");
4130 return false;
4131 }
4132 // else
4133
4134 return true;
4135}
4136
4137template<typename Opt_type>
4138Opt_type Node::opt(const Opt_type& opt_val_ref) const
4139{
4140 /* They've given the REFERENCE to the value they want to read. Another thread may write to that
4141 * value concurrently. Therefore, acquire ownership of the enclosing m_opts. Copy the value. Unlock.
4142 * Then return the copy. Most options are small (typically primitive types, typically integers;
4143 * or boost.chrono time values which are internally also usually just integers), so the copy
4144 * should not be a big deal. */
4145
4147 return opt_val_ref;
4148}
4149
4150template<typename Peer_socket_impl_type>
4152{
4153 return new Peer_socket_impl_type(get_logger(), &m_task_engine, opts);
4154}
4155
4156template<typename Server_socket_impl_type>
4158{
4159 return new Server_socket_impl_type(get_logger(), child_sock_opts);
4160}
4161
4162} // namespace flow::net_flow
Convenience class that simply stores a Logger and/or Component passed into a constructor; and returns...
Definition: log.hpp:1612
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:217
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1284
A user-set collection of sockets and desired conditions on those sockets (such as: "socket has data t...
Definition: event_set.hpp:254
Event_type
Type of event or condition of interest supported by class Event_set.
Definition: event_set.hpp:307
Objects of this class can be fed to Node to make it internally simulate network conditions like loss,...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
Definition: node.hpp:934
void snd_flying_pkts_updated(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_const_iter pkt_begin, const Peer_socket::Sent_pkt_ordered_by_when_const_iter &pkt_end, bool added)
Updates Peer_socket::m_snd_flying_bytes according to an operation (add packets, remove packets) calle...
void low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet, size_t bytes_expected_transferred, const Error_code &sys_err_code, size_t bytes_transferred)
Completion handler for async_low_lvl_packet_send_impl(); called when the packet is either successfull...
Definition: low_lvl_io.cpp:491
void serv_peer_socket_init(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records a new (just received SYN) peer socket from the given server socket.
void sock_pacing_new_packet_ready(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet)
async_sock_low_lvl_packet_send_paced() pacing helper: Handles a DATA or ACK packet that was just pass...
Definition: low_lvl_io.cpp:642
bool categorize_individual_ack(const Socket_id &socket_id, Peer_socket::Ptr sock, Ack_packet::Individual_ack::Const_ptr ack, bool *dupe_or_late, Peer_socket::Sent_pkt_ordered_by_when_iter *acked_pkt_it)
Helper of perform_accumulated_on_recv_tasks() that categorizes the given accumulated individual ackno...
void handle_data_to_established(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet, bool syn_rcvd_qd_packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
bool sock_is_writable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->send() with at least some arguments would return either non...
Peer_socket_info sock_info(Peer_socket::Const_ptr sock)
Implementation of sock->info() for socket sock in all cases except when sock->state() == Peer_socket:...
void async_no_sock_low_lvl_packet_send(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
Definition: low_lvl_io.cpp:307
void receive_wnd_updated(Peer_socket::Ptr sock)
Placed by receive() onto W if it has dequeued data from Receive buffer and given it to the user,...
virtual Server_socket * serv_create(const Peer_socket_options *child_sock_opts)
Internal factory used for ALL Server_socket objects created by this Node (including subclasses).
void serv_set_state(Server_socket::Ptr serv, Server_socket::State state)
Sets Server_socket::m_state.
void async_sock_low_lvl_packet_send(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr &&packet, bool delayed_by_pacing)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
Definition: low_lvl_io.cpp:301
void sock_track_new_data_after_gap_rexmit_off(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, size_t data_size, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
bool sock_data_to_reassembly_q_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
void perform_accumulated_on_recv_tasks()
Performs all tasks to be performed at the end of low_lvl_recv_and_handle() or async part of async_wai...
Definition: node.cpp:375
static bool ensure_sock_open(Socket_ptr sock, Error_code *err_code)
Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so,...
Definition: node.hpp:4099
void send_worker(Peer_socket::Ptr sock, bool defer_delta_check)
Thread W implemention of send(): synchronously or asynchronously send the contents of sock->m_snd_buf...
void interrupt_all_waits(Error_code *err_code=0)
Interrupts any blocking operation, a/k/a wait, and informs the invoker of that operation that the blo...
Definition: event_set.cpp:1390
void handle_accumulated_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any incoming acknowledgments and rcv_wnd u...
Node_options options() const
Copies this Node's option set and returns that copy.
Definition: node.cpp:1106
void handle_incoming(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-received, not-yet-deserialized low-level packet.
Definition: node.cpp:426
~Node() override
Destroys Node.
Definition: node.cpp:139
void async_rcv_wnd_recovery(Peer_socket::Ptr sock, size_t rcv_wnd)
receive_wnd_updated() helper that continues rcv_wnd recovery: that is, sends unsolicited ACK with a r...
void log_accumulated_acks(Peer_socket::Const_ptr sock) const
Helper of handle_accumulated_acks() that logs the about-to-be-handled accumulated individual acknowle...
boost::asio::signal_set Signal_set
Short-hand for a signal set.
Definition: node.hpp:1430
void sock_free_memory(Peer_socket::Ptr sock)
Helper that clears all non-O(1)-space data structures stored inside sock.
static const size_t & S_NUM_PORTS
Total number of Flow ports in the port space, including S_PORT_ANY.
Definition: node.hpp:939
bool event_set_check_baseline(Event_set::Ptr event_set)
Checks each desired (Event_set::m_want) event in event_set; any that holds true is saved into event_s...
Definition: event_set.cpp:1015
void sock_load_info_struct(Peer_socket::Const_ptr sock, Peer_socket_info *stats) const
Given a Peer_socket, copies all stats info (as available via Peer_socket::info()) from various struct...
void log_snd_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages thats show the detailed state of the sending sequence number space.
void send_worker_check_state(Peer_socket::Ptr sock)
Helper placed by send() onto W to invoke send_worker() but ensures that the socket has not entered so...
size_t m_low_lvl_max_buf_size
OS-reported m_low_lvl_sock UDP receive buffer maximum size, obtained right after we OS-set that setti...
Definition: node.hpp:3729
bool set_options(const Node_options &opts, Error_code *err_code=0)
Dynamically replaces the current options set (options()) with the given options set.
Definition: node.cpp:1054
void sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket, synchronously.
Definition: low_lvl_io.cpp:997
void mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number &seq_num)
Performs important book-keeping based on the event "DATA packet was sent to destination....
Definition: low_lvl_io.cpp:416
void handle_syn_ack_ack_to_syn_rcvd(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_ack_packet > syn_ack_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK_ACK packet delivered to the given p...
Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock, const Function< Non_blocking_func_ret_type()> &non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Error_code *err_code)
Implementation of core blocking transfer methods, namely Peer_socket::sync_send(),...
Definition: node.hpp:3935
void serv_close_detected(Server_socket::Ptr serv, const Error_code &disconnect_cause, bool close)
Records that thread W shows this socket is not to listen to incoming connections and is to abort any ...
static const Fine_duration S_REGULAR_INFREQUENT_TASKS_PERIOD
Time interval between performing "infrequent periodic tasks," such as stat logging.
Definition: node.hpp:3651
size_t sock_max_packets_after_unrecvd_packet(Peer_socket::Const_ptr sock) const
Computes and returns the max size for Peer_socket::m_rcv_packets_with_gaps for sock.
Peer_socket::Sent_pkt_ordered_by_when_iter categorize_pkts_as_dropped_on_acks(Peer_socket::Ptr sock, const boost::unordered_set< Peer_socket::order_num_t > &flying_now_acked_pkts)
Helper of perform_accumulated_on_recv_tasks() that determines the range of In-flight packets that sho...
void rcv_get_first_gap_info(Peer_socket::Const_ptr sock, bool *first_gap_exists, Sequence_number *seq_num_after_first_gap)
Helper for handle_data_to_established() that gets simple info about Peer_socket::m_rcv_packets_with_g...
bool snd_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data either in Peer_socket::m_snd_rexmit_q of sock (if re...
Fine_time_pt m_last_loss_sock_log_when
For debugging, when we detect loss of data we'd sent, we log the corresponding socket's state; this i...
Definition: node.hpp:3832
Server_socket::Ptr listen(flow_port_t local_port, Error_code *err_code=0, const Peer_socket_options *child_sock_opts=0)
Sets up a server on the given local Flow port and returns Server_socket which can be used to accept s...
void cancel_timers(Peer_socket::Ptr sock)
Cancel any timers and scheduled tasks active in the given socket.
void sock_pacing_time_slice_end(Peer_socket::Ptr sock, const Error_code &sys_err_code)
async_sock_low_lvl_packet_send_paced() pacing helper: If sock_pacing_process_q() ran out of the last ...
Definition: low_lvl_io.cpp:950
void sock_rcv_buf_now_readable(Peer_socket::Ptr sock, bool syn_rcvd_qd_packet)
Helper for handle_data_to_established() that assumes the given's socket Receive buffer is currently r...
boost::shared_ptr< Net_env_simulator > m_net_env_sim
The object used to simulate stuff like packet loss and latency via local means directly in the code.
Definition: node.hpp:3671
void async_wait_latency_then_handle_incoming(const Fine_duration &latency, util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sets up handle_incoming(packet_data, low_lvl_remote_endpoint) to be called asynchronously after a spe...
Definition: low_lvl_io.cpp:249
void snd_flying_pkts_erase_one(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_iter pkt_it)
Erases (for example if considered Acknowledged or Dropped) a packet struct from the "scoreboard" (Pee...
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
Definition: node.hpp:4138
bool sock_validate_options(const Peer_socket_options &opts, const Peer_socket_options *prev_opts, Error_code *err_code) const
Analogous to validate_options() but checks per-socket options instead of per-Node options.
Options_mutex m_opts_mutex
The mutex protecting m_opts.
Definition: node.hpp:3665
boost::shared_ptr< util::Timer > Timer_ptr
boost.asio timer wrapped in a ref-counted pointer.
Definition: node.hpp:1427
void event_set_close(Event_set::Ptr event_set, Error_code *err_code)
Implementation of Event_set::close() when Event_set::state() != Event_set::State::S_CLOSED for event_...
Definition: event_set.cpp:1271
void handle_accumulated_pending_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any additional individual outgoing acknowl...
void receive_wnd_recovery_data_received(Peer_socket::Ptr sock)
Pertaining to the async_rcv_wnd_recovery() mechanism, this handles the event that we have received an...
static Peer_socket::order_num_t sock_get_new_snd_order_num(Peer_socket::Ptr sock)
Returns the "order number" to use for Peer_socket::Sent_packet::Sent_when structure corresponding to ...
Peer_socket::Options_mutex Options_mutex
Short-hand for high-performance, non-reentrant, exclusive mutex used to lock m_opts.
Definition: node.hpp:1433
Peer_socket::Ptr sync_connect_impl(const Remote_endpoint &to, const Fine_duration &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code, const Peer_socket_options *opts)
Implementation core of sync_connect*() that gets rid of templated or missing arguments thereof.
void event_set_check_baseline_assuming_state(Event_set::Ptr event_set)
Helper placed by event_set_async_wait() onto thread W to invoke event_set_check_baseline() but first ...
Definition: event_set.cpp:987
size_t max_block_size() const
The maximum number of bytes of user data per received or sent block on connections generated from thi...
Definition: node.cpp:1111
const util::Udp_endpoint & local_low_lvl_endpoint() const
Return the UDP endpoint (IP address and UDP port) which will be used for receiving incoming and sendi...
Definition: node.cpp:369
boost::asio::ip::udp::socket Udp_socket
Short-hand for UDP socket.
Definition: node.hpp:1424
bool serv_is_acceptable(const boost::any &serv_as_any) const
Returns true if and only if calling serv->accept() with at least some arguments would return either n...
void snd_flying_pkts_push_one(Peer_socket::Ptr sock, const Sequence_number &seq_num, Peer_socket::Sent_packet::Ptr sent_pkt)
Adds a new packet struct (presumably representing packet to be sent shortly) to the "scoreboard" (Pee...
void worker_run(const util::Udp_endpoint low_lvl_endpoint)
Worker thread W (main event loop) body.
Definition: node.cpp:151
Syn_packet::Ptr create_syn(Peer_socket::Const_ptr sock)
Helper that creates a new SYN packet object to the extent that is suitable for immediately passing to...
boost::unique_future< Error_code > m_event_loop_ready_result
The future object through which the non-W thread waits for m_event_loop_ready to be set to success/fa...
Definition: node.hpp:3841
void close_abruptly(Peer_socket::Ptr sock, Error_code *err_code)
Implementation of non-blocking sock->close_abruptly() for socket sock in all cases except when sock->...
void async_low_lvl_ack_send(Peer_socket::Ptr sock, const Error_code &sys_err_code=Error_code())
Sends a low-level ACK packet, with all accumulated in Peer_socket::m_rcv_pending_acks of sock individ...
static void get_seq_num_range(const Packet_map_iter &packet_it, Sequence_number *seq_num_start, Sequence_number *seq_num_end)
Given an iterator into a Peer_socket::Sent_pkt_by_sent_when_map or Peer_socket::Recv_pkt_map,...
Peer_socket::Ptr sync_connect_with_metadata(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
A combination of sync_connect() and connect_with_metadata() (blocking connect, with supplied metadata...
Definition: node.hpp:3914
void event_set_close_worker(Event_set::Ptr event_set)
The guts of event_set_close_worker_check_state(): same thing, but assumes Event_set::state() == Event...
Definition: event_set.cpp:1330
Syn_ack_packet::Ptr create_syn_ack(Peer_socket::Const_ptr sock)
Like create_syn() but for SYN_ACK.
virtual Peer_socket * sock_create(const Peer_socket_options &opts)
Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
bool snd_buf_enqable(Peer_socket::Const_ptr sock) const
Return true if and only if there is enough free space in Peer_socket::m_snd_buf of sock to enqueue an...
bool can_send(Peer_socket::Const_ptr sock) const
Answers the perennial question of congestion and flow control: assuming there is a DATA packet to sen...
void sock_pacing_process_q(Peer_socket::Ptr sock, bool executing_after_delay)
async_sock_low_lvl_packet_send_paced() pacing helper: Given that we are currently in the pacing time ...
Definition: low_lvl_io.cpp:841
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
Definition: low_lvl_io.cpp:580
void sock_slide_rcv_next_seq_num(Peer_socket::Ptr sock, size_t slide_size, bool reassembly_in_progress)
Helper for handle_data_to_established() that aims to register a set of received DATA packet data as i...
void sock_log_detail(Peer_socket::Const_ptr sock) const
Logs a verbose state report for the given socket.
boost::unordered_map< flow_port_t, Server_socket::Ptr > Port_to_server_map
A map from the local Flow port to the local Server_socket listening on that port.
Definition: node.hpp:1450
static const size_t & S_NUM_EPHEMERAL_PORTS
Total number of Flow "ephemeral" ports (ones reserved locally at random with Node::listen(S_PORT_ANY)...
Definition: node.hpp:948
boost::unordered_set< Peer_socket::Ptr > m_socks_with_accumulated_pending_acks
Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() (...
Definition: node.hpp:3807
boost::unordered_set< Peer_socket::Ptr > m_socks_with_accumulated_acks
Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() c...
Definition: node.hpp:3825
friend size_t hash_value(const Socket_id &socket_id)
Definition: node.cpp:1161
static void advance_seq_num(Sequence_number *seq_num, boost::shared_ptr< const Data_packet > data)
Assuming *seq_num points to the start of data.m_data, increments *seq_num to point to the datum just ...
static Sequence_number snd_past_last_flying_datum_seq_num(Peer_socket::Const_ptr sock)
Obtain the sequence number for the datum just past the last (latest) In-flight (i....
util::Thread m_worker
Worker thread (= thread W). Other members should be initialized before this to avoid race condition.
Definition: node.hpp:3851
Sequence_number::Generator m_seq_num_generator
Sequence number generator (at least to generate ISNs). Only thread W can access this.
Definition: node.hpp:3738
Peer_socket::Ptr connect(const Remote_endpoint &to, Error_code *err_code=0, const Peer_socket_options *opts=0)
Initiates an active connect to the specified remote Flow server.
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
Definition: event_set.cpp:1127
void serv_peer_socket_closed(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that a Server_socket-contained (i.e., currently un-established, or established but not yet ac...
bool validate_static_option(const Opt_type &new_val, const Opt_type &old_val, const std::string &opt_id, Error_code *err_code) const
Helper that compares new_val to old_val and, if they are not equal, logs and returns an error; used t...
Definition: node.hpp:4119
bool rcv_buf_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data in Peer_socket::m_rcv_buf of sock to give the user s...
void async_acknowledge_packet(Peer_socket::Ptr sock, const Sequence_number &seq_num, unsigned int rexmit_id, size_t data_size)
Causes an acknowledgment of the given received packet to be included in a future Ack_packet sent to t...
Socket_id_to_socket_map m_socks
The peer-to-peer connections this Node is currently tracking.
Definition: node.hpp:3750
unsigned int handle_incoming_with_simulation(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint, bool is_sim_duplicate_packet=false)
Helper for low_lvl_recv_and_handle() that calls handle_incoming() on the not-yet-deserialized low-lev...
Definition: low_lvl_io.cpp:181
bool validate_option_check(bool check, const std::string &check_str, Error_code *err_code) const
Helper that, if the given condition is false, logs and returns an error; used to check for option val...
Definition: node.cpp:1092
static const flow_port_t & S_FIRST_SERVICE_PORT
The port number of the lowest service port, making the range of service ports [S_FIRST_SERVICE_PORT,...
Definition: node.hpp:954
Peer_socket::Options_lock Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
Definition: node.hpp:1436
void low_lvl_recv_and_handle(Error_code sys_err_code)
Handles the pre-condition that m_low_lvl_sock has a UDP packet available for reading,...
Definition: low_lvl_io.cpp:40
friend bool operator==(const Socket_id &lhs, const Socket_id &rhs)
Definition: node.cpp:1156
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
Udp_socket m_low_lvl_sock
The UDP socket used to receive low-level packets (to assemble into application layer data) and send t...
Definition: node.hpp:3711
void async_low_lvl_packet_send_impl(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet, bool delayed_by_pacing, Peer_socket::Ptr sock)
Takes given low-level packet structure, serializes it, and initiates asynchronous send of these data ...
Definition: low_lvl_io.cpp:315
void handle_syn_ack_to_syn_sent(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK packet delivered to the given peer ...
bool event_set_async_wait(Event_set::Ptr event_set, const Event_set::Event_handler &on_event, Error_code *err_code)
Implementation of Event_set::async_wait() when Event_set::state() == Event_set::State::S_INACTIVE.
Definition: event_set.cpp:915
size_t send(Peer_socket::Ptr sock, const Function< size_t(size_t max_data_size)> &snd_buf_feed_func, Error_code *err_code)
Implementation of non-blocking sock->send() for socket sock in all cases except when sock->state() ==...
static const size_t & S_NUM_SERVICE_PORTS
Total number of Flow "service" ports (ones that can be reserved by number with Node::listen()).
Definition: node.hpp:942
static const flow_port_t & S_FIRST_EPHEMERAL_PORT
The port number of the lowest ephemeral Flow port, making the range of ephemeral ports [S_FIRST_EPHEM...
Definition: node.hpp:960
void sock_set_int_state(Peer_socket::Ptr sock, Peer_socket::Int_state new_state)
Sets internal state of given socket to the given state and logs a TRACE message about it.
Peer_socket * sock_create_forward_plus_ctor_args(const Peer_socket_options &opts)
Returns a raw pointer to newly created Peer_socket or sub-instance like asio::Peer_socket,...
Definition: node.hpp:4151
boost::promise< Error_code > m_event_loop_ready
Promise that thread W sets to truthy Error_code if it fails to initialize or falsy once event loop is...
Definition: node.hpp:3838
bool sock_is_readable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->receive() with at least some arguments would return either ...
log::Logger * this_thread_init_logger_setup(const std::string &thread_type, log::Logger *logger=0)
Helper to invoke for each thread in which this Node executes, whether or not it starts that thread,...
Definition: node.cpp:113
Peer_socket::Ptr handle_syn_to_listening_server(Server_socket::Ptr serv, boost::shared_ptr< const Syn_packet > syn, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-deserialized, just-demultiplexed low-level SYN packet delivered to the given server so...
bool sock_data_to_rcv_buf_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to pass the payload of the given DATA packet to the...
bool sock_set_options(Peer_socket::Ptr sock, const Peer_socket_options &opts, Error_code *err_code)
Thread W implementation of sock->set_options().
bool running() const
Returns true if and only if the Node is operating.
Definition: node.cpp:420
Port_to_server_map m_servs
The server sockets this Node is currently tracking.
Definition: node.hpp:3756
Event_set::Ev_type_to_socks_map m_sock_events
All sockets that have been detected to be "ready" (by the Event_set doc header definition) at any poi...
Definition: node.hpp:3788
static const uint8_t S_DEFAULT_CONN_METADATA
Type and value to supply as user-supplied metadata in SYN, if user chooses to use [[a]sync_]connect()...
Definition: node.hpp:1400
const Node_options & validate_options(const Node_options &opts, bool init, Error_code *err_code) const
Given a new set of Node_options intended to replace (or initialize) a Node's m_opts,...
Definition: node.cpp:980
Server_socket * serv_create_forward_plus_ctor_args(const Peer_socket_options *child_sock_opts)
Like sock_create_forward_plus_ctor_args() but for Server_sockets.
Definition: node.hpp:4157
void setup_drop_timer(const Socket_id &socket_id, Peer_socket::Ptr sock)
Creates a new Drop Timer and saves it to sock->m_snd_drop_timer.
void handle_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Ack_packet > ack)
Handles a just-deserialized, just-demultiplexed, low-level ACK packet delivered to the given peer soc...
Peer_socket::Ptr sync_connect(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0, const Peer_socket_options *opts=0)
The blocking (synchronous) version of connect().
Definition: node.hpp:3925
void listen_worker(flow_port_t local_port, const Peer_socket_options *child_sock_opts, Server_socket::Ptr *serv)
Thread W implementation of listen().
void handle_syn_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed, duplicate (equal to already-received SYN_ACK) low-le...
Signal_set m_signal_set
Signal set which we may or may not be using to trap SIGINT and SIGTERM in order to auto-fire interrup...
Definition: node.hpp:3848
void serv_peer_socket_acceptable(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that an unestablished socket sock (Peer_socket::Int_state::S_SYN_RCVD) has just become establ...
void handle_data_to_syn_rcvd(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
boost::unordered_map< Socket_id, Peer_socket::Ptr > Socket_id_to_socket_map
A map from the connection ID (= remote-local socket pair) to the local Peer_socket that is the local ...
Definition: node.hpp:1447
void setup_connection_timers(const Socket_id &socket_id, Peer_socket::Ptr sock, bool initial)
Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some a...
void log_rcv_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages that show the detailed state of the receiving sequence number space.
size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const
Computes and returns the currently correct rcv_wnd value; that is the amount of space free in Receive...
Peer_socket::Ptr accept(Server_socket::Ptr serv, Error_code *err_code)
Implementation of non-blocking serv->accept() for server socket serv in all cases except when serv->s...
void connect_worker(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts, Peer_socket::Ptr *sock)
Thread W implementation of connect().
bool drop_pkts_on_acks(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &last_dropped_pkt_it, size_t *cong_ctl_dropped_pkts, size_t *cong_ctl_dropped_bytes, size_t *dropped_pkts, size_t *dropped_bytes, std::vector< Peer_socket::order_num_t > *pkts_marked_to_drop)
Helper of perform_accumulated_on_recv_tasks() that acts on the determination made by categorize_pkts_...
static const Peer_socket::Sent_packet::ack_count_t S_MAX_LATER_ACKS_BEFORE_CONSIDERING_DROPPED
For a given unacknowledged sent packet P, the maximum number of times any individual packet with high...
Definition: node.hpp:3644
Node(log::Logger *logger, const util::Udp_endpoint &low_lvl_endpoint, Net_env_simulator *net_env_sim=0, Error_code *err_code=0, const Node_options &opts=Node_options())
Constructs Node.
Definition: node.cpp:40
util::Blob m_packet_data
Stores incoming raw packet data; re-used repeatedly for possible performance gains.
Definition: node.hpp:3732
Error_code sock_categorize_data_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, bool *dupe, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that categorizes the DATA packet received as either illegal; ...
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
Definition: low_lvl_io.cpp:988
Event_set::Ptr event_set_create(Error_code *err_code=0)
Creates a new Event_set in Event_set::State::S_INACTIVE state with no sockets/events stored; returns ...
Definition: event_set.cpp:879
void interrupt_all_waits_worker()
Thread W implementation of interrupt_all_waits().
Definition: event_set.cpp:1419
void sock_set_state(Peer_socket::Ptr sock, Peer_socket::State state, Peer_socket::Open_sub_state open_sub_state=Peer_socket::Open_sub_state::S_CONNECTED)
Sets Peer_socket::m_state and Peer_socket::m_open_sub_state.
void receive_emptied_rcv_buf_while_disconnecting(Peer_socket::Ptr sock)
Placed by receive() onto W during a graceful close, after the Receive buffer had been emptied by the ...
void sock_disconnect_detected(Peer_socket::Ptr sock, const Error_code &disconnect_cause, bool close)
Records that thread W shows underlying connection is broken (graceful termination,...
size_t receive(Peer_socket::Ptr sock, const Function< size_t()> &rcv_buf_consume_func, Error_code *err_code)
Implementation of non-blocking sock->receive() for socket sock in all cases except when sock->state()...
void handle_connection_rexmit_timer_event(const Socket_id &socket_id, Peer_socket::Ptr sock)
Handles the triggering of the retransmit timer wait set up by setup_connection_timers(); it will re-s...
Node_options m_opts
This Node's global set of options.
Definition: node.hpp:3662
void async_low_lvl_recv()
Registers so that during the current or next m_task_engine.run(), the latter will wait for a receivab...
Definition: low_lvl_io.cpp:31
void close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
A thread W method that handles the transition of the given socket from OPEN (any sub-state) to CLOSED...
void sock_disconnect_completed(Peer_socket::Ptr sock)
While in S_OPEN+S_DISCONNECTING state (i.e., after beginning a graceful close with sock_disconnect_de...
Fine_duration compute_rtt_on_ack(Peer_socket::Sent_packet::Const_ptr flying_pkt, const Fine_time_pt &time_now, Ack_packet::Individual_ack::Const_ptr ack, const Peer_socket::Sent_packet::Sent_when **sent_when) const
Helper of perform_accumulated_on_recv_tasks() that computes the RTT implied by a given individual ack...
void close_empty_server_immediately(const flow_port_t local_port, Server_socket::Ptr serv, const Error_code &err_code, bool defer_delta_check)
Handles the transition of the given server socket from S_LISTENING/S_CLOSING to S_CLOSED (including e...
void async_low_lvl_syn_ack_ack_send(const Peer_socket::Ptr &sock, boost::shared_ptr< const Syn_ack_packet > &syn_ack)
Helper to create, fully fill out, and asynchronously send via async_sock_low_lvl_packet_send_paced() ...
Event_sets m_event_sets
Every Event_set to have been returned by event_set_create() and not subsequently reached Event_set::S...
Definition: node.hpp:3762
util::Rnd_gen_uniform_range< Peer_socket::security_token_t > m_rnd_security_tokens
Random number generator for picking security tokens; seeded on time at Node construction and generate...
Definition: node.hpp:3744
void interrupt_all_waits_internal_sig_handler(const Error_code &sys_err_code, int sig_number)
signal_set handler, executed on SIGINT and SIGTERM, if user has enabled this feature: causes interrup...
Definition: event_set.cpp:1454
Peer_socket::Ptr connect_with_metadata(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
Same as connect() but sends, as part of the connection handshake, the user-supplied metadata,...
void new_round_trip_time_sample(Peer_socket::Ptr sock, Fine_duration round_trip_time)
Handles a just-computed new RTT (round trip time) measurement for an individual packet earlier sent: ...
void perform_regular_infrequent_tasks(bool reschedule)
Performs low-priority tasks that should be run on an infrequent, regular basis, such as stat logging ...
Definition: node.cpp:1116
util::Udp_endpoint m_low_lvl_endpoint
After we bind m_low_lvl_sock to a UDP endpoint, this is a copy of that endpoint.
Definition: node.hpp:3722
void async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
Definition: low_lvl_io.cpp:599
bool ok_to_rexmit_or_close(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &pkt_it, bool defer_delta_check)
Checks whether the given sent packet has been retransmitted the maximum number of allowed times; if s...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
Definition: node.hpp:3697
void sock_pacing_new_time_slice(Peer_socket::Ptr sock, const Fine_time_pt &now)
async_sock_low_lvl_packet_send_paced() pacing helper: Resets the socket's Send_pacing_data structure ...
Definition: low_lvl_io.cpp:757
Port_space m_ports
Flow port space for both client and server sockets. All threads may access this.
Definition: node.hpp:3735
void event_set_fire_if_got_events(Event_set::Ptr event_set)
Check whether given Event_set contains any active sockets (Event_set::m_can); if so,...
Definition: event_set.cpp:1082
void rst_and_close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
Asynchronously send RST to the other side of the given socket and close_connection_immediately().
boost::unordered_set< Event_set::Ptr > Event_sets
A set of Event_set objects.
Definition: node.hpp:1453
void drop_timer_action(Peer_socket::Ptr sock, bool drop_all_packets)
Handles a Drop_timer (Peer_socket::m_snd_drop_timer) event in ESTABLISHED state by dropping the speci...
A peer (non-server) socket operating over the Flow network protocol, with optional stream-of-bytes an...
util::Mutex_non_recursive Options_mutex
Short-hand for high-performance, non-reentrant, exclusive mutex used to lock m_opts.
State
State of a Peer_socket.
Open_sub_state
The sub-state of a Peer_socket when state is State::S_OPEN.
@ S_CONNECTED
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
Int_state
The state of the socket (and the connection from this end's point of view) for the internal state mac...
util::Lock_guard< Options_mutex > Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
Sent_pkt_by_sent_when_map::const_iterator Sent_pkt_ordered_by_when_const_iter
Short-hand for m_snd_flying_pkts_by_sent_when const iterator type.
Internal net_flow class that maintains the available Flow-protocol port space, somewhat similarly to ...
Definition: port_space.hpp:94
An object of this type generates a series of initial sequence numbers (ISN) that are meant to be suff...
Definition: seq_num.hpp:373
An internal net_flow sequence number identifying a piece of data.
Definition: seq_num.hpp:126
A server socket able to listen on a single Flow port for incoming connections and return peer sockets...
State
State of a Server_socket.
An empty interface, consisting of nothing but a default virtual destructor, intended as a boiler-plat...
Definition: util.hpp:45
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
Definition: error.hpp:202
#define FLOW_ERROR_EMIT_ERROR_LOG_INFO(ARG_val)
Identical to FLOW_ERROR_EMIT_ERROR(), but the message logged has flow::log::Sev::S_INFO severity inst...
Definition: error.hpp:218
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
Definition: log.hpp:152
#define FLOW_LOG_SET_CONTEXT(ARG_logger_ptr, ARG_component_payload)
For the rest of the block within which this macro is instantiated, causes all FLOW_LOG_....
Definition: log.hpp:405
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
@ S_STATIC_OPTION_CHANGED
When setting options, tried to set an unchangeable (static) option.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Definition: node.cpp:25
uint16_t flow_port_t
Logical Flow port type (analogous to a UDP/TCP port in spirit but in no way relevant to UDP/TCP).
size_t hash_value(const Sequence_number &seq_num)
Free function that returns seq_num.hash(); has to be a free function named hash_value for boost....
Definition: seq_num.cpp:275
bool operator==(const Remote_endpoint &lhs, const Remote_endpoint &rhs)
Whether lhs is equal to rhs.
Definition: endpoint.cpp:60
const flow_port_t S_PORT_ANY
Special Flow port value used to indicate "invalid port" or "please pick a random available ephemeral ...
Definition: port_space.cpp:33
Auto_cleanup setup_auto_cleanup(const Cleanup_func &func)
Provides a way to execute arbitrary (cleanup) code at the exit of the current block.
Definition: util.hpp:307
boost::unique_lock< Mutex > Lock_guard
Short-hand for advanced-capability RAII lock guard for any mutex, ensuring exclusive ownership of tha...
Definition: util_fwd.hpp:265
Fine_duration chrono_duration_to_fine_duration(const boost::chrono::duration< Rep, Period > &dur)
Helper that takes a non-negative duration of arbitrary precision/period and converts it to Fine_durat...
Definition: util.hpp:31
boost::shared_ptr< void > Auto_cleanup
Helper type for setup_auto_cleanup().
Definition: util_fwd.hpp:205
boost::asio::io_context Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Definition: util_fwd.hpp:135
boost::thread Thread
Short-hand for standard thread class.
Definition: util_fwd.hpp:78
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
Definition: blob_fwd.hpp:60
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
Definition: util_fwd.hpp:208
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:508
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Definition: common.hpp:416
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Definition: common.hpp:413
unsigned char uint8_t
Byte. Best way to represent a byte of binary data. This is 8 bits on all modern systems.
Definition: common.hpp:391
boost::shared_ptr< const Individual_ack > Const_ptr
Short-hand for ref-counted pointer to immutable objects of this class.
The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to a port in th...
Definition: node.hpp:3862
const Remote_endpoint m_remote_endpoint
The other side of the connection.
Definition: node.hpp:3866
size_t hash() const
Hash value of this Socket_id for unordered<>.
Definition: node.cpp:1146
const flow_port_t m_local_port
This side of the connection (within this Node).
Definition: node.hpp:3868
A set of low-level options affecting a single Flow Node, including Peer_socket objects and other obje...
Definition: options.hpp:449
static std::string opt_id_to_str(const std::string &opt_id)
Helper that, for a given option m_blah, takes something like "m_blah_blah" and returns the similar mo...
Definition: options.cpp:160
Data store to keep timing related info when a packet is sent out.
uint16_t ack_count_t
Type used for m_acks_after_me.
A data store that keeps stats about the a Peer_socket connection.
Definition: info.hpp:456
A set of low-level options affecting a single Peer_socket.
Definition: options.hpp:36
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...
Definition: endpoint.hpp:93