Flow 2.0.0
Flow project: Full implementation reference.
server_socket.cpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
23#include "flow/async/util.hpp"
24#include <boost/random/random_device.hpp>
25
26namespace flow::net_flow
27{
28
29// Server_socket implementation.
30
31Server_socket::Server_socket(log::Logger* logger_ptr, const Peer_socket_options* child_sock_opts) :
32 Log_context(logger_ptr, Flow_log_component::S_NET_FLOW),
33 /* If they supplied a Peer_socket_options, store a copy of it. When new Peer_sockets are made
34 * (when people connect to us), each peer socket's per-socket options will be copies of this. If
35 * they did not supply a Peer_socket_options, the Node's global Peer_socket_options will be used
36 * for each subsequent Peer_socket. */
37 m_child_sock_opts(child_sock_opts ? new Peer_socket_options{*child_sock_opts} : nullptr),
38 m_state(State::S_CLOSED), // Incorrect; set explicitly.
39 m_node(nullptr), // Incorrect; set explicitly.
40 m_local_port(S_PORT_ANY), // Incorrect; set explicitly.
41 m_backlog_limit(0) // Incorrect; set explicitly.
42{
43 // Only print pointer value, because most members are garbage at this point.
44 FLOW_LOG_TRACE("Server_socket [" << static_cast<void*>(this) << "] created.");
45}
46
48{
49 delete m_child_sock_opts; // May be null (that's okay).
50
51 FLOW_LOG_TRACE("Server_socket [" << this << "] destroyed.");
52}
53
55{
56 Lock_guard lock{m_mutex}; // State is liable to change at any time.
57 return m_state;
58}
59
61{
62 Lock_guard lock{m_mutex}; // m_node can simultaneously change to 0 if state changes to S_CLOSED.
63 return m_node;
64}
65
67{
68 Lock_guard lock{m_mutex};
69 return m_disconnect_cause;
70}
71
73{
74 return m_local_port; // No need to lock (it never changes).
75}
76
78{
80 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
81
82 // We are in user thread U != W.
83
84 Lock_guard lock{m_mutex};
85
86 const Ptr serv = shared_from_this();
87 if (!Node::ensure_sock_open(serv, err_code)) // Ensure it's open, so that we can access m_node.
88 {
89 return Peer_socket::Ptr{};
90 }
91 // else m_node is valid.
92
93 // Forward the rest of the logic to Node, as is the general convention especially for logic affecting outside *this.
94 return m_node->accept(serv, err_code);
95} // Server_socket::accept()
96
98{
99 return sync_accept_impl(Fine_time_pt{}, reactor_pattern, err_code);
100}
101
102Peer_socket::Ptr Server_socket::sync_accept_impl(const Fine_time_pt& wait_until, bool reactor_pattern,
103 Error_code* err_code)
104{
105 using boost::adopt_lock;
106
108 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
109
110 // We are in user thread U != W.
111
112 Lock_guard lock{m_mutex};
113
114 const Ptr serv = shared_from_this();
115 if (!Node::ensure_sock_open(serv, err_code)) // Ensure it's open, so that we can access m_node.
116 {
117 return Peer_socket::Ptr{};
118 }
119 // else m_node is valid.
120
121 lock.release(); // Release lock (mutex is still LOCKED). sync_op() takes over holding the lock and unlocking.
122
123 // See comment in Peer_socket::node_sync_send().
124
125 /* Operating on Server_sockets, returning Peer_socket::Ptr; Event_set socket set type is
126 * Server_sockets.
127 * Object is serv; non-blocking operation is m_node->accept(...) -- or N/A in "reactor pattern" mode..
128 * Peer_socket::Ptr{} is the "would-block" return value for this operation.
129 * S_SERVER_SOCKET_ACCEPTABLE is the type of event to watch for here. */
130 return m_node
132 (serv,
133 reactor_pattern
135 : Function<Peer_socket::Ptr ()>([this, serv, err_code]() -> Peer_socket::Ptr
136 { return m_node->accept(serv, err_code); }),
138 wait_until, err_code);
139} // Server_socket::sync_accept_impl()
140
141// Node implementations (methods dealing with individual Server_sockets).
142
144 const Peer_socket_options* child_sock_opts)
145{
146 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(Server_socket::Ptr, listen, local_port, _1, child_sock_opts);
147 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
148
151 using boost::promise;
152 using boost::unique_future;
153
154 // We are in thread U != W.
155
156 if (!running())
157 {
159 return Server_socket::Ptr{};
160 }
161 // else
162
163 /* Basically we now need to do the following.
164 * -1- Reserve local_port in m_ports.
165 * -2- Create Server_socket serv.
166 * -3- Save serv in m_servs.
167 * (Both -1- and -3- can result in error.)
168 *
169 * -2- must be done in thread W, as m_servs is by design only to be accessed there. -1-, however,
170 * can be done in thread U, assuming m_ports is properly synchronized. So here are our choices:
171 *
172 * I. Perform -1- in U. If error, return. Perform -2-. Post callback to do -3- in W. Return
173 * serv in U. Meanwhile -2- will, at some point, be performed in W. If error in -3-, save it
174 * in serv. User will discover when trying to serv->accept().
175 *
176 * II. Perform -1- in U. If error, return. Perform -2-. Post callback to do -3- in W. Use a
177 * future/promise pair to wait for -3- to complete. Meanwhile -3- will, at some point, be performed
178 * in W. If error in -3-, save it in serv. Either way, U will wait for the result and return
179 * error or serv to the user.
180 *
181 * III. Post callback to do -1-, -2-, -3- in W. Use a future/promise pair to wait for -1-3- to complete.
182 * Meanwhile -1-3- will, at some point, be performed in W. If error in -1-3-, save it in
183 * serv. Either way, U will wait. If error, return error to the user. Else return serv to
184 * the user.
185 *
186 * Let's pick one. III > II due to simpler code; future used either way but most code is in one
187 * thread and one function (the W callback); the U part is a short wrapper. Also, Port_space need
188 * not be synchronized. So it's I vs. III. Advantage of I is speed; listen() will return in U
189 * faster, especially if thread W is loaded with work. (However III is still non-blocking, i.e.,
190 * no waiting for network events.) Advantage of III is simplicity of code (everything in one
191 * thread and callback, in W; no explicit locking); and any pre-networking error is immediately
192 * returned by listen() instead of being discovered later. I believe here III wins, especially
193 * because listen() should be fairly infrequent, so a split second speed difference should not be
194 * significant.
195 *
196 * So we choose III. We set up connect_worker() to run in W and wait for
197 * it to succeed or fail. asio_exec_ctx_post() does the promise/future stuff, or equivalent, so
198 * the code is really simple. */
199
200 // Load this onto thread W boost.asio work queue. We won't return until it's done, so [&] is OK.
202 asio_exec_ctx_post(get_logger(), &m_task_engine, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION,
203 [&]() { listen_worker(local_port, child_sock_opts, &serv); });
204 // If got here, the task has completed in thread W and signaled us to that effect.
205
206 // listen_worker() indicates success or failure through this data member.
207 if (serv->m_disconnect_cause)
208 {
209 *err_code = serv->m_disconnect_cause;
210 return Server_socket::Ptr{}; // serv will go out of scope and thus will be destroyed.
211 }
212 // else
213 err_code->clear();
214 return serv;
215} // Node::listen()
216
/* Thread-W half of Node::listen(): creates the Server_socket, reserves the requested local port,
 * and registers the socket in m_servs so arriving SYNs are accepted.  Success/failure is
 * communicated back to listen() (which blocks awaiting us) solely via (*serv_ptr)->m_disconnect_cause:
 * falsy => success.  A Server_socket is created even on failure, per the advertised contract.
 *   - local_port: port to reserve; S_PORT_ANY => an ephemeral port is chosen.
 *   - child_sock_opts: per-child-socket options (may be null => children use Node's global template).
 *   - serv_ptr: out-arg; must be non-null; receives the new socket in all cases.
 * NOTE(review): this listing appears to be an extraction with some original lines dropped
 * (line-number gaps 258->260 and 290->292 below); see hedged notes at those spots. */
217void Node::listen_worker(flow_port_t local_port, const Peer_socket_options* child_sock_opts,
218 Server_socket::Ptr* serv_ptr)
219{
220 assert(serv_ptr);
221
222 // We are in thread W. listen() is waiting for us to set serv_promise in thread U.
223
224 // Create new socket and set all members that may be immediately accessed by user from thread U after we're done.
225
226 auto& serv = *serv_ptr;
227 if (child_sock_opts)
228 {
229 /* They provided custom per-socket options to distribute to any Peer_sockets created when people
230 * connect to this server. Before we give those to the new server socket, let's validate them
231 * (for proper values and internal consistency, etc.). */
232
233 Error_code err_code;
234 const bool opts_ok = sock_validate_options(*child_sock_opts, nullptr, &err_code);
235
236 // Due to the advertised interface of the current method, we must create a socket even on error.
237 serv.reset(serv_create(child_sock_opts));
238
239 // Now report error if indeed options were invalid. err_code is already set and logged in that case.
240 if (!opts_ok)
241 {
242 serv->m_disconnect_cause = err_code;
243 return;
244 }
245 // else
246 }
247 else
248 {
249 /* More typically, they did not provide per-socket options. So just pass null pointer into
250 * Peer_socket constructor; this will mean that when a Peer_socket is generated on connection,
251 * the code is to provide a copy of the global template for the per-socket options. That will
252 * happen later; we just pass in null. */
253 serv.reset(serv_create(nullptr));
254 }
255
256 // Server socket created; set members.
257
258 serv->m_node = this;
// NOTE(review): original line 259 is missing from this extraction; presumably it assigned
// serv->m_state (the constructor leaves S_CLOSED marked "Incorrect; set explicitly") --
// likely State::S_LISTENING here. Confirm against the upstream source.
260
261 // Allocate given port.
262 serv->m_local_port = m_ports.reserve_port(local_port, &serv->m_disconnect_cause);
263 if (serv->m_local_port == S_PORT_ANY)
264 {
265 // Error already logged and is in serv->m_disconnect_cause.
266 return;
267 }
268 // else
269 local_port = serv->m_local_port; // If they'd specified S_PORT_ANY, this is now a random port.
270
271 /* Save backlog limit. Note it is dynamic at the Node level (thus future listen()s can set different values here),
272 * but that does not (in and of itself anyway) mean it can be subsequently changed for the `serv` we are returning. */
273 serv->m_backlog_limit = opt(m_opts.m_dyn_accept_backlog_limit);
274
275 FLOW_LOG_INFO("NetFlow worker thread listening for passive-connects on [" << serv << "]; "
276 "backlog limit [" << serv->m_backlog_limit << "].");
277
278 if (util::key_exists(m_servs, local_port))
279 {
280 /* This is a passive connect (we're accepting future connections). Therefore in particular it
281 * should be impossible that our local_port() equals an already existing connection's
282 * local_port(); Port_space is supposed to prevent the same port from being handed out to more
283 * than one connection. Therefore this must be a programming error. */
284
285 FLOW_LOG_WARNING("Cannot set up [" << serv << "], because server at port [" << local_port << "] already exists! "
286 "This is a port reservation error and constitutes either a bug or an extremely "
287 "unlikely condition.");
288
289 // Mark/log error.
290 Error_code* err_code = &serv->m_disconnect_cause;
// NOTE(review): original line 291 is missing from this extraction; err_code is declared above
// but otherwise unused here, so the dropped line almost certainly emitted the error into
// *err_code (e.g., a FLOW_ERROR_EMIT_ERROR(...) of an internal port-collision code). Confirm
// against the upstream source.
292
293 // Return port.
294 Error_code return_err_code;
295 m_ports.return_port(serv->m_local_port, &return_err_code);
296 assert(!return_err_code);
297 return;
298 } // if (that server entry already exists)
299 // else
300
301 m_servs[local_port] = serv; // Now SYNs will be accepted.
302} // Node::listen_worker()
303
305{
306 // We are in user thread U != W.
307
308 // IMPORTANT: The logic here must be consistent with serv_is_acceptable().
309
310 // serv->m_mutex already locked.
311
312 if (serv->m_state == Server_socket::State::S_CLOSING)
313 {
314 /* This is the same to the user as CLOSED -- only different in that serv has not been disowned by the Node yet.
315 * See rationale for this in the accept() documentation header. */
316 FLOW_ERROR_EMIT_ERROR_LOG_INFO(serv->m_disconnect_cause);
317
318 // Not listening anymore; pretend nothing on queue.
319 return Peer_socket::Ptr{};
320 }
321 // else
322 assert(serv->m_state == Server_socket::State::S_LISTENING);
323
324 if (serv->m_unaccepted_socks.empty())
325 {
326 // Nothing on the queue. As advertised, this is not an error in LISTENING state.
327 err_code->clear();
328 return Peer_socket::Ptr{};
329 }
330 // else
331
332 // Pop from queue. Linked_hash_set queues things up at the front (via insert()), so pop from the back.
333 const auto it = --serv->m_unaccepted_socks.cend();
334 Peer_socket::Ptr sock = *it;
335 serv->m_unaccepted_socks.erase(it);
336
337 /* Now that it's accepted, remove reference to the server socket, so that when the server socket
338 * is closed, sock is not closed (since it's a fully functioning independent socket now). */
339 sock->m_originating_serv.reset(); // This is synchronized together with m_unaccepted_socks.
340
341 FLOW_LOG_INFO("Connection [" << sock << "] on [" << serv << "] accepted.");
342
343 err_code->clear();
344 return sock;
345} // Node::accept()
346
347bool Node::serv_is_acceptable(const boost::any& serv_as_any) const
348{
349 using boost::any_cast;
350
351 const Server_socket::Const_ptr serv = any_cast<Server_socket::Ptr>(serv_as_any);
352
353 Peer_socket::Lock_guard lock{serv->m_mutex}; // Many threads can access/write below state.
354
355 /* Our task here is to return true if and only if at this very moment calling serv->accept()would
356 * yield either a non-null return value OR a non-success *err_code. In other words,
357 * accept() would return "something." This is used for Event_set machinery.
358 *
359 * This should mirror accept()'s algorithm. @todo Should accept() call this, for code reuse?
360 * Maybe/maybe not. Consider performance when deciding.
361 *
362 * Basically, CLOSING/CLOSED => error (Acceptable); LISTENING + non-empty accept queue =>
363 * returns a socket (Acceptable); LISTENING + empty queue => returns null and no error (not
364 * Acceptable). So the latter is the only way (though quite common) it can be NOT Acceptable. */
365
366 return !((serv->m_state == Server_socket::State::S_LISTENING) && serv->m_unaccepted_socks.empty());
367} // Node::serv_is_acceptable()
368
370 const Error_code& err_code, bool defer_delta_check)
371{
372 // We are in thread W.
373
374 // Check explicitly documented pre-conditions.
375
376 assert(serv->m_state != Server_socket::State::S_CLOSED);
377 // Caller should have closed all the associated sockets already.
378 assert(serv->m_connecting_socks.empty());
379 {
380 Server_socket::Lock_guard lock{serv->m_mutex}; // At least m_unaccepted_socks can be accessed by user.
381 assert(serv->m_unaccepted_socks.empty());
382 }
383
384 FLOW_ERROR_LOG_ERROR(err_code);
385 FLOW_LOG_INFO("Closing and destroying [" << serv << "].");
386
387 serv_close_detected(serv, err_code, true); // Sets S_CLOSED public state (and related data).
388
389 // Next, remove serv from our main server list.
390
391#ifndef NDEBUG
392 const bool erased = 1 ==
393#endif
394 m_servs.erase(local_port);
395 assert(erased); // Not S_CLOSED => it's in m_servs. Otherwise there's a serious bug somewhere.
396
397 // Return the port.
398 Error_code return_err_code;
399 m_ports.return_port(local_port, &return_err_code);
400 assert(!return_err_code);
401
402 /* serv has changed to CLOSED state. Performing serv->accept() would therefore
403 * certainly return an error. Returning an error from that method (as opposed to null but no
404 * error) is considered Acceptable (as we want to alert the user to the error, so her wait [if
405 * any] wakes up and notices the error). Therefore we should soon inform anyone waiting on any
406 * Event_sets for serv to become Acceptable
407 *
408 * Caveat: Similar to that in Node::handle_syn_ack_ack_to_syn_rcvd() at similar point in the
409 * code. */
410
411 // Accumulate the event into the Node store (note: not any Event_set yet).
413 {
414 // Possibly inform the user for any applicable Event_sets right now.
415 event_set_all_check_delta(defer_delta_check);
416 }
417} // Node::close_empty_server_immediately()
418
420{
421 Server_socket::Lock_guard lock{serv->m_mutex};
422
423 // @todo Add TRACE logging.
424
425 serv->m_state = state;
427 {
428 /* Important convention: S_CLOSED means socket is permanently incapable of accepting more
429 * connections. At this point the originating Node removes the
430 * socket from its internal structures. Therefore, the Node itself may even go away -- while
431 * this Server_socket still exists. Since we use shared_ptr when giving our socket objects,
432 * that's fine -- but we want to avoid returning an invalid Node* in node(). So, when
433 * S_CLOSED, serv->m_node = nullptr. */
434 serv->m_node = nullptr;
435 }
436}
437
439 boost::shared_ptr<const Syn_packet> syn,
440 const util::Udp_endpoint& low_lvl_remote_endpoint)
441{
442 using util::Blob;
443 using boost::random::random_device;
444 using security_token_t = Peer_socket::security_token_t;
445
446 // We are in thread W.
447
448 /* We just got SYN (an overture from the other side). Create a peer-to-peer socket to track that
449 * connection being established. Though, if we are at the backlog limit, then there's no point: reject immediately
450 * without even temporarily taking the memory for the Peer_socket. */
451 {
452 const auto backlog_sz = serv->m_unaccepted_socks.size() + serv->m_connecting_socks.size();
453 if (backlog_sz >= static_cast<size_t>(serv->m_backlog_limit))
454 // As of this writing `==` is sufficient, but just in case it becomes mutable someday: use `>=`.
455 {
456 /* (Let's not use INFO or WARNING here; if there's a SYN-flood going on then no need to fill up the logs.
457 * After all it's not *that* interesting of a message, on balance. One could argue that logging rate-limiting
458 * is the `Logger`'s job -- and indeed it is -- but in this case we can defensibly avoid this
459 * difficulty altogether.) */
460 FLOW_LOG_TRACE("NetFlow worker thread, on receipt of [" << syn->m_type_ostream_manip << "] was about to "
461 "start passive-connect on [" << serv << "], but the backlog would then exceed the "
462 "limit [" << serv->m_backlog_limit << "]; resetting connection.");
464 return Peer_socket::Ptr{};
465 }
466 // else OK; do create that peer-to-peer socket.
467 }
468
469 Peer_socket::Ptr sock;
470 if (serv->m_child_sock_opts)
471 {
472 /* They provided custom per-socket options in listen(), and we've been storing them in *serv.
473 * They're already validated at listen() time, so we just give them to Peer_socket constructor,
474 * which copies them. */
475 sock.reset(sock_create(*serv->m_child_sock_opts));
476 }
477 else
478 {
479 /* More typically, they did not provide per-socket options. So we just pass our global template
480 * for the per-socket options to the Peer_socket constructor. The only caveat is that template
481 * may be concurrently changed, so we must lock it. Could do it with opt(), but that introduces
482 * an extra copy of the entire struct, so just do it explicitly (read-only lock for
483 * performance).
484 *
485 * Note: no need to validate; global options (including per-socket ones) are validated
486 * elsewhere when set. */
489 }
490
491 // Socket created; set members.
492
493 sock->m_active_connect = false;
494 sock->m_node = this;
496 sock->m_remote_endpoint = Remote_endpoint{ low_lvl_remote_endpoint, syn->m_packed.m_src_port };
497 sock->m_local_port = serv->m_local_port;
498 // Save it for user to be able to call sock->get_connect_metadata(). Add const to express we want copy, not move.
499 sock->m_serialized_metadata = static_cast<const Blob&>(syn->m_serialized_metadata);
500 sock->m_int_state = Peer_socket::Int_state::S_CLOSED; // Kind of pedantic. We'll set SYN_RCVD a bit later on.
501 // Save the start of the sequence number series based on their initial sequence number.
502 sock->m_rcv_init_seq_num = syn->m_init_seq_num;
503 sock->m_rcv_next_seq_num = sock->m_rcv_init_seq_num + 1;
504
505 /* Initialize the connection's send bandwidth estimator (object that estimates available
506 * outgoing bandwidth based on incoming acknowledgments). It may be used by m_snd_cong_ctl,
507 * depending on the strategy chosen, but may be useful in its own right. Hence it's a separate
508 * object, not inside *m_snd_cong_ctl. */
509 sock->m_snd_bandwidth_estimator.reset(new Send_bandwidth_estimator{get_logger(), sock});
510
511 // Initialize the connection's congestion control strategy based on the configured strategy.
512 sock->m_snd_cong_ctl.reset
513 (Congestion_control_selector::create_strategy(sock->m_opts.m_st_cong_ctl_strategy, get_logger(), sock));
514 // ^-- No need to use opt() yet: user doesn't have socket and cannot set_options() on it yet.
515
516 const Socket_id& socket_id = Node::socket_id(sock);
517 FLOW_LOG_INFO("NetFlow worker thread starting passive-connect of [" << sock << "] on [" << serv << "]. "
518 "Received [" << syn->m_type_ostream_manip << "] with ISN [" << syn->m_init_seq_num << "].");
519
520 // Ensure we can support the specified packet options.
521
522 if (syn->m_opt_rexmit_on != sock->rexmit_on())
523 {
524 FLOW_LOG_WARNING("NetFlow worker thread starting passive-connect of [" << sock << "] on [" << serv << "]. "
525 "Received [" << syn->m_type_ostream_manip << "] with "
526 "opt_rexmit_on [" << syn->m_opt_rexmit_on << "]; was configured otherwise on this side; "
527 "resetting connection.");
528 /* We'd inform the user here, but they didn't open the connection (it's a passive open, and they
529 * haven't yet called accept()). We can respond with RST, however, to tell the other side this
530 * connection isn't going to happen. We didn't place sock into m_socks, so just let it
531 * disappear via shared_ptr<> magic. */
533 return Peer_socket::Ptr{};
534 }
535 // else
536
537 // Ensure this socket pair does not yet exist in our peer-to-peer socket table.
538
540 {
541 /* This is a passive connect (they're intiating the connection). Therefore in particular it
542 * should be impossible that our local_port() equals an already existing connection's
543 * local_port(); Port_space is supposed to prevent the same ephemeral or service port from being
544 * handed out to more than one connection. Therefore this must be a programming error. */
545
546 FLOW_LOG_WARNING("Cannot add [" << sock << "], because such a connection already exists. "
547 "This is an ephemeral or service port collision and "
548 "constitutes either a bug or an extremely unlikely condition.");
549
550 // Same reasoning as above: send RST, and let sock disappear.
551 async_no_sock_low_lvl_rst_send(syn, low_lvl_remote_endpoint);
552 return Peer_socket::Ptr{};
553 } // if (that socket pair already exists)
554 // else
555
556 /* Try the packet send (as just below) again if SYN_ACK not acknowledged within a certain amount of
557 * time. Give up if that happens too many times.
558 * Follow same order of ops (schedule, send) as in the SYN case elsewhere. */
560
561 // Send SYN_ACK to continue the handshake. Save some *sock data first, as they are used in create_syn_ack().
562
563 /* Initial Sequence Number (the start of our own series).
564 * Remember it in case we must retransmit the SYN. (m_snd_next_seq_num may have been further increased by then.) */
565 Sequence_number& init_seq_num = sock->m_snd_init_seq_num;
567 // Same comment as when calling sock->m_snd_init_seq_num.set_metadata() elsewhere. See that.
568 init_seq_num.set_metadata('L',init_seq_num + 1, sock->max_block_size());
569 // Sequence number of first bit of actual data.
570 sock->m_snd_next_seq_num = init_seq_num + 1;
571
572 /* Security token. Random number from entire numeric range. Remember it for later verification.
573 * Use a CSPRNG (not the general-purpose mt19937-based Rnd_gen_uniform_range) because this token
574 * must be unpredictable to off-path attackers attempting to forge SYN_ACK_ACK packets. */
575 random_device rnd_dev; // Setting this up, in Linux at least, is microseconds per connection. No prob.
576 static_assert((random_device::min() == 0) && (random_device::max() == 0xFFFF'FFFF),
577 "The following statement assumes full 32-bit range of random_device.");
578 static_assert(sizeof(decltype(Peer_socket::m_security_token)) == (64 / 8),
579 "The following statement assumes 64-bit security tokens.");
580 sock->m_security_token = ((static_cast<security_token_t>(rnd_dev()) << 32)
581 |
582 static_cast<security_token_t>(rnd_dev()));
583
584 // Initial receive window is simply the entire empty Receive buffer.
585 sock->m_rcv_last_sent_rcv_wnd = sock_rcv_wnd(sock);
586
587 // Make a packet; fill out common fields in and asynchronously send it.
588 auto syn_ack = create_syn_ack(sock);
590
591 /* send will happen asynchronously, and the registered completion handler will execute in this
592 * thread when done (NO SOONER than this method finishes executing). */
593
594 // No more errors: Map socket pair to the socket data structure (kind of analogous to a TCP net-stack's TCB).
595 m_socks[socket_id] = sock;
596
597 // Also record it within the server socket (more comments inside this method).
598 serv_peer_socket_init(serv, sock);
599
600 // CLOSED -> SYN_RCVD.
602
603 return sock;
604} // Node::handle_syn_to_listening_server()
605
607 Peer_socket::Ptr sock,
608 boost::shared_ptr<const Syn_ack_ack_packet> syn_ack_ack)
609{
610 using boost::shared_ptr;
611
612 // We are in thread W.
613
614 /* We'd sent SYN_ACK and just got SYN_ACK_ACK. Assuming their SYN_ACK_ACK is valid, our side of
615 * connection can move to ESTABLISHED state, as theirs already has. */
616
617 FLOW_LOG_INFO("NetFlow worker thread continuing passive-connect of socket [" << sock << "]. "
618 "Received [" << syn_ack_ack->m_type_ostream_manip << "]; "
619 "security token [" << syn_ack_ack->m_packed.m_security_token << "].");
620
621 // First, validate their security token equals the one we've sent.
622 if (sock->m_security_token != syn_ack_ack->m_packed.m_security_token)
623 {
624 FLOW_LOG_WARNING("Received [" << syn_ack_ack->m_type_ostream_manip << "] targeted at state "
625 "[" << Peer_socket::Int_state::S_SYN_RCVD << "] socket [" << sock << "] "
626 "with mismatching security token "
627 "[" << syn_ack_ack->m_packed.m_security_token << "]; we had received and sent and expect "
628 "[" << sock->m_security_token << "]. Closing.");
629 /* Close connection in our structures (inform user if necessary as well). Pre-conditions
630 * assumed by call: sock in m_socks and sock->state() == S_OPEN (yes, since m_int_state ==
631 * S_SYN_RCVD); 3rd arg contains the reason for the close (yes). */
633 // ^-- defer_delta_check == true: for similar reason as at the end of this method.
634 return;
635 }
636 // else OK.
637
638 // No more errors.
639
640 // Move ourselves to connected state.
641
642 // The server socket to which the other side sent SYN to create the peer socket sock.
643 const Server_socket::Ptr serv = sock->m_originating_serv;
644
645 // Public state (thread-safe).
647 // Internal state. SYN_RCVD -> ESTABLISHED.
649
650 // Got the acknowledgment to SYN_ACK, so cancel retransmits and the timeout for that SYN_ACK.
651 cancel_timers(sock);
652
653 // Setup the Drop Timeout engine (m_snd_drop_timer).
655
656 // Add the peer socket to the server socket's accept queue (thread-safe)! accept() will return this.
657 serv_peer_socket_acceptable(serv, sock);
658 // BTW serv->m_originating_serv is now null.
659
660 // Record initial rcv_wnd; it should be the entire size of the other side's Receive buffer.
661 sock->m_snd_remote_rcv_wnd = syn_ack_ack->m_packed.m_rcv_wnd;
662
663 /* We may have queued up some DATA packets while we were SYN_RCVD (due to loss and/or
664 * re-ordering). See handle_data_to_syn_rcvd() for more information. So, handle the queued DATA
665 * packets as if they'd just arrived. */
666 for (shared_ptr<Data_packet> qd_packet : sock->m_rcv_syn_rcvd_data_q)
667 {
668 auto const logger_ptr = get_logger();
669 if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
670 {
672 ("Handling [" << qd_packet->m_type_ostream_manip << "] packet "
673 "received/queued in [" << Peer_socket::Int_state::S_SYN_RCVD << "] state; "
674 "packet data size = [" << qd_packet->m_data.size() << "].");
675
676 // Very verbose and CPU-intensive, especially DATA version!
677 if (logger_ptr->should_log(log::Sev::S_DATA, get_log_component()))
678 {
679 FLOW_LOG_DATA_WITHOUT_CHECKING("Readable representation is: "
680 "[\n" << qd_packet->m_verbose_ostream_manip << "].");
681 }
682 else
683 {
684 FLOW_LOG_TRACE_WITHOUT_CHECKING("Readable representation is: "
685 "[\n" << qd_packet->m_concise_ostream_manip << "].");
686 }
687 }
688
689 handle_data_to_established(socket_id, sock, qd_packet, true); // true <=> packet was queued during SYN_RCVD.
690 // qd_packet has probably been decimated for performance, so don't rely on qd_packet.m_data at this point!
691 }
692 if (!sock->m_rcv_syn_rcvd_data_q.empty())
693 {
694 FLOW_LOG_TRACE("Handled a total of [" << sock->m_rcv_syn_rcvd_data_q.size() << "] queued packets with "
695 "cumulative data size [" << sock->m_rcv_syn_rcvd_data_cumulative_size << "].");
696 }
697 sock->m_rcv_syn_rcvd_data_q.clear(); // Save memory.
698
699 /* Since we just added sock to serv's acceptable socket queue, certainly serv is now Acceptable.
700 * Therefore we should soon inform anyone waiting on any Event_sets for serv to become Acceptable.
701 *
702 * Caveat: The user could have called serv->accept() right after the previous statement in this
703 * method, which could indeed make serv not Acceptable again. That is OK. We only promise to
704 * inform the user of an event within a "non-blocking" amount of time of it occurring. If that
705 * same user decides to mess himself over by acting on these events prematurely, that is not our
706 * problem [assuming we don't crash things, which we do not]. Worst case is that the user will
707 * detect the event, try to accept() and get nothing [which is an eventuality for which any decent
708 * user code would prepare]. */
709
710 // Accumulate the event into the Node store (note: not any Event_set yet).
712 {
713 // Possibly inform the user for any applicable Event_sets right now.
715 /* ^-- defer_delta_check == true: because the only way to get to this method is from
716 * async_low_lvl_recv(), which will perform event_set_all_check_delta(false) at the end of itself,
717 * before the boost.asio handler exits. See Node::m_sock_events doc header for details. */
718 }
719
720 /* Do not m_sock_events[S_PEER_SOCKET_WRITABLE].insert(sock), as sock has not been accept()ed and
721 * therefore cannot be waited on currently. */
722} // Node::handle_syn_ack_ack_to_syn_rcvd()
723
/* NOTE(review): this is a Doxygen source listing; hyperlinked source lines were dropped
 * during extraction -- e.g. listing line 724 (the opening of the
 * Node::handle_data_to_syn_rcvd() signature; cf. the closing marker at line 794) and
 * listing line 764 (the opener of the log statement spanning lines 765-769).  Code lines
 * below are kept verbatim.
 * Purpose (per the visible body): while a peer socket is still in
 * Peer_socket::Int_state::S_SYN_RCVD, an arriving DATA packet is queued (by pointer) in
 * sock->m_rcv_syn_rcvd_data_q for replay once the connection is ESTABLISHED; the queue's
 * cumulative byte size is bounded by an option-controlled limit, past which packets are
 * dropped. */
725 boost::shared_ptr<Data_packet> packet)
726{
727 // We are in thread W.
728
729 /* We'd sent SYN_ACK, were waiting for SYN_ACK_ACK, but instead we got a DATA.
730 * This seems wrong at a first glance but can be legitimate. One possibility is they sent
731 * SYN_ACK_ACK and then some DATA, but the SYN_ACK_ACK was dropped (recall that the SYN_ACK_ACK is
732 * not itself acknowledged in our scheme). Another possibility is they sent both, but then DATA
733 * got re-ordered to in front of SYN_ACK_ACK.
734 *
735 * What does TCP do here? Well, it doesn't really have this problem, because every segment must
736 * have an ACK in it. So if a TCP gets data in SYN_RCVD, it must also contain the ACK to the
737 * SYN_ACK (what we call SYN_ACK_ACK) (in TCP, any packet without ACK is simply considered
738 * corrupt/invalid and would not be sent in the first place). So it's impossible to send data
739 * without acknowledging the SYN_ACK at the same time.
740 *
741 * For us, however, ACK packets are independent of DATA packets, as are SYN_ACK_ACK packets.
742 * Therefore we should either drop these DATAs and hope whatever reliability implementation is
743 * used restores them later, or we should queue them for consumption when ESTABLISHED arrives.
744 * Let's do the latter. It's hard enough to deal with actual loss; introducing loss when we
745 * actually have the stuff seems absurd.
746 *
747 * So we just save them in a packet queue, and when we're ESTABLISHED we feed all the packets to
748 * handle_incoming() as if they'd just arrived. The only caveat is size of this queue. Since
749 * we have a maximum on the Receive buffer (sock->m_rcv_buf) and the packets-with-gaps structure
750 * (sock->m_rcv_packets_with_gaps), we must have one here as well. Since Receive buffer is
751 * empty until ESTABLISHED, it seems natural to limit this queue's cumulative byte size
752 * according to the limit imposed on Receive buffer. (There is some extra overhead to store the
753 * packet header info, but it's close enough.) After that, as when the Receive buffer fills up,
754 * we drop packets.
755 *
756 * Update (leaving preceding paragraph there for posterity): On 2nd thought, that is probably too
757 * generous given the possibility of SYN-flood-like attacks. Therefore we now use a separate option-knob to limit
758 * this queue's size. */
759
760 assert(sock->m_int_state == Peer_socket::Int_state::S_SYN_RCVD);
761 const bool first_time = sock->m_rcv_syn_rcvd_data_q.empty();
762
763 // Not a WARNING, because we didn't do anything wrong; could be network conditions; and avoid verbosity after 1st one.
764 // (NOTE(review): the log-macro invocation opening this statement was a dropped hyperlinked line.)
765 "NetFlow worker thread received [" << packet->m_type_ostream_manip << "] packet while "
766 "in [" << Peer_socket::Int_state::S_SYN_RCVD << "] state for [" << sock << "]; "
767 "saving for processing later when in [" << Peer_socket::Int_state::S_ESTABLISHED << "] "
768 "state; packet data size = [" << packet->m_data.size() << "]; "
769 "first time? = [" << first_time << "].");
770
771 if (first_time)
772 {
773 sock->m_rcv_syn_rcvd_data_cumulative_size = 0; // It's garbage at the moment.
774 }
/* NOTE(review): the option member below is spelled "..._sync_rcvd_..." whereas the related
 * m_rcv_syn_rcvd_* fields use "syn_rcvd" -- confirm against options.hpp that the divergent
 * spelling is intentional. */
775 else if ((sock->m_rcv_syn_rcvd_data_cumulative_size + packet->m_data.size())
776 > sock->opt(sock->m_opts.m_st_rcv_sync_rcvd_data_q_cumulative_max_size))
777 {
778 /* Not a WARNING, because we didn't do anything wrong; could be network conditions.
779 * Not INFO either: a SYN-flood-like (DATA-flood?) attempt may not overfill the logs, given the limit, but generally
780 * logging on a per-packet basis (for one connection/connection-to-be) is not great.
781 * @todo An INFO-log the first time this happens would be nice. */
782 FLOW_LOG_TRACE("NetFlow worker thread received [" << packet->m_type_ostream_manip << "] packet while "
783 "in [" << Peer_socket::Int_state::S_SYN_RCVD << "] state for [" << sock << "]; "
784 "dropping because Receive queue full at [" << sock->m_rcv_syn_rcvd_data_cumulative_size << "].");
785 return;
786 }
787 // else
788
789 sock->m_rcv_syn_rcvd_data_cumulative_size += packet->m_data.size();
790 sock->m_rcv_syn_rcvd_data_q.push_back(packet); // Note that this is not a copy of the packet (just a pointer).
791
792 FLOW_LOG_TRACE("Receive queue now has [" << sock->m_rcv_syn_rcvd_data_q.size() << "] packets; "
793 "cumulative data size is [" << sock->m_rcv_syn_rcvd_data_cumulative_size << "].");
794} // Node::handle_data_to_syn_rcvd()
795
/* NOTE(review): listing line 796 (the opening of the Node::serv_close_detected()
 * signature) was a dropped hyperlinked line; code below is kept verbatim.
 * Purpose (per the visible body): under serv's mutex, records the disconnect cause
 * on the Server_socket and moves its public state to S_CLOSED (if `close`) or
 * S_CLOSING (still being torn down) via serv_set_state(). */
797 const Error_code& disconnect_cause, bool close)
798{
799 /* @todo Nothing calls this yet, as we don't support any way to close a Server_socket yet.
800 * Probably will reconsider this method when we do. */
801
802 Server_socket::Lock_guard lock{serv->m_mutex};
803 serv->m_disconnect_cause = disconnect_cause;
804 if (close)
805 {
806 // DONE.
807 serv_set_state(serv, Server_socket::State::S_CLOSED); // Reentrant mutex => OK.
808 }
809 else
810 {
811 // This socket is screwed but not yet out of the Node's system.
812 serv_set_state(serv, Server_socket::State::S_CLOSING); // Reentrant mutex => OK.
813 }
814} // Node::serv_close_detected()  (NOTE(review): marker added; closing brace is line 814.)
815
/* NOTE(review): listing line 816 (the Node::serv_peer_socket_closed() signature; cf. the
 * closing marker at line 866) was a dropped hyperlinked line; code below is kept verbatim.
 * Purpose (per the visible body): detaches a closing peer socket from its originating
 * Server_socket -- erasing it from serv->m_connecting_socks (pre-accept stage, no lock
 * needed) or, failing that, from serv->m_unaccepted_socks (under serv's mutex) -- and in
 * either case resets sock->m_originating_serv to maintain the invariant. */
817{
818 using std::list;
819
820 // We are in thread W.
821
822 /* sock is in one of two stages:
823 * - stage 1: serv->m_connecting_socks (not available via accept()), the earlier stage;
824 * - stage 2: serv->m_unaccepted_socks (available via accept()), the later stage.
825 *
826 * Try stage 1, then stage 2. */
827
828 // Stage 1.
829 const bool erased = serv->m_connecting_socks.erase(sock) == 1;
830 if (erased)
831 {
832 /* Maintain invariant. No need to lock mutex, because sock is in serv->m_connecting_socks, which
833 * means it is not in serv->m_unaccepted_socks yet, which means accept() cannot yield it, which means
834 * no non-W thread could be accessing m_originating_serv at the same time. */
835 sock->m_originating_serv.reset();
836 return;
837 }
838 // else
839
840 // Stage 2.
841
842 /* Remove from serv->m_unaccepted_socks. At this point accept() can access serv->m_unaccepted_socks and
843 * m_originating_serv, so we must lock. */
844 Server_socket::Lock_guard lock{serv->m_mutex};
845
846 sock->m_originating_serv.reset(); // Maintain invariant.
847
848 // O(1)ish.
849 serv->m_unaccepted_socks.erase(sock);
850
851 /* Notes:
852 *
853 * The unaccepted socket queue of serv can be accessed by accept()ing threads outside
854 * of thread W. So we must lock object at least to avoid corruption. We do that above.
855 *
856 * Now, let's think about the race. Suppose close_connection_immediately() starts and wins
857 * the race to lock *this; removes sock from serv->m_unaccepted_socks; unlocks *this; then the
858 * user immediately gets to call accept(). The user will not get sock as the result of the
859 * accept(), as we'd removed it in time. Good. Now suppose close_connection_immediately() starts
860 * but loses the race to lock *sock; user calls accept() first, and accept() yields sock (in
861 * S_ESTABLISHED state, though with empty Receive buffer, which is a pre-condition for
862 * close_connection_immediately()); then we lock sock and remove sock from
863 * serv->m_unaccepted_socks. Is this OK? Well, it is not different from this situation: they
864 * accept()ed, and then quickly there was an error on the resulting socket, so we closed it
865 * before any data came in. Therefore, yes, this is also OK. */
866} // Node::serv_peer_socket_closed()
867
/* NOTE(review): listing line 868 (the Node::serv_peer_socket_acceptable() signature) was a
 * dropped hyperlinked line; code below is kept verbatim.
 * Purpose (per the visible body): promotes an established peer socket from serv's
 * connecting pool into serv->m_unaccepted_socks, making it claimable via accept(). */
869{
870 // We are in thread W.
871
872 {
873 Server_socket::Lock_guard lock{serv->m_mutex};
874 serv->m_unaccepted_socks.insert(sock); // Remember that Linked_hash_set<> insert()s at the *front*.
875 }
876 // This guy is only to be accessed from thread W (which we're in), so no lock needed.
877 serv->m_connecting_socks.erase(sock);
878}
879
/* NOTE(review): listing line 880 (the Node::serv_peer_socket_init() signature; cf. the
 * closing marker at line 901) was a dropped hyperlinked line; code below is kept verbatim.
 * Purpose (per the visible body): registers a freshly SYN'd peer socket with its listening
 * Server_socket -- inserts it into serv->m_connecting_socks and sets the back-pointer
 * sock->m_originating_serv (no locking needed: not accept()-visible yet). */
881{
882 // We are in thread W.
883
884 /* Add this connecting socket to the pool of such connecting sockets maintained by the
885 * Server_socket. Once it is fully established, it will move from here to the queue
886 * serv->m_unaccepted_socks, where users can claim it via accept(). The utility of serv->m_unaccepted_socks
887 * is obvious, but why keep serv->m_connecting_socks? After all we've already added sock to m_socks, so
888 * demultiplexing of those messages (like SYN_ACK_ACK) will work without any additional structure.
889 * Answer: we need serv->m_connecting_socks at least for the case where we or the user close the
890 * listening socket (serv). In this case all pending connections must be aborted via RST (to let
891 * the other side know), and we'd know which ones to contact via serv->m_connecting_socks.
892 * The disallowing of accept()s after the associated listen() has been canceled is discussed in
893 * Server_socket documentation, but in short that behavior is similar to the BSD sockets
894 * behavior. */
895 serv->m_connecting_socks.insert(sock);
896
897 // And vice versa: maintain the invariant.
898 sock->m_originating_serv = serv;
899
900 // We didn't lock, because socket not yet available via accept(), so not accessed from non-W threads.
901} // Node::serv_peer_socket_init()
902
/* Virtual factory used for Server_socket objects this Node creates; being virtual, a Node
 * subclass may override it to return a Server_socket subclass instead -- hence the
 * ctor-arg-forwarding helper rather than a direct `new`. */
903Server_socket* Node::serv_create(const Peer_socket_options* child_sock_opts) // Virtual.
904{
905 // Just make a regular net_flow::Server_socket.
906 return serv_create_forward_plus_ctor_args<Server_socket>(child_sock_opts);
907}
908
909// Free implementations.
910
/* Prints a brief identifying description of the given Server_socket -- its local Flow port
 * plus its address -- or "NetFlow_server@null" when `serv` is null; returns `os`. */
911std::ostream& operator<<(std::ostream& os, const Server_socket* serv)
912{
913 return
914 serv
915 ? (os
916 << "NetFlow_server [NetFlow [:" << serv->local_port() << "]] @" << static_cast<const void*>(serv))
917 : (os << "NetFlow_server@null");
918}
919
920/// @cond
921/* -^- Doxygen, please ignore the following. (Don't want docs generated for temp macro; this is more maintainable
922 * than specifying the macro name to omit it, in Doxygen-config EXCLUDE_SYMBOLS.) */
923
924// That's right, I did this. Wanna fight about it?
925#define STATE_TO_CASE_STATEMENT(ARG_state) \
926 case Server_socket::State::S_##ARG_state: \
927 return os << #ARG_state
928
929// -v- Doxygen, please stop ignoring.
930/// @endcond
931
// Serializes a Server_socket::State to `os` as the enumerator's bare name (e.g. "LISTENING").
932std::ostream& operator<<(std::ostream& os, Server_socket::State state)
933{
934 switch (state)
935 {
936 STATE_TO_CASE_STATEMENT(LISTENING);
937 STATE_TO_CASE_STATEMENT(CLOSING);
938 STATE_TO_CASE_STATEMENT(CLOSED);
939 }
// Reached only for an invalid/corrupt enum value: nothing is printed.  (No `default:` case,
// so most compilers will warn here if a new State enumerator is ever left unhandled.)
940 return os;
941#undef STATE_TO_CASE_STATEMENT
942}
943
944} // namespace flow::net_flow
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:226
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:215
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1286
static Congestion_control_strategy * create_strategy(Strategy_choice strategy_choice, log::Logger *logger_ptr, Peer_socket::Const_ptr sock)
Factory method that, given an enum identifying the desired strategy, allocates the appropriate Conges...
Definition: cong_ctl.cpp:101
@ S_SERVER_SOCKET_ACCEPTABLE
Event type specifying the condition of interest wherein a target Server_socket serv is such that call...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
Definition: node.hpp:934
void serv_peer_socket_init(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records a new (just received SYN) peer socket from the given server socket.
void handle_data_to_established(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet, bool syn_rcvd_qd_packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
virtual Server_socket * serv_create(const Peer_socket_options *child_sock_opts)
Internal factory used for ALL Server_socket objects created by this Node (including subclasses).
void serv_set_state(Server_socket::Ptr serv, Server_socket::State state)
Sets Server_socket::m_state.
static bool ensure_sock_open(Socket_ptr sock, Error_code *err_code)
Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so,...
Definition: node.hpp:4093
void handle_syn_ack_ack_to_syn_rcvd(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_ack_packet > syn_ack_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK_ACK packet delivered to the given p...
Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock, const Function< Non_blocking_func_ret_type()> &non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Error_code *err_code)
Implementation of core blocking transfer methods, namely Peer_socket::sync_send(),...
Definition: node.hpp:3929
void serv_close_detected(Server_socket::Ptr serv, const Error_code &disconnect_cause, bool close)
Records that thread W shows this socket is not to listen to incoming connections and is to abort any ...
void cancel_timers(Peer_socket::Ptr sock)
Cancel any timers and scheduled tasks active in the given socket.
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
Definition: node.hpp:4132
bool sock_validate_options(const Peer_socket_options &opts, const Peer_socket_options *prev_opts, Error_code *err_code) const
Analogous to validate_options() but checks per-socket options instead of per-Node options.
Options_mutex m_opts_mutex
The mutex protecting m_opts.
Definition: node.hpp:3665
bool serv_is_acceptable(const boost::any &serv_as_any) const
Returns true if and only if calling serv->accept() with at least some arguments would return either n...
Syn_ack_packet::Ptr create_syn_ack(Peer_socket::Const_ptr sock)
Like create_syn() but for SYN_ACK.
virtual Peer_socket * sock_create(const Peer_socket_options &opts)
Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
Definition: low_lvl_io.cpp:584
Sequence_number::Generator m_seq_num_generator
Sequence number generator (at least to generate ISNs). Only thread W can access this.
Definition: node.hpp:3738
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
Definition: event_set.cpp:1127
void serv_peer_socket_closed(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that a Server_socket-contained (i.e., currently un-established, or established but not yet ac...
Socket_id_to_socket_map m_socks
The peer-to-peer connections this Node is currently tracking.
Definition: node.hpp:3744
Peer_socket::Options_lock Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
Definition: node.hpp:1436
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
Server_socket::Ptr listen(flow_port_t local_port, Error_code *err_code=nullptr, const Peer_socket_options *child_sock_opts=nullptr)
Sets up a server on the given local Flow port and returns Server_socket which can be used to accept s...
void sock_set_int_state(Peer_socket::Ptr sock, Peer_socket::Int_state new_state)
Sets internal state of given socket to the given state and logs a TRACE message about it.
Peer_socket::Ptr handle_syn_to_listening_server(Server_socket::Ptr serv, boost::shared_ptr< const Syn_packet > syn, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-deserialized, just-demultiplexed low-level SYN packet delivered to the given server so...
bool running() const
Returns true if and only if the Node is operating.
Definition: node.cpp:420
Port_to_server_map m_servs
The server sockets this Node is currently tracking.
Definition: node.hpp:3750
Event_set::Ev_type_to_socks_map m_sock_events
All sockets that have been detected to be "ready" (by the Event_set doc header definition) at any poi...
Definition: node.hpp:3782
void setup_drop_timer(const Socket_id &socket_id, Peer_socket::Ptr sock)
Creates a new Drop Timer and saves it to sock->m_snd_drop_timer.
void listen_worker(flow_port_t local_port, const Peer_socket_options *child_sock_opts, Server_socket::Ptr *serv)
Thread W implementation of listen().
void serv_peer_socket_acceptable(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that an unestablished socket sock (Peer_socket::Int_state::S_SYN_RCVD) has just become establ...
void handle_data_to_syn_rcvd(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
void setup_connection_timers(const Socket_id &socket_id, Peer_socket::Ptr sock, bool initial)
Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some a...
size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const
Computes and returns the currently correct rcv_wnd value; that is the amount of space free in Receive...
Peer_socket::Ptr accept(Server_socket::Ptr serv, Error_code *err_code)
Implementation of non-blocking serv->accept() for server socket serv in all cases except when serv->s...
void sock_set_state(Peer_socket::Ptr sock, Peer_socket::State state, Peer_socket::Open_sub_state open_sub_state=Peer_socket::Open_sub_state::S_CONNECTED)
Sets Peer_socket::m_state and Peer_socket::m_open_sub_state.
Node_options m_opts
This Node's global set of options.
Definition: node.hpp:3662
void close_empty_server_immediately(const flow_port_t local_port, Server_socket::Ptr serv, const Error_code &err_code, bool defer_delta_check)
Handles the transition of the given server socket from S_LISTENING/S_CLOSING to S_CLOSED (including e...
void async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
Definition: low_lvl_io.cpp:603
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
Definition: node.hpp:3697
Port_space m_ports
Flow port space for both client and server sockets. All threads may access this.
Definition: node.hpp:3735
void rst_and_close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
Asynchronously send RST to the other side of the given socket and close_connection_immediately().
security_token_t m_security_token
Random security token used during SYN_ACK-SYN_ACK_ACK.
uint64_t security_token_t
Type used for m_security_token.
@ S_OPEN
Future reads or writes may be possible. A socket in this state may be Writable or Readable.
@ S_CONNECTED
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
@ S_CONNECTING
This Peer_socket was created through an active connect (Node::connect() and the like),...
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex.
@ S_ESTABLISHED
Public state is OPEN+CONNECTED; in our opinion the connection is established.
@ S_CLOSED
Closed (dead or new) socket.
@ S_SYN_RCVD
Public state is OPEN+CONNECTING; other side requested passive connect via SYN; we sent SYN_ACK and ar...
flow_port_t reserve_port(flow_port_t port, Error_code *err_code)
Reserve the specified service port, or reserve_ephemeral_port() if the specified port is S_PORT_ANY.
Definition: port_space.cpp:76
void return_port(flow_port_t port, Error_code *err_code)
Return a previously reserved port (of any type).
Definition: port_space.cpp:176
A per-Peer_socket module that tries to estimate the bandwidth available to the outgoing flow.
Definition: bandwidth.hpp:125
Sequence_number generate_init_seq_num()
Returns an initial sequence number (ISN) for use in a new connection.
Definition: seq_num.cpp:47
An internal net_flow sequence number identifying a piece of data.
Definition: seq_num.hpp:126
void set_metadata(char num_line_id=0, const Sequence_number &zero_point=Sequence_number{}, seq_num_delta_t multiple_size=0)
Updates the full set of metadata (used at least for convenient convention-based logging but not actua...
Definition: seq_num.cpp:245
A server socket able to listen on a single Flow port for incoming connections and return peer sockets...
Peer_socket_ptr accept(Error_code *err_code=nullptr)
Non-blocking accept: obtain socket for the least recently established not-yet-obtained peer connectio...
Server_socket(log::Logger *logger, const Peer_socket_options *child_sock_opts)
Constructs object; initializes most values to well-defined (0, empty, etc.) but not necessarily meani...
Mutex m_mutex
This object's mutex.
Error_code disconnect_cause() const
The error code that previously caused state() to become State::S_CLOSED, or success code if state is ...
Peer_socket_ptr sync_accept(const boost::chrono::duration< Rep, Period > &max_wait, bool reactor_pattern=false, Error_code *err_code=nullptr)
Blocking (synchronous) version of accept().
Peer_socket_options const *const m_child_sock_opts
Either null or the pointer to a copy of the template Peer_socket_options intended for resulting Peer_...
State m_state
See state(). Should be set before user gets access to *this. Must not be modified by non-W threads.
Peer_socket_ptr sync_accept_impl(const Fine_time_pt &wait_until, bool reactor_pattern, Error_code *err_code)
Same as sync_accept() but uses a Fine_clock-based Fine_duration non-template type for implementation ...
State
State of a Server_socket.
@ S_CLOSED
No accept()s are or will be possible, AND Node has disowned the Server_socket.
@ S_CLOSING
No accept()s are or will be possible, but Node is still finishing up the closing operation.
@ S_LISTENING
Future or current accept()s may be possible. A socket in this state may be Acceptable.
State state() const
Current State of the socket.
flow_port_t m_local_port
See local_port().
~Server_socket() override
Boring virtual destructor. Note that deletion is to be handled exclusively via shared_ptr,...
flow_port_t local_port() const
The local Flow-protocol port on which this server is or was listening.
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex.
Node * node() const
Node that produced this Server_socket.
Error_code m_disconnect_cause
The Error_code causing this server's move from LISTENING state (if this has occurred); otherwise a cl...
static Const_ptr const_ptr_cast(const From_ptr &ptr_to_cast)
Identical to ptr_cast() but adds const-ness (immutability) to the pointed-to type.
static Ptr ptr_cast(const From_ptr &ptr_to_cast)
Provides syntactic-sugary way to perform a static_pointer_cast<> from a compatible smart pointer type...
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_LOG_ERROR(ARG_val)
Logs a warning about the given error code using FLOW_LOG_WARNING().
Definition: error.hpp:233
#define FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(ARG_ret_type, ARG_function_name,...)
Narrow-use macro that implements the error code/exception semantics expected of most public-facing Fl...
Definition: error.hpp:363
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
Definition: error.hpp:202
#define FLOW_ERROR_EMIT_ERROR_LOG_INFO(ARG_val)
Identical to FLOW_ERROR_EMIT_ERROR(), but the message logged has flow::log::Sev::S_INFO severity inst...
Definition: error.hpp:218
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:197
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
Definition: log.hpp:152
#define FLOW_LOG_WITH_CHECKING(ARG_sev, ARG_stream_fragment)
Logs a message of the specified severity into flow::log::Logger *get_logger() with flow::log::Compone...
Definition: log.hpp:489
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:354
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
Definition: log.hpp:372
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
Definition: async_fwd.hpp:223
void asio_exec_ctx_post(log::Logger *logger_ptr, Execution_context *exec_ctx, Synchronicity synchronicity, Task &&task)
An extension of boost.asio's post() and dispatch() free function templates, this free function templa...
Definition: util.hpp:31
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
@ S_INFO
Message indicates a not-"bad" condition that is not frequent enough to be of severity Sev::S_TRACE.
@ S_CONN_RESET_BAD_PEER_BEHAVIOR
Connection reset because of unexpected/illegal behavior by the other side.
@ S_INTERNAL_ERROR_PORT_COLLISION
Internal error: Ephemeral port double reservation allowed.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Definition: node.cpp:25
uint16_t flow_port_t
Logical Flow port type (analogous to a UDP/TCP port in spirit but in no way relevant to UDP/TCP).
const flow_port_t S_PORT_ANY
Special Flow port value used to indicate "invalid port" or "please pick a random available ephemeral ...
Definition: port_space.cpp:35
std::ostream & operator<<(std::ostream &os, const Congestion_control_selector::Strategy_choice &strategy_choice)
Serializes a Peer_socket_options::Congestion_control_strategy_choice enum to a standard ostream – the...
Definition: cong_ctl.cpp:146
bool key_exists(const Container &container, const typename Container::key_type &key)
Returns true if and only if the given key is present at least once in the given associative container...
Definition: util.hpp:310
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
Definition: blob_fwd.hpp:62
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
Definition: util_fwd.hpp:220
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:497
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Definition: common.hpp:627
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Definition: common.hpp:402
The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to a port in th...
Definition: node.hpp:3856
unsigned int m_dyn_accept_backlog_limit
Maximum backlog size for each Server_socket subsequently created via Node::listen().
Definition: options.hpp:588
Peer_socket_options m_dyn_sock_opts
The set of per-Peer_socket options in this per-Node set of options.
Definition: options.hpp:600
A set of low-level options affecting a single Peer_socket.
Definition: options.hpp:36
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...
Definition: endpoint.hpp:93