Flow 1.0.1
Flow project: Full implementation reference.
node.cpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file

#include <algorithm>
#include <limits>

namespace flow::net_flow
{
// Static initializations.

const Fine_duration Node::S_REGULAR_INFREQUENT_TASKS_PERIOD = boost::chrono::seconds(1); // Infrequent enough CPU-wise.

// Note that they're references, not copies. Otherwise non-deterministic static initialization order would screw us.
const size_t& Node::S_NUM_PORTS = Port_space::S_NUM_PORTS;
const size_t& Node::S_NUM_SERVICE_PORTS = Port_space::S_NUM_SERVICE_PORTS;
const size_t& Node::S_NUM_EPHEMERAL_PORTS = Port_space::S_NUM_EPHEMERAL_PORTS;
const flow_port_t& Node::S_FIRST_SERVICE_PORT = Port_space::S_FIRST_SERVICE_PORT;
const flow_port_t& Node::S_FIRST_EPHEMERAL_PORT = Port_space::S_FIRST_EPHEMERAL_PORT;

// Implementations. For Node code pertaining to individual Server_sockets and Peer_sockets see their .cpp files.

Node::Node(log::Logger* logger_ptr, const util::Udp_endpoint& low_lvl_endpoint, Net_env_simulator* net_env_sim,
           Error_code* err_code, const Node_options& opts) :
  log::Log_context(this_thread_init_logger_setup("", logger_ptr),
                   Flow_log_component::S_NET_FLOW),
  /* Take the given Node_options set and copy it into our stored global options. (Note the default
   * is Node_options().) The default is safe, but if they actually are providing a custom set of
   * options, then we must validate before accepting. This may result in a validation error.
   * If !err_code, then it'll throw an exception right here. If err_code, then it will set *err_code,
   * so we check for it inside the constructor body and exit. */
  m_opts(validate_options(opts, true, err_code)),
  m_net_env_sim(net_env_sim), // Note this is a smart pointer taking over a raw pointer (we did advertise owning it).
  m_low_lvl_sock(m_task_engine), // Blank (unbound) UDP socket.
  m_packet_data(logger_ptr),
  m_ports(logger_ptr),
  m_seq_num_generator(logger_ptr),
  m_sock_events(Event_set::empty_ev_type_to_socks_map()),
  // Set up future object used to wait for m_event_loop_ready to become success or failure.
  m_event_loop_ready_result(m_event_loop_ready.get_future()),
  /* Set up a signal set object; this is a no-op until we .add() signals to it (which we may or may not do).
   * Whether we do or not is more significant than merely whether whatever handler we'd later register
   * via m_signal_set.async_wait() will be called. If we .add() zero signals, then any non-boost.asio
   * signal handler that is currently registered (such as the default OS handler, or the user's own
   * non-boost.asio handler) (or will be registered in the future) will continue to work undisturbed.
   * If, however, we .add() one or more signals (or, equivalently, list 1 or more signal numbers right
   * here in the constructor call), then we will REPLACE such non-boost.asio handlers with boost.asio's
   * mechanism. (This behavior is explicitly documented in boost.asio docs.) Therefore we must be careful
   * not to mindlessly .add() handler(s), and/or (equivalently) list 1 or more signal numbers in this
   * constructor call here. The choice will be left to later code which will .add() or not .add()
   * deliberately. */
  m_signal_set(m_task_engine),
  // Spawn new thread W; in it execute this->worker_run(low_lvl_endpoint). low_lvl_endpoint is copied.
  m_worker(&Node::worker_run, this, low_lvl_endpoint)
{
  // We are in thread U.

  FLOW_LOG_INFO("Starting flow::Node [" << static_cast<void*>(this) << "].");

  // validate_options() may have already detected an error; then it would've thrown if (!err_code); else: Check it.
  if (err_code && (*err_code))
  {
    FLOW_LOG_WARNING("Cannot start Node due to above error.");
    return;
  }

  Error_code our_err_code; // Prepare this if they passed in no Error_code, so we can throw an exception.
  err_code || (err_code = &our_err_code);

  FLOW_LOG_INFO("\n\n" << options()); // Log initial option values. Once per Node, so it's not too verbose.

  FLOW_LOG_INFO("Just launched Flow worker thread [T" << m_worker.get_id() << "]. "
                "Waiting for it to report startup success or failure.");

  // Now wait around (probably not long) until thread W tells us either initialization success or failure.
  *err_code = m_event_loop_ready_result.get();
  if (*err_code)
  {
    FLOW_LOG_WARNING("Flow worker thread [T" << m_worker.get_id() << "] reported to me it failed to initialize: "
                     "[" << *err_code << "] [" << err_code->message() << "].");
  }
  else
  {
    FLOW_LOG_INFO("Flow worker thread [T" << m_worker.get_id() << "] reports to me it is ready for work.");
  }
  // m_task_engine.stopped() can now be reliably used to tell whether the Node (i.e., thread W) is running.

  if (our_err_code) // Throw exception if there is an error, and they passed in no Error_code.
  {
    throw Runtime_error(our_err_code, FLOW_UTIL_WHERE_AM_I_STR());
  }
} // Node::Node()
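
/* [Illustrative aside, not part of node.cpp.] A minimal, self-contained sketch of the promise/future
 * startup handshake used by the constructor above: thread U blocks on a future until worker thread W
 * reports initialization success (falsy code) or failure (truthy code) via the paired promise.
 * Toy_node and its members are hypothetical stand-ins, not Flow API. */
#include <future>
#include <iostream>
#include <system_error>
#include <thread>

class Toy_node
{
public:
  Toy_node() :
    m_ready_result(m_ready.get_future()),
    m_worker([this]() { worker_run(); }) // Spawn "thread W" last, as Node does above.
  {
    // "Thread U": wait (probably not long) for "thread W" to report startup success or failure.
    const std::error_code err = m_ready_result.get();
    std::cout << (err ? "Worker failed to initialize.\n" : "Worker ready for work.\n");
  }
  ~Toy_node() { m_worker.join(); }

private:
  void worker_run()
  {
    // "Thread W": ...acquire resources here; on failure, set a truthy code instead....
    m_ready.set_value(std::error_code()); // Falsy code <=> success; constructor wakes up.
    // ...event loop would run here....
  }

  std::promise<std::error_code> m_ready;       // Analogous to m_event_loop_ready.
  std::future<std::error_code> m_ready_result; // Analogous to m_event_loop_ready_result.
  std::thread m_worker;                        // Declared last: the other members must exist first.
};

int main() { Toy_node node; } // Prints one of the two messages and exits cleanly.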

log::Logger* Node::this_thread_init_logger_setup(const std::string& thread_type, log::Logger* logger_ptr)
{
  using log::Logger;
  using std::string;
  using std::hex;

  if (!logger_ptr)
  {
    logger_ptr = get_logger();
  }

  // Use standard beautified formatting for chrono durations/etc. output (and conversely input).

  if (!thread_type.empty())
  {
    // Nickname the thread for more convenient logging.
    string thread_nickname; // Write directly into this string.
    util::ostream_op_to_string(&thread_nickname, "nod@", hex, this, '_', thread_type);
    Logger::this_thread_set_logged_nickname(thread_nickname, logger_ptr);
  }

  return logger_ptr;
}

Node::~Node() // Virtual.
{
  FLOW_LOG_INFO("Waiting for Flow worker thread [T" << m_worker.get_id() << "] to finish.");

  m_task_engine.stop(); // Let current running callbacks finish, then exit m_task_engine.run() and the thread.
  m_worker.join(); // Wait for thread to finish. Closing activities are in worker_run() after run() exits.

  // Aside: m_net_env_sim (unless null anyway) will probably die due to ref-count=0 at the end of this { block }.

  FLOW_LOG_INFO("Node [" << static_cast<void*>(this) << "] shut down.");
} // Node::~Node()

void Node::worker_run(const util::Udp_endpoint low_lvl_endpoint)
{
  using log::Logger;
  using boost::asio::socket_base;
  using boost::asio::post;
  using std::string;
  using std::hex;

  // We're in thread W.

  // Global (for this thread, for this Node) logging setup.

  FLOW_LOG_INFO("Flow worker thread reporting for duty. Will bind to [UDP " << low_lvl_endpoint << "].");

  // Set up the UDP socket at the given interface and UDP port.
  try // Performance not a concern during initialization; use exceptions for convenience.
  {
    // BSD socket equivalents: socket(), setsockopt(), setsockopt/ioctl/whatever(), bind(); in that order.
    m_low_lvl_sock.open(low_lvl_endpoint.protocol()); // Pick IPv4 vs. IPv6 based on the bind IP they provided.

    /* A small UDP buffer size (empirically seen to be ~80 1k datagrams in an older Linux system, for example) can
     * cause loss when data are coming in very fast, and thread W's processing can't keep up, causing
     * UDP buffer overflow and thus dropped datagrams. So we set it to a high value to avoid that
     * as much as we can. Also see related @todo in Node doc header (it has to do with moving
     * UDP processing into a separate thread, so that datagrams are read off UDP as fast as
     * possible and not blocked by other processing). */

    socket_base::receive_buffer_size rcv_buf_opt(opt(m_opts.m_st_low_lvl_max_buf_size));
    m_low_lvl_sock.set_option(rcv_buf_opt);

    // Now read it back and store it for informational purposes. OS may not respect what we tried to set.
    m_low_lvl_sock.get_option(rcv_buf_opt);
    m_low_lvl_max_buf_size = rcv_buf_opt.value();

    m_low_lvl_sock.non_blocking(true);
    m_low_lvl_sock.bind(low_lvl_endpoint);

    /* Save it for local_low_lvl_endpoint() (for user access). Why not just have
     * local_low_lvl_endpoint() call m_low_lvl_sock.local_endpoint()? Answer: we'd have to
     * protect m_low_lvl_sock with a mutex. Why not save to m_low_lvl_endpoint directly from
     * low_lvl_endpoint? Because if, say, user selected UDP port 0, we want
     * local_low_lvl_endpoint() to return the actual port the OS chose, not the
     * more-useless "0." */
    m_low_lvl_endpoint = m_low_lvl_sock.local_endpoint();
  }
  catch (const system_error& exc)
  {
    const Error_code sys_err_code = exc.code();
    FLOW_ERROR_SYS_ERROR_LOG_WARNING(); // Log the OS error that got us here.
    FLOW_LOG_WARNING("Unable to set up low-level socket. Node cannot initialize. Thread exiting.");

    // Constructor is waiting to see if we were able to start event loop. We were not.
    m_task_engine.stop(); // So that Node::running() will return false.
    m_event_loop_ready.set_value(exc.code());

    // Since we never started m_task_engine, Node::running() will return false.
    return;
  }

  // Once run() executes below, this should be the first thing it does (report to constructor thread).
  post(m_task_engine, [this]()
  {
    // We are in thread W.
    m_event_loop_ready.set_value(Error_code());
  });

  // When a packet is available for reading (or error), call this->low_lvl_recv_and_handle(<error code>).
  async_low_lvl_recv();

  // Also execute some low-priority tasks (such as periodic stat logging) every S_REGULAR_INFREQUENT_TASKS_PERIOD.
  schedule_task_from_now(get_logger(), S_REGULAR_INFREQUENT_TASKS_PERIOD, true, &m_task_engine,
                         [this](bool)
  {
    perform_regular_infrequent_tasks(true);
  });

  /* Go! Sleep until the next registered event. Our handlers (like low_lvl_recv_and_handle()) should themselves
   * register more events to wait for, thus ensuring run() doesn't run out of work (thus doesn't exit) until
   * something intentionally wants to stop it (stop the Node). */

  FLOW_LOG_INFO("Low-level socket initialized.");

  if (m_opts.m_st_capture_interrupt_signals_internally)
  {
    FLOW_LOG_INFO("Setting up internal wait-interrupting interrupt signal handler. "
                  "CAUTION! User program MUST avoid using non-boost::asio::signal_set signal handling! "
                  "If it does use non-boost.asio, behavior is undefined.");

    // Add the classic EINTR-inducing signal numbers.
    m_signal_set.add(SIGINT);
    m_signal_set.add(SIGTERM);

    /* At this point, receiving those signals will NOT just exit the program (or whatever other non-boost.asio
     * handling was active before the .add() calls). Before actually reporting successful initialization (reminder:
     * constructor is currently waiting for us to finish initialization and report it via the promise object), set up
     * the handler that'll be called upon receiving the signals. */

    /* this->interrupt_all_waits_internal_sig_handler(err_code, signal_number) will be called on signal (or error).
     * Note that that function's contract (from its doc comment) is it must execute in thread W.
     * Indeed boost::asio::io_service semantics guarantee it'll run in thread W (not some
     * no-man's-land signal handler thread of execution, as one might fear could be the case) for the same reason
     * the various socket I/O handlers and timer handlers above will run in thread W: because we'll run
     * m_task_engine.run() below from thread W, and all such functions are guaranteed to run "as if"
     * post(m_task_engine)ed. Anything post(m_task_engine)ed is guaranteed by boost.asio docs to execute from
     * the same thread as m_task_engine.run(). */
    m_signal_set.async_wait([this](const Error_code& sys_err_code, int sig_num)
    {
      interrupt_all_waits_internal_sig_handler(sys_err_code, sig_num);
    });
  } // if (m_opts.m_st_capture_interrupt_signals_internally)
  /* else if (!m_opts.m_st_capture_interrupt_signals_internally)
   * {
   *   Do NOT .add() anything; and don't async_wait() anything. As noted in comment at m_signal_set construction time,
   *   .add() does more than make it possible to .async_wait(). It also replaces any default OS or user's own
   *   non-boost.asio signal handling machinery with boost.asio's signal_set machinery. That can be quite draconian,
   *   so user must specifically set that option to true. If it's false (in all Nodes constructed by user in the entire
   *   app), then whatever signal handling machinery the user wants to set up for themselves (or leave at OS's
   *   discretion) will remain undisturbed. By the way, of course, they can use boost.asio machinery themselves too;
   *   it's just that doing so would still work even if m_st_capture_interrupt_signals_internally were true, so that's
   *   not the dangerous scenario.
   * } */
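
/* [Illustrative aside, not part of node.cpp.] Minimal sketch of the boost.asio signal_set semantics
 * relied upon above: constructing the set is a no-op; .add() transfers that signal's handling to
 * boost.asio (replacing default/OS/user handlers); the async_wait() handler then runs inside run(),
 * in the run()ning thread, not in a raw signal-handler context. Setup here is assumed/simplified. */
#include <boost/asio.hpp>
#include <csignal>
#include <iostream>

int main()
{
  boost::asio::io_context task_engine;
  boost::asio::signal_set signals(task_engine); // No-op so far: zero signals .add()ed.

  signals.add(SIGINT); // From now on SIGINT belongs to boost.asio; default Ctrl-C handling is replaced.
  signals.async_wait([](const boost::system::error_code& err, int sig_num)
  {
    if (!err)
    {
      std::cout << "Caught signal [" << sig_num << "]; event loop will now run out of work.\n";
    }
  });

  task_engine.run(); // Blocks until Ctrl-C; handler executes here, "as if" post()ed.
}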

  FLOW_LOG_INFO("Flow event loop starting now.");

  m_task_engine.run();

  /* Destructor must have stop()ped m_task_engine. reset() will allow the below poll()s to
   * proceed. */
  m_task_engine.reset();

  // Log final state report before closing down. Do not schedule to run again.
  perform_regular_infrequent_tasks(false);

  /* We should clean up everything (like close sockets, ensuring user gets errors when trying to
   * send/receive/etc.), but quickly. We should avoid potentially slow blocking operations like
   * graceful closes here; if the Node is shutting down, shut it down abruptly. @todo Reconsider.
   *
   * Therefore sending RSTs synchronously to all connected peers and then abruptly closing all
   * sockets should be sufficient. User threads waiting on Readable, Writable, Acceptable will be
   * woken and informed of the error. */

  try // Performance not a concern during shutdown; use exceptions for convenience.
  {
    /* stop() lets any handlers running at the time finish but then run() exits before running any
     * more handlers. This means there may have been some handlers queued up to run immediately.
     * Let them run now, as if stop() was called just a few moments later. */
    FLOW_LOG_TRACE("Flow worker thread event loop stopped. Running queued up handlers.");
    m_task_engine.poll();

    // Send RSTs (synchronously).

    FLOW_LOG_INFO("Worker thread told to stop (probably Node destructor executed). "
                  "Sending [RST] to all open Flow sockets.");

    // Just in case the RST sending operation blocks for some reason....
    m_low_lvl_sock.non_blocking(false);
    for (const auto& sock_pair : m_socks)
    {
      // May technically block but should not be for long if so (UDP). Probably OK.
      sync_sock_low_lvl_rst_send(sock_pair.second); // Will log if there's a problem.
    }
    m_low_lvl_sock.non_blocking(true);

    // Now close (synchronously) all open sockets. This posts no handlers on m_task_engine except canceled timers.
    FLOW_LOG_TRACE("Abruptly closing all Flow peer sockets.");
    while (!m_socks.empty()) // Don't loop over it -- loop body will erase elements!
    {
      Socket_id_to_socket_map::const_iterator sock_it = m_socks.begin();
      close_connection_immediately(sock_it->first, sock_it->second, error::Code::S_NODE_SHUTTING_DOWN, false);
      // ^-- defer_delta_checks == false: no need to optimize during shutdown.

      // Note that the above covers not-fully-open (e.g., awaiting SYN_ACK) sockets as well.
    }

    // Close (synchronously) all server sockets.
    FLOW_LOG_TRACE("Abruptly closing all Flow server sockets.");
    while (!m_servs.empty()) // As above.
    {
      Port_to_server_map::const_iterator serv_it = m_servs.begin();
      close_empty_server_immediately(serv_it->first, serv_it->second, error::Code::S_NODE_SHUTTING_DOWN, false);
      // ^-- defer_delta_checks == false: no need to optimize during shutdown.
    }

    // Close all Event_sets. This is always synchronous. As advertised, this may trigger on-event behavior.
    FLOW_LOG_TRACE("Closing all event sets and waking up any on-going waits on those event sets.");
    while (!m_event_sets.empty()) // As above.
    {
      const Event_set::Ptr event_set = *m_event_sets.begin();
      Event_set::Lock_guard lock(event_set->m_mutex); // Pre-condition for event_set_close_worker().
      event_set_close_worker(event_set);
    }

    // Run those canceled timer handlers for cleanliness (and in case that would let go of some resources).
    FLOW_LOG_TRACE("Cleaning up canceled timer tasks.");
    m_task_engine.poll();

    /* Note: to ensure everything is cleaned up by this point, turn on TRACE logging and see that
     * (at least the major classes') destructors for all objects have logged that they are being
     * destroyed. Be especially careful with any classes that we work with via shared_ptrs
     * (Peer_socket and others). */

    // Let go of low-level net-stack resources.
    FLOW_LOG_TRACE("Closing low-level UDP socket.");

    m_low_lvl_sock.non_blocking(false); // Just in case closing operation blocks for some reason....
    m_low_lvl_sock.close(); // Give socket back to OS (UDP close()).
  } // try
  catch (const system_error& exc)
  {
    const Error_code sys_err_code = exc.code();
    FLOW_ERROR_SYS_ERROR_LOG_WARNING(); // Log the OS error details.
    FLOW_LOG_WARNING("Could not shut down cleanly, but still shut down.");
  }
} // Node::worker_run()
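
/* [Illustrative aside, not part of node.cpp.] Minimal sketch of the stop()/reset()/poll() dance in the
 * shutdown path above: stop() makes run() return, possibly with handlers still queued; restart()
 * (spelled reset() in older Boost, as in this file) clears the stopped flag; poll() then runs the
 * queued handlers without blocking for new work. */
#include <boost/asio.hpp>
#include <iostream>

int main()
{
  boost::asio::io_context task_engine;

  boost::asio::post(task_engine, [&task_engine]()
  {
    std::cout << "Handler 1: stopping the loop.\n";
    task_engine.stop(); // run() returns before handler 2 executes.
  });
  boost::asio::post(task_engine, []() { std::cout << "Handler 2: ran via poll() after restart().\n"; });

  task_engine.run();     // Executes handler 1 only; handler 2 remains queued.
  task_engine.restart(); // Without this, the subsequent poll() would be a no-op.
  task_engine.poll();    // Drains handler 2, then returns immediately.
}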

const util::Udp_endpoint& Node::local_low_lvl_endpoint() const
{
  // We are in thread U != W, but that's OK (m_low_lvl_endpoint cannot change after constructor).
  return m_low_lvl_endpoint;
}

void Node::perform_accumulated_on_recv_tasks()
{
  // We are in thread W.

  /* We're being executed from the end of low_lvl_recv_and_handle() or
   * the async part of async_wait_latency_then_handle_incoming().
   * Thus we must perform tasks, accumulated over the course of that call for efficiency, now. */

  // Handle per-socket tasks.

  // Handle accumulated incoming acknowledgments/rcv_wnd updates.
  for (Peer_socket::Ptr sock : m_socks_with_accumulated_acks)
  {
    handle_accumulated_acks(socket_id(sock), sock);
  }
  // Clean slate for the next receive handler invocation.
  m_socks_with_accumulated_acks.clear();

  // Handle accumulated outgoing acknowledgments.
  for (Peer_socket::Ptr sock : m_socks_with_accumulated_pending_acks)
  {
    handle_accumulated_pending_acks(socket_id(sock), sock);
  }
  // Clean slate for the next receive handler invocation.
  m_socks_with_accumulated_pending_acks.clear();

  /* Have read all available low-level data. As a result m_sock_events may have elements (various
   * events that have occurred, like a socket becoming Readable). Therefore a "delta" check of
   * all Event_sets is needed, so that the user is informed of any events that he's been waiting
   * on that have occurred. (See Event_set::async_wait() for details and general strategy.)
   *
   * Pass "false" to event_set_all_check_delta(), meaning "you must in fact check the Event_sets
   * instead of deferring until later." Could we defer here? Not really. If events have
   * occurred, we have a "non-blocking" amount of time since then during which we must inform the
   * user of the events that have occurred. Doing so within one receive loop of detecting the
   * events meets this criterion (since boost.asio is not given a chance to sleep -- we haven't
   * exited the handler). If we do exit the handler, boost.asio can sleep (until the next timer
   * or more UDP data arrive, for example), which certainly breaks the criterion. So we must do
   * it no later than now.
   *
   * Do this AFTER the above, because handle_accumulated_*() may have added more events to handle
   * here (but event_set_all_check_delta() cannot add more tasks for the former to perform). */
  event_set_all_check_delta(false);
} // Node::perform_accumulated_on_recv_tasks()

bool Node::running() const
{
  // Simply check if the event loop is waiting for events vs. ran out of work (failed).
  return !m_task_engine.stopped();
}

void Node::handle_incoming(util::Blob* packet_data,
                           const util::Udp_endpoint& low_lvl_remote_endpoint)
{
  using boost::static_pointer_cast;
  using boost::dynamic_pointer_cast;
  using boost::shared_ptr;
  using boost::asio::const_buffer;
  using std::type_index;

  // We are in thread W.

  // Save before *packet_data potentially annihilated.
  const size_t packet_data_size = packet_data->size();

  /* `packet_data` contains packet binary data. Deserialize it into a structure and handle it.
   *
   * Discussion of 2nd argument (m_dyn_guarantee_one_low_lvl_in_buf_per_socket option):
   * For simplicity, suppose only DATA packets exist (they are the biggest, easily, in practice).
   * Further suppose that m_dyn_low_lvl_max_packet_size = 32k, m_st_max_block_size = 1k.
   *
   * If the arg is set to true, then the low-level UDP input code will allocate a 32k buffer exactly once per Node;
   * read up to 1k (+ header overhead) into it. Then the DATA deserializer will copy the sub-sequence within that
   * up-to-1k-ish prefix of the 32k buffer that stores the actual DATA payload (which will be the vast majority of
   * that up-to-1k) into a newly allocated buffer for that packet. (This copy will then be moved around, to the
   * extent possible, eventually ending up [probably copied, ultimately] in the user's data structure of choice,
   * presumably first living in the Receive buffer for the appropriate socket.) Then for the next incoming packet,
   * the same original 32k buffer will be reused to store the deserialized data again; rinse/repeat.
   * So the total comes to: 1 32k allocation and deallocation; N up-to-1k allocations and deallocations; and
   * N up-to-1k copies; where N is # of DATA packets received (could be huge).
   *
   * If the arg is set to false, then the low-level UDP input code will allocate a 32k buffer once per DATA packet;
   * read up to 1k (+ header overhead) into it. Then the DATA deserializer will MOVE the entire up-to-32k buffer,
   * including the up-to-1k-ish prefix within it that stores the actual DATA payload (which will be the vast
   * majority of that up-to-1k), into the place where deserialized data are stored. (This buffer (and information
   * about which part of it holds the actual DATA payload) will then be moved around, to the extent possible,
   * eventually ending up [probably copied, ultimately] in the user's data structure of choice, presumably first
   * living in the Receive buffer for the appropriate socket.) Then for the next incoming packet, the low-level
   * UDP input code will have to reallocate a new 32k buffer, because the deserializer "took" it for its own purpose
   * (without copying it, thinking that's faster, which it is -- in that limited context), so it cannot be reused;
   * rinse/repeat.
   * So the total comes to: N 32k allocations and deallocations; but no copies; and N (quite cheap) moves; where
   * N is # of DATA packets received (could be huge).
   *
   * What is faster? In practice (assuming a CPU-limited environment, meaning no loss, no delay, and a constant
   * stream of data being sent and received), at least on a localhost Mac setup, it turns out that the `true` way
   * is a bit faster, in that throughput is consistently higher by ~5%. (This should be taken with a grain of salt:
   * improvements in other areas of the code could make this impact become higher in proportion, etc. etc.)
   * This isn't too surprising; an N-bytes alloc/copy/dealloc is bad, but a 32x-bigger alloc/dealloc can certainly
   * be worse.
   *
   * However, suppose we lower the knob for "32k" to ~1k. Then we have, essentially:
   *   true => N x 1k alloc/copy/dealloc;
   *   false => N x 1k alloc/dealloc.
   *
   * In this case `false` might be somewhat better. The good thing is that, as of this writing, all these values
   * have option knobs. Of course, all this is subject to change and/or become automated over time.
   *
   * A huge caveat about the above discussion: Not all of the allocs/copies/deallocs occur in the same thread,
   * so in a multi-core environment, throughput might be improved by parallelism vs. the implications of above text.
   * Above could be rewritten to account for this; but at the moment the above should at least elucidate the issues
   * involved sufficiently for the reader to reason about this on their own. */
  const Low_lvl_packet::Ptr packet
    = Low_lvl_packet::create_from_raw_data_packet(packet_data,
                                                  opt(m_opts.m_dyn_guarantee_one_low_lvl_in_buf_per_socket));
  // ^-- That logged as much or as little as appropriate (we need not). null if error. `packet_data` may be blown away.

  if (!packet)
  {
    return; // As advertised, caller is free to do anything they want to *packet_data now (e.g., read another packet).
  }

  const auto& packet_ref = *packet;

  /* Preliminary notes for the rest of this important method:
   * - Invalid fields: If some field in the packet is simply erroneous, such as
   *   a negative number where a positive number is expected or something, the packet is
   *   simply dropped. No RST is sent, and no error is reported to the user (other than a log
   *   message). This is similar to what at least the Zaghal/Khan FSM model for TCP describes, and
   *   it seems reasonable (and easy). This only applies to fields that could never be right in any
   *   situation.
   * - Other errors: More likely are other error situations, such as receiving a SYN to a socket
   *   that's in SYN_SENT state. Since such problems can be a result of delayed packets or other
   *   conditions, there are various possibilities for response. Examples:
   *   - Example: Getting SYN in SYN_SENT: always illegal but could happen due to delayed packets or
   *     freak coincidences; send RST and close connection.
   *   - Example: Getting SYN on a non-listening port: common error situation; send RST.
   *   - Example: Getting SYN_ACK twice: duplicate packet or result of loss; send SYN_ACK_ACK
   *     again and continue normally.
   *   - Example: Getting RST in ESTABLISHED: other side is resetting connection. Close connection;
   *     don't respond with RST. */

  // Sanity-check basic fields.

  // Sanity-check source and destination Flow ports.
  const flow_port_t flow_local_port = packet->m_packed.m_dst_port;
  const flow_port_t flow_remote_port = packet->m_packed.m_src_port;
  if (flow_remote_port == S_PORT_ANY)
  {
    FLOW_LOG_WARNING("Invalid src_port value [" << S_PORT_ANY << "] from [UDP " << low_lvl_remote_endpoint << "]. "
                     "Dropping packet.");
    return;
  }
  // else
  if (flow_local_port == S_PORT_ANY)
  {
    FLOW_LOG_WARNING("Invalid dst_port value [" << S_PORT_ANY << "] from [UDP " << low_lvl_remote_endpoint << "]. "
                     "Dropping packet.");
    return;
  }

  // Sanity-check data size. Should be guaranteed during deserialization.
  assert((typeid(packet_ref) != typeid(Data_packet)) ||
         (!(static_pointer_cast<const Data_packet>(packet)->m_data.empty())));

  // Demultiplex to proper destination socket; and handle according to state machine.

  /* Check if destination port is to a peer-to-peer socket.
   * It's important to check for this before checking whether the port matches a listening server
   * socket. Example: server S listens on port 80 (so m_servs[80] exists); client C at remote
   * endpoint R connects to S:80 (so now m_socks[R, 80] also exists); then client C sends a data
   * packet to S:80. Should this go to S:[R, 80] or S:80? Clearly the former.
   *
   * The only caveat is: what if client C1 also wants to now connect to S:80? With this algorithm this
   * will go to S:80, which is good. However what if C1's address is C1:R? Then that SYN would
   * erroneously go to the existing peer-to-peer socket S:[R, 80]. However that would mean somehow 2
   * sockets bound to the same port and thus the same remote endpoint, which shouldn't happen. */
  const Socket_id socket_id{ { low_lvl_remote_endpoint, flow_remote_port }, flow_local_port };
  Socket_id_to_socket_map::const_iterator sock_entry = m_socks.find(socket_id);
  if (sock_entry != m_socks.end())
  {
    // Successful demultiplex to a peer socket.
    Peer_socket::Ptr sock = sock_entry->second;

    /* Record packet size, type in stats of the socket, now that we know the latter. (What if sock unknown?
     * See below.) */
    sock->m_rcv_stats.low_lvl_packet(typeid(packet_ref), packet_data_size);

    // state cannot change unless this thread W changes it, so no need to lock object across the below code.
    const Peer_socket::Int_state state = sock->m_int_state;

    /* A little bit of extra validation. Could not check against max_block_size before
     * demuxing the socket, as it's a per-socket option. */
    const size_t max_block_size = sock->max_block_size();

    {
      shared_ptr<Data_packet> data;
      shared_ptr<Ack_packet> ack;
      shared_ptr<Syn_packet> syn;
      shared_ptr<Syn_ack_packet> syn_ack;

      /* Perform any tweaks and checks based on the new (for this packet) knowledge that is the identity of `sock`.
       *
       * One tweak in particular that applies to multiple packet types as done below:
       * Every Sequence_number stored therein can be more conveniently logged as relative to the ISN (so 0, 1, etc.
       * instead of some huge 64-bit thing) as well as a multiple of max-block-size (so 2x8192, 3x8192, etc.; as
       * opposed to the actual results of those products). Now that we know `sock`, we can apply those values from it
       * to the known Sequence_number members, so that they are logged nicely instead of 64-bit monstrosities.
       * Nuance: you'll note (ISN + 1) popping up repeatedly; the `+ 1` is explained in class Sequence_number doc
       * header near *Metadata* discussion. */
      if ((data = dynamic_pointer_cast<Data_packet>(packet)))
      {
        // If it IS a DATA packet, and its contained data field is too large....
        if (data->m_data.size() > max_block_size)
        {
          FLOW_LOG_WARNING("Packet of type [" << packet->m_type_ostream_manip << "] targeted at [" << sock << "] in "
                           "state [" << state << "] has data size [" << data->m_data.size() << "] exceeding "
                           "maximum block size [" << max_block_size << "]. Dropping packet.");
          return;
        }

        data->m_seq_num.set_metadata('R', sock->m_rcv_init_seq_num + 1, max_block_size);
      }
      else if ((ack = dynamic_pointer_cast<Ack_packet>(packet)))
      {
        for (const Ack_packet::Individual_ack::Ptr& individual_ack : ack->m_rcv_acked_packets)
        {
          individual_ack->m_seq_num.set_metadata('L', sock->m_snd_init_seq_num + 1, max_block_size);
        }
      }
      else if ((syn = dynamic_pointer_cast<Syn_packet>(packet)))
      {
        syn->m_init_seq_num.set_metadata('R', syn->m_init_seq_num + 1, max_block_size);
      }
      else if ((syn_ack = dynamic_pointer_cast<Syn_ack_packet>(packet)))
      {
        syn_ack->m_init_seq_num.set_metadata('R', syn_ack->m_init_seq_num + 1, max_block_size);
      }
    }

    // else

    bool reply_with_rst = false;
    Error_code err_code; // Initialize to success.

    switch (state)
    {
    case Peer_socket::Int_state::S_SYN_SENT:
    {
      // We're in SYN_SENT state: awaiting SYN_ACK.
      if (typeid(packet_ref) == typeid(Syn_packet)) // To SYN_SENT state.
      {
        /* Replying to SYN with SYN is not allowed. We don't support simultaneous active opens.
         * (RFC 793 does, but it's not a mainstream situation.) In fact one would have to perform
         * Node::connect() while binding to a service port, which we do not allow.
         *
         * Another possibility is that this SYN is from some previous connection with the same
         * socket pair, and we should disregard (no RST) it via sequence number checking. For now
         * we assume our port reservation scheme on both ends eliminates socket pair reuse in this
         * way and thus don't check for this.
         *
         * So, other side is misbehaving. Send RST and close connection. */
        reply_with_rst = true;
      }
      else if (typeid(packet_ref) == typeid(Syn_ack_ack_packet)) // To SYN_SENT state.
      {
        // Similar to SYN. We should be sending SYN_ACK_ACK (later) -- not they.
        reply_with_rst = true;
      }
      else if (typeid(packet_ref) == typeid(Data_packet)) // To SYN_SENT state.
      {
        // Similar to SYN. Should not be getting DATA yet -- must complete handshake first.
        reply_with_rst = true;
      }
      else if (typeid(packet_ref) == typeid(Ack_packet)) // To SYN_SENT state.
      {
        // Similar to DATA.
        reply_with_rst = true;
      }
      else if (typeid(packet_ref) == typeid(Rst_packet)) // To SYN_SENT state.
      {
        /* Sent SYN; received RST. Could be common: other side may simply not be listening on the
         * port to which we wanted to connect. Close connection but don't reply to RST with RST
         * (not useful; could cause endless RSTing). */
        err_code = error::Code::S_CONN_REFUSED; // To SYN_SENT state.
      }
      else if (typeid(packet_ref) == typeid(Syn_ack_packet)) // To SYN_SENT state.
      {
        // Sent SYN; received SYN_ACK. Great.
        handle_syn_ack_to_syn_sent(socket_id,
                                   sock,
                                   static_pointer_cast<const Syn_ack_packet>(packet));
        break;
      }
      else
      {
        assert(false); // We've eliminated this possibility already above.
      }
      break;
    } // case Peer_socket::Int_state::S_SYN_SENT:

    case Peer_socket::Int_state::S_SYN_RCVD:
    {
      // We're in SYN_RCVD state: awaiting SYN_ACK_ACK.
      if (typeid(packet_ref) == typeid(Syn_packet)) // To SYN_RCVD state.
      {
        /* Replying to SYN_ACK with SYN is not allowed. However it could be a duplicated
         * original SYN (ha ha), in which case it's not an error and is dropped silently. Otherwise
         * it's an error.
         *
         * Another possibility is that this SYN is from some previous connection with the same
         * socket pair, and we should disregard (no RST) it via sequence number checking. For now
         * we assume our port reservation scheme on both ends eliminates socket pair reuse in this
         * way and thus don't check for this.
         *
         * So, decide if other side is misbehaving. If so, send RST and close connection.
         * Otherwise ignore. */
        if (static_pointer_cast<const Syn_packet>(packet)->m_init_seq_num == sock->m_rcv_init_seq_num)
        {
          // Should be plenty of context if TRACE logging enabled anyway.
          FLOW_LOG_TRACE("Duplicate valid [" << packet->m_type_ostream_manip << "] packet; ignoring.");
        }
        else
        {
          reply_with_rst = true;
        }
      }
      else if (typeid(packet_ref) == typeid(Syn_ack_packet)) // To SYN_RCVD state.
      {
        // SYN_ACK in response to SYN_ACK is always wrong.
        reply_with_rst = true;
        break;
      }
      else if (typeid(packet_ref) == typeid(Data_packet)) // To SYN_RCVD state.
      {
        /* This is legitimate under loss conditions. Suppose we are SYN_RCVD, send SYN_ACK, they
         * receive SYN_ACK, so they go SYN_SENT -> ESTABLISHED, and they send SYN_ACK_ACK. Now
         * suppose that SYN_ACK_ACK was dropped. Now say they send DATA (which they have every
         * right to do: their state is ESTABLISHED). So we've received DATA, but we're still in
         * SYN_RCVD. It can also happen if DATA and SYN_ACK_ACK are re-ordered by the network. */
        handle_data_to_syn_rcvd(sock, static_pointer_cast<Data_packet>(packet));
      }
      else if (typeid(packet_ref) == typeid(Ack_packet)) // To SYN_RCVD state.
      {
        /* On the other hand, since we are not yet ESTABLISHED, we couldn't have sent DATA, so we
         * shouldn't be getting any ACKs. */
        reply_with_rst = true;
      }
      else if (typeid(packet_ref) == typeid(Rst_packet)) // To SYN_RCVD state.
      {
        /* Received SYN; sent SYN_ACK; received RST. Shouldn't be common, but they are refusing
         * connection at this late stage for some reason.
         *
         * Close connection but don't reply to RST with RST (not useful; could cause endless
         * RSTing). */
      }
      else if (typeid(packet_ref) == typeid(Syn_ack_ack_packet)) // To SYN_RCVD state.
      {
        // Sent SYN_ACK; received SYN_ACK_ACK. Great.
        handle_syn_ack_ack_to_syn_rcvd(socket_id, sock, static_pointer_cast<const Syn_ack_ack_packet>(packet));
      }
      else
      {
        assert(false); // We've eliminated this possibility already above.
      }
      break;
    } // case Peer_socket::Int_state::S_SYN_RCVD:

    case Peer_socket::Int_state::S_ESTABLISHED:
    {
      /* We're in ESTABLISHED state: will accept DATA and ACKs, etc. However just because we've
       * ESTABLISHED does not mean they have. Specifically, suppose we were SYN_SENT, got SYN_ACK,
       * sent SYN_ACK_ACK, so transferred to ESTABLISHED. Now suppose that SYN_ACK_ACK was dropped.
       * We're ESTABLISHED, but they're going to resend SYN_ACK due to retransmission timer.
       * So we must be ready for this resent SYN_ACK, not just DATA and such. */
      if (typeid(packet_ref) == typeid(Syn_packet)) // To ESTABLISHED state.
      {
        /* SYN while in ESTABLISHED is not allowed. However it could be a duplicated original SYN
         * (ha ha), in which case it's not an error and is dropped silently. Otherwise it's an
         * error.
         *
         * Another possibility is that this SYN is from some previous connection with the same
         * socket pair, and we should disregard (no RST) it via sequence number checking. For now
         * we assume our port reservation scheme on both ends eliminates socket pair reuse in this
         * way and thus don't check for this.
         *
         * So, decide if other side is misbehaving. If so, send RST and close connection.
         * Otherwise ignore. */
        if (static_pointer_cast<const Syn_packet>(packet)->m_init_seq_num == sock->m_rcv_init_seq_num)
        {
          // Should be plenty of context if TRACE logging enabled anyway.
          FLOW_LOG_TRACE("Duplicate valid [" << packet->m_type_ostream_manip << "] packet; ignoring.");
        }
        else
        {
          reply_with_rst = true;
        }
        break;
      } // if (SYN)
      else if (typeid(packet_ref) == typeid(Syn_ack_packet)) // To ESTABLISHED state.
      {
        /* This could be the corner case above (we are the original SYNner, but our SYN_ACK_ACK was
         * dropped, so they're resending SYN_ACK, which they think may have been lost). It could
         * also just be a duplicate SYN_ACK. Suppose we respond with SYN_ACK_ACK as before. If
         * they're resending SYN_ACK due to loss, then this is clearly proper; if they get
         * SYN_ACK_ACK this time, they're good to go. If it's just a duplicate (they're already
         * ESTABLISHED), then they'll similarly just accept SYN_ACK_ACK as a harmless duplicate.
         * So, that's fine.
         *
         * As usual it can also be an invalid packet, in which case we decide the other side is
         * misbehaving and RST/close connection. */

        auto syn_ack = static_pointer_cast<const Syn_ack_packet>(packet);

        if (sock->m_active_connect && (syn_ack->m_init_seq_num == sock->m_rcv_init_seq_num))
        {
          handle_syn_ack_to_established(sock, syn_ack);
        }
        else
        {
          reply_with_rst = true;
        }
      }
      else if (typeid(packet_ref) == typeid(Syn_ack_ack_packet)) // To ESTABLISHED state.
      {
        /* We're ESTABLISHED but got SYN_ACK_ACK. Suppose we were the active connector. Then
         * SYN_ACK_ACK is simply illegal at all times. Now suppose we were the passive connectee.
         * To get to ESTABLISHED, we had to have received SYN_ACK_ACK already. Therefore this
         * must be a duplicate SYN_ACK_ACK (assuming it equals the original SYN_ACK_ACK; otherwise
         * it's just invalid).
         *
         * If it's a duplicate SYN_ACK_ACK, we just drop it as harmless. Otherwise the other guy
         * is misbehaving, so we RST/close. */
        if ((!sock->m_active_connect) &&
            (static_pointer_cast<const Syn_ack_ack_packet>(packet)->m_packed.m_security_token
               == sock->m_security_token))
        {
          // Should be plenty of context if TRACE logging enabled anyway.
          FLOW_LOG_TRACE("Duplicate valid [" << packet->m_type_ostream_manip << "] packet; ignoring.");
        }
        else
        {
          reply_with_rst = true;
        }
      }
      else if (typeid(packet_ref) == typeid(Rst_packet)) // To ESTABLISHED state.
      {
        /* Shouldn't be common, but they are resetting connection for some reason.
         *
         * Close connection but don't reply to RST with RST (not useful; could cause endless
         * RSTing). */
      }
      else if (typeid(packet_ref) == typeid(Data_packet)) // To ESTABLISHED state.
      {
        // Got DATA! Great.
        handle_data_to_established(socket_id, sock,
                                   static_pointer_cast<Data_packet>(packet),
                                   false); // false <=> packet received in ESTABLISHED.
      }
      else if (typeid(packet_ref) == typeid(Ack_packet)) // To ESTABLISHED state.
      {
        // Got ACK! Great.
        handle_ack_to_established(sock, static_pointer_cast<const Ack_packet>(packet));
      }
      else
      {
        assert(false); // We've eliminated this already above.
      }
      break;
    } // case Peer_socket::Int_state::S_ESTABLISHED:
    default:
      assert(false); // Can't be CLOSED, since it's in m_socks.
    } // switch (state)

    // If above found we need to close this connection and/or send RST, do that.
    if (err_code)
    {
      if (reply_with_rst)
      {
        FLOW_LOG_WARNING("Packet of type [" << packet->m_type_ostream_manip << "] targeted at [" << sock << "] in "
                         "state [" << state << "]; replying with RST.");
        async_sock_low_lvl_rst_send(sock);
      }
      else
      {
        FLOW_LOG_WARNING("Packet of type [" << packet->m_type_ostream_manip << "] targeted at [" << sock << "] in "
                         "state [" << state << "]; dropping without reply.");
      }

      /* Close connection in our structures (inform user if necessary as well). Pre-conditions
       * assumed by call: sock in m_socks and sock->state() == S_OPEN (yes, since sock is in m_socks);
       * err_code contains the reason for the close (yes). This will empty the Send and Receive
       * buffers (if applicable, i.e., only if state is ESTABLISHED or later). That is OK,
       * because this is the abrupt type of close (error). */
      close_connection_immediately(socket_id, sock, err_code, true);
      /* ^-- defer_delta_check == true: because the only way to get to this method is from
       * async_low_lvl_recv(), which will perform event_set_all_check_delta(false) at the end of itself,
       * before the boost.asio handler exits. See Node::m_sock_events doc header for details. */
    } // if (err_code)

    return;
  } // if (destination port is to a peer socket.)
  // else destination port is not to a peer socket.

  // Check if destination port is to a server socket.
  Port_to_server_map::const_iterator serv_entry = m_servs.find(flow_local_port);
  if (serv_entry != m_servs.end())
  {
    // Successful demultiplex to a server socket.
    Server_socket::Ptr serv = serv_entry->second;

    /* Server socket can be in the following states:
     * - S_LISTENING: Waiting for SYNs. Anything except a SYN is an error and yields an RST (except
     *   an RST does not yield an RST, as that's not useful and could lead to endless RSTing).
     * - S_CLOSING: No longer listening (in the process of being removed from Node). Anything
     *   except an RST yields an RST (interpreted by other side as: connection refused [if SYN] or
     *   invalid message).
     *
     * S_CLOSED means it's not in m_servs, so that's not a possible state. */

    // state cannot change unless this thread W changes it, so no need to lock object across the below code.
    const Server_socket::State state = serv->state();
    if ((state == Server_socket::State::S_LISTENING) &&
        (typeid(packet_ref) == typeid(Syn_packet)))
    {
      auto new_sock = handle_syn_to_listening_server(serv,
                                                     static_pointer_cast<const Syn_packet>(packet),
                                                     low_lvl_remote_endpoint);
      if (new_sock)
      {
        /* Record packet size, type in stats of the socket, now that we know the latter. It's kind of interesting,
         * as new_sock (as name implies) was just created based on the very packet we are registering here.
         * If it's a packet not corresponding to any Peer_socket, it won't be counted against any Peer_socket's stats
         * (but that is an unusual case; though there is a to-do [as of this writing] for counting things Node-wide
         * as well). */
        new_sock->m_rcv_stats.low_lvl_packet(typeid(packet_ref), packet_data_size);
      }
      return;
    }
    // else
    if (typeid(packet_ref) != typeid(Rst_packet))
    {
      FLOW_LOG_WARNING("Packet of type [" << packet->m_type_ostream_manip << "] from "
                       "[UDP " << low_lvl_remote_endpoint << "] targeted at state [" << state << "] "
                       "server port [" << flow_local_port << "]; replying with RST.");
      async_no_sock_low_lvl_rst_send(packet, low_lvl_remote_endpoint);
    }
    else
    {
      FLOW_LOG_WARNING("Packet of type [" << packet->m_type_ostream_manip << "] from "
                       "[UDP " << low_lvl_remote_endpoint << "] targeted at state [" << state << "] "
                       "server port [" << flow_local_port << "]; dropping without reply.");
    }
    return;
  } // if (demuxed to server socket)
  // else destination port is not to a peer-to-peer socket or a server socket.

  // Handle NOT_A_SOCKET state.

  /* Destination socket is not a socket. In this degenerate state any incoming packets are
   * rejected. Help the other side by issuing an RST. In particular they may be trying to SYN a
   * non-listening socket, for example, and the RST will prevent them from waiting around before
   * giving up.
   *
   * Exception: Don't issue an RST to an RST, as that's not useful (and could result in endless
   * RSTing, maybe). */
  if (typeid(packet_ref) != typeid(Rst_packet))
  {
    FLOW_LOG_WARNING("Packet from [UDP " << low_lvl_remote_endpoint << "] specified state [NOT_A_SOCKET] "
                     "port [" << flow_local_port << "]; meaning no such socket known; replying with RST.");
    async_no_sock_low_lvl_rst_send(packet, low_lvl_remote_endpoint);
  }
  else
  {
    FLOW_LOG_WARNING("Packet of type [" << packet->m_type_ostream_manip << "] from "
                     "[UDP " << low_lvl_remote_endpoint << "] specified state [NOT_A_SOCKET] "
                     "port [" << flow_local_port << "]; meaning no such socket known; dropping without reply.");
  }
} // Node::handle_incoming()
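
/* [Illustrative aside, not part of node.cpp.] Minimal sketch of the two DATA-payload strategies weighed
 * in the long comment inside handle_incoming() above; Toy_packet and both helpers are hypothetical.
 * Strategy (1) keeps one big reusable input buffer and copies each payload out of its prefix;
 * strategy (2) donates the whole input buffer to the packet via a move, forcing a fresh allocation
 * before the next read. */
#include <cstddef>
#include <utility>
#include <vector>

using Byte = unsigned char;

struct Toy_packet { std::vector<Byte> m_data; };

// (1) Analog of m_dyn_guarantee_one_low_lvl_in_buf_per_socket == true: copy payload; big_buf is reusable.
Toy_packet deserialize_by_copy(const std::vector<Byte>& big_buf, std::size_t payload_size)
{
  Toy_packet packet;
  packet.m_data.assign(big_buf.begin(), big_buf.begin() + payload_size); // N small copies overall.
  return packet; // big_buf is untouched; the next UDP datagram can be read into it.
}

// (2) Analog of == false: move the whole buffer; no copy, but the caller must reallocate next time.
Toy_packet deserialize_by_move(std::vector<Byte>&& big_buf)
{
  Toy_packet packet;
  packet.m_data = std::move(big_buf); // Cheap move; one big allocation per packet instead.
  return packet;
}

int main()
{
  std::vector<Byte> big_buf(32 * 1024); // The "32k" knob from the discussion above.
  const Toy_packet p1 = deserialize_by_copy(big_buf, 1024);      // big_buf still usable afterward.
  const Toy_packet p2 = deserialize_by_move(std::move(big_buf)); // big_buf now donated; reallocate.
  return ((p1.m_data.size() == 1024) && (p2.m_data.size() == 32 * 1024)) ? 0 : 1;
}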

/// @cond
/* -^- Doxygen, please ignore the following. (Don't want docs generated for temp macro; this is more maintainable
 * than specifying the macro name to omit it, in Doxygen-config EXCLUDE_SYMBOLS.) */

/* Normally I try to avoid macro cleverness, but in this case to get a nice printout we need the
 * # technique, and also this eliminates quite a bit of repetition. So let's.... */
#define VALIDATE_STATIC_OPTION(ARG_opt) \
  validate_static_option(opts.ARG_opt, m_opts.ARG_opt, #ARG_opt, err_code)
#define VALIDATE_CHECK(ARG_check) \
  validate_option_check(ARG_check, #ARG_check, err_code)

// -v- Doxygen, please stop ignoring.
/// @endcond
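
/* [Illustrative aside, not part of node.cpp.] Standalone sketch of the # (stringize) technique the two
 * macros above rely on: the macro forwards both the expression's value and its exact source text, so
 * the failure log can quote the check without hand-duplicating it at every call site. Names here are
 * toy stand-ins. */
#include <iostream>

static bool toy_validate_check(bool check, const char* check_str)
{
  if (!check)
  {
    std::cerr << "Option check [" << check_str << "] is false.\n";
  }
  return check;
}

#define TOY_VALIDATE_CHECK(ARG_check) toy_validate_check(ARG_check, #ARG_check)

int main()
{
  const int st_low_lvl_max_buf_size = 1024; // Deliberately too small.
  // Prints: Option check [st_low_lvl_max_buf_size >= 128 * 1024] is false.
  return TOY_VALIDATE_CHECK(st_low_lvl_max_buf_size >= 128 * 1024) ? 0 : 1;
}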

const Node_options& Node::validate_options(const Node_options& opts, bool init, Error_code* err_code) const
{
  // Note: Can't use FLOW_ERROR_EXEC_AND_THROW_ON_ERROR() as return is a reference. @todo Look into solution?
  if (!err_code) // Call the non-null err_code version of ourselves, and throw exception on error.
  {
    Error_code our_err_code;
    const Node_options& result = validate_options(opts, init, &our_err_code);
    if (our_err_code)
    {
      throw Runtime_error(our_err_code, FLOW_UTIL_WHERE_AM_I_STR());
    }
    return result;
  }
  // else

  /* We are to validate the given set of global option values. If !init, then the context is that an
   * already-running Node is being called with set_options(), i.e. user is modifying options for an
   * existing Node. In that case we must ensure that no static (unchangeable) option's value
   * would be changed by this.
   *
   * If init, then we're validating the options set given to us in Node constructor. So in
   * that case that static option check is to be skipped, since there are no existing option values
   * to change.
   *
   * Finally, we must check for individual integrity of the specified values (including consistency
   * with other option values). */

  // We are in thread U != W or in thread W.

  if (!init)
  {
    /* As explained above, they're trying to change an existing Node's option values. Ensure all
     * the static options' values are the same in opts and m_opts. */

    // Explicitly documented pre-condition is that m_opts is already locked if necessary. So don't lock.

    const bool static_ok
      = VALIDATE_STATIC_OPTION(m_st_capture_interrupt_signals_internally) &&
        VALIDATE_STATIC_OPTION(m_st_low_lvl_max_buf_size) &&
        VALIDATE_STATIC_OPTION(m_st_timer_min_period);

    if (!static_ok)
    {
      // validate_static_option() has set *err_code.
      return opts;
    }
    // else
  } // if (!init)

  // Now sanity-check the values themselves. @todo Comment and reconsider these?

  const bool checks_ok
    = VALIDATE_CHECK(opts.m_st_low_lvl_max_buf_size >= 128 * 1024) &&
      VALIDATE_CHECK(opts.m_st_timer_min_period.count() >= 0) &&
      VALIDATE_CHECK(opts.m_dyn_low_lvl_max_packet_size >= 512);

  if (!checks_ok)
  {
    // On error, validate_option_check() has set *err_code.
    return opts;
  }
  // else

  /* The above validated only global options. Now verify that the per-socket template options (that
   * will be used to generate child Peer_sockets) are also valid. */
  sock_validate_options(opts.m_dyn_sock_opts, 0, err_code); // Will not throw. Will set *err_code if needed.
  // On error, that set *err_code.

  return opts;

#undef VALIDATE_CHECK
#undef VALIDATE_STATIC_OPTION
} // Node::validate_options()

bool Node::set_options(const Node_options& opts, Error_code* err_code)
{
  namespace bind_ns = util::bind_ns;
  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Node::set_options, bind_ns::cref(opts), _1);
  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.

  // We are in thread U != W.

  if (!running())
  {
    FLOW_ERROR_EMIT_ERROR(error::Code::S_NODE_NOT_RUNNING);
    return false;
  }
  // else

  /* We just want to replace m_opts with a copy of opts. First validate opts (including with
   * respect to m_opts, and also check for invalid values and such), then copy it over. */

  // Log new options values. A bit computationally expensive so just use TRACE for now. @todo Reconsider?
  FLOW_LOG_TRACE("\n\n" << opts);

  // Will be writing if all goes well, so must acquire exclusive ownership of m_opts.
  const Options_lock lock(m_opts_mutex);

  /* Validate the new option set (including ensuring they're not changing static options' values).
   * Note that an explicit pre-condition of this method is that m_opts_mutex is locked if needed,
   * hence the above locking statement is not below this call. */
  validate_options(opts, false, err_code);
  if (*err_code)
  {
    return false;
  }
  // else

  // Boo-ya.
  m_opts = opts;
  return true;
} // Node::set_options()

bool Node::validate_option_check(bool check, const std::string& check_str,
                                 Error_code* err_code) const
{
  if (!check)
  {
    FLOW_LOG_WARNING("When changing options, check [" << check_str << "] is false. "
                     "Ignoring entire option set.");
    FLOW_ERROR_EMIT_ERROR(error::Code::S_OPTION_CHECK_FAILED);
    return false;
  }
  // else
  return true;
}

Node_options Node::options() const
{
  return opt(m_opts); // Lock, copy entire struct, unlock, return copy.
}

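/* [Illustrative aside, not part of node.cpp.] Minimal sketch (toy types) of the lock/copy/unlock idiom
 * behind options() and opt(): return a by-value snapshot taken under the mutex, so callers never read
 * m_opts while thread W or a concurrent set_options() may be writing it. */
#include <mutex>

struct Toy_options { int m_dyn_low_lvl_max_packet_size = 32 * 1024; };

class Toy_node
{
public:
  Toy_options options() const // Analogous to Node::options().
  {
    const std::lock_guard<std::mutex> lock(m_opts_mutex);
    return m_opts; // Copy is made under the lock; the lock is released on return.
  }

private:
  mutable std::mutex m_opts_mutex; // mutable: options() is logically const yet must lock.
  Toy_options m_opts;
};

int main()
{
  Toy_node node;
  return (node.options().m_dyn_low_lvl_max_packet_size == 32 * 1024) ? 0 : 1;
}
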
size_t Node::max_block_size() const
{
  return opt(m_opts.m_dyn_sock_opts.m_st_max_block_size);
}

void Node::perform_regular_infrequent_tasks(bool reschedule)
{
  // We are in thread W.

  // The frequency is low enough to where we can use detailed logging with impunity in this method.

  FLOW_LOG_INFO("[=== Periodic state logging. ===");

  // Just log the stats for each Peer_socket. @todo Add Server_socket, Event_set when they have stats; global state.

  for (const auto& id_and_sock : m_socks)
  {
    Peer_socket::Ptr sock = id_and_sock.second;
    sock_log_detail(sock);
  }

  FLOW_LOG_INFO("=== Periodic state logging. ===]");

  if (reschedule)
  {
    schedule_task_from_now(get_logger(), S_REGULAR_INFREQUENT_TASKS_PERIOD, true, &m_task_engine,
                           [this](bool)
    {
      perform_regular_infrequent_tasks(true);
    });
  }
} // Node::perform_regular_infrequent_tasks()

size_t Node::Socket_id::hash() const
{
  using boost::hash_combine;

  size_t val = 0;
  hash_combine(val, m_remote_endpoint);
  hash_combine(val, m_local_port);
  return val;
}

bool operator==(const Node::Socket_id& lhs, const Node::Socket_id& rhs)
{
  return (lhs.m_local_port == rhs.m_local_port) && (lhs.m_remote_endpoint == rhs.m_remote_endpoint);
}

size_t hash_value(const Node::Socket_id& socket_id)
{
  return socket_id.hash();
}
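
/* [Illustrative aside, not part of node.cpp.] Minimal sketch (toy key type) of the idiom behind
 * Socket_id's hash()/hash_value()/operator== trio: boost::unordered_map's default hasher finds
 * hash_value() via argument-dependent lookup and resolves bucket collisions with operator==, letting
 * a (remote endpoint, local port)-style pair key a map like m_socks. */
#include <boost/functional/hash.hpp>
#include <boost/unordered_map.hpp>
#include <cstddef>
#include <cstdint>
#include <string>

struct Toy_socket_id
{
  std::string m_remote_endpoint; // Toy stand-in for util::Udp_endpoint + remote flow port.
  std::uint16_t m_local_port;
};

inline bool operator==(const Toy_socket_id& lhs, const Toy_socket_id& rhs)
{
  return (lhs.m_local_port == rhs.m_local_port) && (lhs.m_remote_endpoint == rhs.m_remote_endpoint);
}

inline std::size_t hash_value(const Toy_socket_id& socket_id) // Found by boost::hash via ADL.
{
  std::size_t val = 0;
  boost::hash_combine(val, socket_id.m_remote_endpoint);
  boost::hash_combine(val, socket_id.m_local_port);
  return val;
}

int main()
{
  boost::unordered_map<Toy_socket_id, int> socks; // Analogous to Socket_id_to_socket_map.
  socks[Toy_socket_id{"1.2.3.4:9999/5", 80}] = 42; // Demultiplex key -> toy "socket".
  return (socks.count(Toy_socket_id{"1.2.3.4:9999/5", 80}) == 1) ? 0 : 1;
}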

} // namespace flow::net_flow
An std::runtime_error (which is an std::exception) that stores an Error_code.
Definition: error.hpp:49
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:224
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1291
A user-set collection of sockets and desired conditions on those sockets (such as: "socket has data t...
Definition: event_set.hpp:254
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex. Use instead of boost::lock_guard for release() at least.
Definition: event_set.hpp:791
Objects of this class can be fed to Node to make it internally simulate network conditions like loss,...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
Definition: node.hpp:937
void handle_data_to_established(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet, bool syn_rcvd_qd_packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
void perform_accumulated_on_recv_tasks()
Performs all tasks to be performed at the end of low_lvl_recv_and_handle() or async part of async_wai...
Definition: node.cpp:375
void handle_accumulated_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any incoming acknowledgments and rcv_wnd u...
Node_options options() const
Copies this Node's option set and returns that copy.
Definition: node.cpp:1107
void handle_incoming(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-received, not-yet-deserialized low-level packet.
Definition: node.cpp:426
~Node() override
Destroys Node.
Definition: node.cpp:139
static const size_t & S_NUM_PORTS
Total number of Flow ports in the port space, including S_PORT_ANY.
Definition: node.hpp:942
size_t m_low_lvl_max_buf_size
OS-reported m_low_lvl_sock UDP receive buffer maximum size, obtained right after we OS-set that setti...
Definition: node.hpp:3771
bool set_options(const Node_options &opts, Error_code *err_code=0)
Dynamically replaces the current options set (options()) with the given options set.
Definition: node.cpp:1054
void sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket, synchronously.
void handle_syn_ack_ack_to_syn_rcvd(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_ack_packet > syn_ack_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK_ACK packet delivered to the given p...
static const Fine_duration S_REGULAR_INFREQUENT_TASKS_PERIOD
Time interval between performing "infrequent periodic tasks," such as stat logging.
Definition: node.hpp:3693
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
Definition: node.hpp:4180
bool sock_validate_options(const Peer_socket_options &opts, const Peer_socket_options *prev_opts, Error_code *err_code) const
Analogous to validate_options() but checks per-socket options instead of per-Node options.
Options_mutex m_opts_mutex
The mutex protecting m_opts.
Definition: node.hpp:3707
void handle_accumulated_pending_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any additional individual outgoing acknowl...
size_t max_block_size() const
The maximum number of bytes of user data per received or sent block on connections generated from thi...
Definition: node.cpp:1112
const util::Udp_endpoint & local_low_lvl_endpoint() const
Return the UDP endpoint (IP address and UDP port) which will be used for receiving incoming and sendi...
Definition: node.cpp:369
void worker_run(const util::Udp_endpoint low_lvl_endpoint)
Worker thread W (main event loop) body.
Definition: node.cpp:151
boost::unique_future< Error_code > m_event_loop_ready_result
The future object through which the non-W thread waits for m_event_loop_ready to be set to success/fa...
Definition: node.hpp:3883
void event_set_close_worker(Event_set::Ptr event_set)
The guts of event_set_close_worker_check_state(): same thing, but assumes Event_set::state() == Event...
Definition: event_set.cpp:1332
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
Definition: low_lvl_io.cpp:586
void sock_log_detail(Peer_socket::Const_ptr sock) const
Logs a verbose state report for the given socket.
static const size_t & S_NUM_EPHEMERAL_PORTS
Total number of Flow "ephemeral" ports (ones reserved locally at random with Node::listen(S_PORT_ANY)...
Definition: node.hpp:951
boost::unordered_set< Peer_socket::Ptr > m_socks_with_accumulated_pending_acks
Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() (...
Definition: node.hpp:3849
boost::unordered_set< Peer_socket::Ptr > m_socks_with_accumulated_acks
Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() c...
Definition: node.hpp:3867
friend size_t hash_value(const Socket_id &socket_id)
Definition: node.cpp:1162
util::Thread m_worker
Worker thread (= thread W). Other members should be initialized before this one to avoid a race condition.
Definition: node.hpp:3893
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
Definition: event_set.cpp:1129
Socket_id_to_socket_map m_socks
The peer-to-peer connections this Node is currently tracking.
Definition: node.hpp:3792
bool validate_option_check(bool check, const std::string &check_str, Error_code *err_code) const
Helper that, if the given condition is false, logs and returns an error; used to check for option val...
Definition: node.cpp:1093
static const flow_port_t & S_FIRST_SERVICE_PORT
The port number of the lowest service port, making the range of service ports [S_FIRST_SERVICE_PORT,...
Definition: node.hpp:957
Peer_socket::Options_lock Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
Definition: node.hpp:1439
friend bool operator==(const Socket_id &lhs, const Socket_id &rhs)
Definition: node.cpp:1157
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
Udp_socket m_low_lvl_sock
The UDP socket used to receive low-level packets (to assemble into application layer data) and send t...
Definition: node.hpp:3753
void handle_syn_ack_to_syn_sent(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK packet delivered to the given peer ...
static const size_t & S_NUM_SERVICE_PORTS
Total number of Flow "service" ports (ones that can be reserved by number with Node::listen()).
Definition: node.hpp:945
static const flow_port_t & S_FIRST_EPHEMERAL_PORT
The port number of the lowest ephemeral Flow port, making the range of ephemeral ports [S_FIRST_EPHEM...
Definition: node.hpp:963
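Reading the port constants above together: a sanity-check sketch of the implied port-space layout. The exact partitioning (service + ephemeral + the special S_PORT_ANY) is an assumption inferred from the bracketed ranges in these entries:

  // These are runtime `static const` references, hence assert() rather than static_assert():
  using flow::net_flow::Node;
  assert(Node::S_NUM_PORTS == 1 + Node::S_NUM_SERVICE_PORTS + Node::S_NUM_EPHEMERAL_PORTS); // +1 for S_PORT_ANY.
  // Service ports:   [S_FIRST_SERVICE_PORT,   S_FIRST_SERVICE_PORT   + S_NUM_SERVICE_PORTS   - 1].
  // Ephemeral ports: [S_FIRST_EPHEMERAL_PORT, S_FIRST_EPHEMERAL_PORT + S_NUM_EPHEMERAL_PORTS - 1].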
boost::promise< Error_code > m_event_loop_ready
Promise that thread W sets to truthy Error_code if it fails to initialize or falsy once event loop is...
Definition: node.hpp:3880
log::Logger * this_thread_init_logger_setup(const std::string &thread_type, log::Logger *logger=0)
Helper to invoke for each thread in which this Node executes, whether or not it starts that thread,...
Definition: node.cpp:113
Peer_socket::Ptr handle_syn_to_listening_server(Server_socket::Ptr serv, boost::shared_ptr< const Syn_packet > syn, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-deserialized, just-demultiplexed low-level SYN packet delivered to the given server so...
bool running() const
Returns true if and only if the Node is operating.
Definition: node.cpp:420
Port_to_server_map m_servs
The server sockets this Node is currently tracking.
Definition: node.hpp:3798
const Node_options & validate_options(const Node_options &opts, bool init, Error_code *err_code) const
Given a new set of Node_options intended to replace (or initialize) a Node's m_opts,...
Definition: node.cpp:980
void handle_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Ack_packet > ack)
Handles a just-deserialized, just-demultiplexed, low-level ACK packet delivered to the given peer soc...
void handle_syn_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed, duplicate (equal to already-received SYN_ACK) low-le...
Signal_set m_signal_set
Signal set which we may or may not be using to trap SIGINT and SIGTERM in order to auto-fire interrup...
Definition: node.hpp:3890
void handle_data_to_syn_rcvd(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
Node(log::Logger *logger, const util::Udp_endpoint &low_lvl_endpoint, Net_env_simulator *net_env_sim=0, Error_code *err_code=0, const Node_options &opts=Node_options())
Constructs Node.
Definition: node.cpp:40
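A construction sketch against the signature above; the port number is an illustrative assumption. Passing a non-null err_code selects error-code (rather than exception) reporting:

  flow::log::Logger* logger_ptr = nullptr; // Or a real Logger implementation; null is accepted (note the logger=0 defaults above).
  flow::Error_code err;
  flow::net_flow::Node node(logger_ptr,
                            flow::util::Udp_endpoint(boost::asio::ip::udp::v4(), 50500), // Local UDP endpoint.
                            0, &err); // Null simulator; &err => failures reported via `err`, not thrown.
  if (err || !node.running())
  {
    // Construction failed; `err` explains why (e.g., could not bind the UDP socket).
  }
  const flow::util::Udp_endpoint& bound = node.local_low_lvl_endpoint(); // The endpoint actually in use.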
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
Node_options m_opts
This Node's global set of options.
Definition: node.hpp:3704
void async_low_lvl_recv()
Registers so that during the current or next m_task_engine.run(), the latter will wait for a receivab...
Definition: low_lvl_io.cpp:31
void close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
A thread W method that handles the transition of the given socket from OPEN (any sub-state) to CLOSED...
void close_empty_server_immediately(const flow_port_t local_port, Server_socket::Ptr serv, const Error_code &err_code, bool defer_delta_check)
Handles the transition of the given server socket from S_LISTENING/S_CLOSING to S_CLOSED (including e...
Event_sets m_event_sets
Every Event_set to have been returned by event_set_create() and not subsequently reached Event_set::S...
Definition: node.hpp:3804
void interrupt_all_waits_internal_sig_handler(const Error_code &sys_err_code, int sig_number)
signal_set handler, executed on SIGINT and SIGTERM, if user has enabled this feature: causes interrup...
Definition: event_set.cpp:1456
void perform_regular_infrequent_tasks(bool reschedule)
Performs low-priority tasks that should be run on an infrequent, regular basis, such as stat logging ...
Definition: node.cpp:1117
util::Udp_endpoint m_low_lvl_endpoint
After we bind m_low_lvl_sock to a UDP endpoint, this is a copy of that endpoint.
Definition: node.hpp:3764
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
Definition: node.hpp:3739
Int_state
The state of the socket (and the connection from this end's point of view) for the internal state mac...
@ S_ESTABLISHED
Public state is OPEN+CONNECTED; in our opinion the connection is established.
@ S_SYN_SENT
Public state is OPEN+CONNECTING; user requested active connect; we sent SYN and are awaiting response...
@ S_CLOSED
Closed (dead or new) socket.
@ S_SYN_RCVD
Public state is OPEN+CONNECTING; other side requested passive connect via SYN; we sent SYN_ACK and ar...
static const size_t S_NUM_EPHEMERAL_PORTS
Total number of "ephemeral" ports (ones reserved at random with reserve_ephemeral_port()).
Definition: port_space.hpp:105
static const flow_port_t S_FIRST_SERVICE_PORT
The port number of the lowest service port.
Definition: port_space.hpp:108
static const flow_port_t S_FIRST_EPHEMERAL_PORT
The port number of the lowest ephemeral port.
Definition: port_space.hpp:111
static const size_t S_NUM_SERVICE_PORTS
Total number of "service" ports (ones that can be reserved by number with reserve_port()).
Definition: port_space.hpp:102
static const size_t S_NUM_PORTS
Total number of ports in the port space, including S_PORT_ANY.
Definition: port_space.hpp:99
State
State of a Server_socket.
@ S_LISTENING
Future or current accept()s may be possible. A socket in this state may be Acceptable.
boost::shared_ptr< Event_set > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (à la T*).
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
Definition: error.hpp:269
#define FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(ARG_ret_type, ARG_method_name,...)
Narrow-use macro that implements the error code/exception semantics expected of most public-facing Fl...
Definition: error.hpp:357
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
Definition: error.hpp:202
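From the caller's side, the convention these three macros implement is uniform across public-facing Flow methods: a null Error_code* means failures throw; a non-null one means failures are written there. A hedged sketch using set_options() (with `node` and `opts` as in the earlier sketches); the exact exception type is not restated in these entries, so the catch below is written generically:

  flow::Error_code err;
  node.set_options(opts, &err); // On failure: `err` is set (e.g., S_OPTION_CHECK_FAILED above); no throw.
  try
  {
    node.set_options(opts); // Null err_code: the same failure surfaces as an exception.
  }
  catch (const std::exception& exc)
  {
    // exc.what() carries the error message.
  }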
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:197
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
Definition: log.hpp:152
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
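These logging macros are stream-style: the single argument is a fragment of chained << operations, typically evaluated only if that severity is enabled. A usage sketch, assuming it appears where get_logger() and get_log_component() are available (e.g., inside a class deriving from flow::log::Log_context):

  const size_t n_bytes = 1024; // Illustrative value.
  FLOW_LOG_INFO("Sent [" << n_bytes << "] bytes to peer.");
  FLOW_LOG_WARNING("Receive buffer overflow; dropping [" << n_bytes << "] bytes.");
  FLOW_LOG_TRACE("Verbose per-packet detail; filtered cheaply unless TRACE verbosity is on.");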
void beautify_chrono_logger_this_thread(Logger *logger_ptr)
Sets certain chrono-related formatting on the given Logger in the current thread that results in a co...
Definition: log.cpp:278
@ S_CONN_RESET_BY_OTHER_SIDE
Other side reset an established connection.
@ S_NODE_SHUTTING_DOWN
Node shutting down.
@ S_OPTION_CHECK_FAILED
When setting options, at least one option's value violates a required condition on that option.
@ S_CONN_REFUSED
Other side refused connection.
@ S_CONN_RESET_BAD_PEER_BEHAVIOR
Connection reset because of unexpected/illegal behavior by the other side.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Definition: node.cpp:25
uint16_t flow_port_t
Logical Flow port type (analogous to a UDP/TCP port in spirit but in no way relevant to UDP/TCP).
const flow_port_t S_PORT_ANY
Special Flow port value used to indicate "invalid port" or "please pick a random available ephemeral ...
Definition: port_space.cpp:33
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
Definition: sched_task.hpp:34
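A scheduling sketch against the signature above, with `logger_ptr` and `task_engine` standing in for a Logger and a util::Task_engine (e.g., the Node's m_task_engine). Caution: the handler's exact shape is not shown in this entry; the single-bool form below (distinguishing a normal fire from a short-circuited one) is an assumption to verify against the Scheduled_task_handler definition:

  auto task = flow::util::schedule_task_from_now(logger_ptr,
                                                 boost::chrono::seconds(1), // Converts to Fine_duration.
                                                 true, // single_threaded: caller promises no concurrent access.
                                                 &task_engine,
                                                 [](bool short_fired) // Assumed handler shape.
  {
    // Body runs ~1 second from now on the Task_engine's thread.
  });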
void ostream_op_to_string(std::string *target_str, T const &... ostream_args)
Writes to the specified string, as if the given arguments were each passed, via << in sequence,...
Definition: util.hpp:342
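Per the variadic signature above, each argument is written to the target string as if streamed via <<, in order. For example:

  std::string result;
  flow::util::ostream_op_to_string(&result, "UDP port [", 50500, "]; max packet size [", 9000, "].");
  // result == "UDP port [50500]; max packet size [9000]."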
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
Definition: blob_fwd.hpp:60
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
Definition: util_fwd.hpp:208
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:502
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Definition: common.hpp:632
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Definition: common.hpp:410
boost::shared_ptr< Individual_ack > Ptr
Short-hand for ref-counted pointer to mutable objects of this class.
Internal net_flow struct that encapsulates the Flow-protocol low-level ACK packet.
Internal net_flow struct that encapsulates the Flow-protocol low-level DATA packet.
static Ptr create_from_raw_data_packet(log::Logger *logger_ptr, util::Blob *raw_packet, bool prefer_no_move)
Constructs packet on the heap with values determined by the given raw binary data as presumably recei...
The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to a port in th...
Definition: node.hpp:3904
const Remote_endpoint m_remote_endpoint
The other side of the connection.
Definition: node.hpp:3908
size_t hash() const
Hash value of this Socket_id for unordered<>.
Definition: node.cpp:1147
const flow_port_t m_local_port
This side of the connection (within this Node).
Definition: node.hpp:3910
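Because Socket_id supplies hash() plus the friend hash_value() and operator== shown earlier, boost's unordered containers (such as the Socket_id_to_socket_map behind m_socks) can key on it; boost locates hash_value() via argument-dependent lookup. A conceptual sketch mirroring the implementation (Socket_id is a Node-internal detail, not public API):

  // Inside Node: map each (remote endpoint, local port) pair to its socket.
  boost::unordered_map<Socket_id, Peer_socket::Ptr> socks; // Hashing routes through hash_value() -> Socket_id::hash().
  socks[socket_id(sock)] = sock; // socket_id() constructs the connection ID; see its entry above.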
A set of low-level options affecting a single Flow Node, including Peer_socket objects and other obje...
Definition: options.hpp:449
Peer_socket_options m_dyn_sock_opts
The set of per-Peer_socket options in this per-Node set of options.
Definition: options.hpp:580
size_t m_st_low_lvl_max_buf_size
The max size to ask the OS to set our UDP socket's receive buffer to in order to minimize loss if we ...
Definition: options.hpp:525
size_t m_dyn_low_lvl_max_packet_size
Any incoming low-level (UDP) packet will be truncated to this size.
Definition: options.hpp:553
Fine_duration m_st_timer_min_period
A time period such that the boost.asio timer implementation for this platform is able to accurately a...
Definition: options.hpp:534
bool m_dyn_guarantee_one_low_lvl_in_buf_per_socket
This very inside-baseball setting controls the allocation/copy behavior of the UDP receive-deserializ...
Definition: options.hpp:568
bool m_st_capture_interrupt_signals_internally
If and only if this is true, the Node will detect SIGINT and SIGTERM (or your OS's version thereof); ...
Definition: options.hpp:517
size_t m_st_max_block_size
The size of block that we will strive to (and will, assuming at least that many bytes are available i...
Definition: options.hpp:114
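Tying the Node_options fields above together: per-Node options sit directly on Node_options, while per-socket defaults nest under m_dyn_sock_opts (note that m_st_max_block_size's options.hpp:114 location places it within the per-socket set). A hedged sketch of preparing a custom set before constructing a Node; the values are illustrative only:

  flow::net_flow::Node_options opts;
  opts.m_st_low_lvl_max_buf_size = 4 * 1024 * 1024; // Ask the OS for a 4 MiB UDP receive buffer.
  opts.m_st_capture_interrupt_signals_internally = true; // Let the Node trap SIGINT/SIGTERM itself.
  opts.m_dyn_sock_opts.m_st_max_block_size = 8192; // Default per-socket block size.
  // Pass `opts` as the Node constructor's last argument; it is validated there (see validate_options()).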
Internal net_flow struct that encapsulates the Flow-protocol low-level RST packet.
Internal net_flow struct that encapsulates the Flow-protocol low-level SYN_ACK_ACK packet.
Internal net_flow struct that encapsulates the Flow-protocol low-level SYN_ACK packet.
Internal net_flow struct that encapsulates the Flow-protocol low-level SYN packet.
#define FLOW_UTIL_WHERE_AM_I_STR()
Same as FLOW_UTIL_WHERE_AM_I() but evaluates to an std::string.
Definition: util_fwd.hpp:971