server_socket.cpp
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
23#include "flow/async/util.hpp"
24
25namespace flow::net_flow
26{
27
28// Server_socket implementation.
29
30Server_socket::Server_socket(log::Logger* logger_ptr, const Peer_socket_options* child_sock_opts) :
31 Log_context(logger_ptr, Flow_log_component::S_NET_FLOW),
32 /* If they supplied a Peer_socket_options, store a copy of it. When new Peer_sockets are made
33 * (when people connect to us), each peer socket's per-socket options will be copies of this. If
34 * they did not supply a Peer_socket_options, the Node's global Peer_socket_options will be used
35 * for each subsequent Peer_socket. */
36 m_child_sock_opts(child_sock_opts ? new Peer_socket_options(*child_sock_opts) : 0),
37 m_state(State::S_CLOSED), // Incorrect; set explicitly.
38 m_node(0), // Incorrect; set explicitly.
39 m_local_port(S_PORT_ANY) // Incorrect; set explicitly.
40{
41 // Only print pointer value, because most members are garbage at this point.
42 FLOW_LOG_TRACE("Server_socket [" << static_cast<void*>(this) << "] created.");
43}
44
45Server_socket::~Server_socket() // Virtual.
 46{
47 delete m_child_sock_opts; // May be 0 (that's okay).
48
49 FLOW_LOG_TRACE("Server_socket [" << this << "] destroyed.");
50}
51
52Server_socket::State Server_socket::state() const
 53{
54 Lock_guard lock(m_mutex); // State is liable to change at any time.
55 return m_state;
56}
57
58Node* Server_socket::node() const
 59{
60 Lock_guard lock(m_mutex); // m_node can simultaneously change to 0 if state changes to S_CLOSED.
61 return m_node;
62}
63
64Error_code Server_socket::disconnect_cause() const
 65{
66 Lock_guard lock(m_mutex);
67 return m_disconnect_cause;
68}
69
70flow_port_t Server_socket::local_port() const
 71{
72 return m_local_port; // No need to lock (it never changes).
73}
74
75Peer_socket::Ptr Server_socket::accept(Error_code* err_code)
 76{
 77  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(Peer_socket::Ptr, accept, _1);
 78  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
79
80 // We are in user thread U != W.
81
82 Lock_guard lock(m_mutex);
83
84 const Ptr serv = shared_from_this();
85 if (!Node::ensure_sock_open(serv, err_code)) // Ensure it's open, so that we can access m_node.
86 {
87 return Peer_socket::Ptr();
88 }
89 // else m_node is valid.
90
91 // Forward the rest of the logic to Node, as is the general convention especially for logic affecting outside *this.
92 return m_node->accept(serv, err_code);
93} // Server_socket::accept()
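/* [Editorial illustration; added to this listing, not part of the original server_socket.cpp.]
 * A minimal sketch of driving the non-blocking accept() above from user code. `serv` is assumed
 * to have come from Node::listen() (see below); the handling policy is purely illustrative:
 *
 *   flow::Error_code err;
 *   flow::net_flow::Peer_socket::Ptr sock = serv->accept(&err);
 *   if (sock)
 *   {
 *     // A fully functioning peer connection was queued up; use sock.
 *   }
 *   else if (!err)
 *   {
 *     // Still LISTENING, but nothing queued yet: would-block. Try later, or use sync_accept()/Event_set.
 *   }
 *   else
 *   {
 *     // err explains why no further accepts are possible (server CLOSING/CLOSED).
 *   }
 */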
94
95Peer_socket::Ptr Server_socket::sync_accept(bool reactor_pattern, Error_code* err_code)
 96{
97 return sync_accept_impl(Fine_time_pt(), reactor_pattern, err_code);
98}
99
100Peer_socket::Ptr Server_socket::sync_accept_impl(const Fine_time_pt& wait_until, bool reactor_pattern,
101 Error_code* err_code)
102{
103 using boost::adopt_lock;
104
105  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(Peer_socket::Ptr, sync_accept_impl, wait_until, reactor_pattern, _1);
 106  // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
107
108 // We are in user thread U != W.
109
110 Lock_guard lock(m_mutex);
111
112 const Ptr serv = shared_from_this();
113 if (!Node::ensure_sock_open(serv, err_code)) // Ensure it's open, so that we can access m_node.
114 {
115 return Peer_socket::Ptr();
116 }
117 // else m_node is valid.
118
119 lock.release(); // Release lock (mutex is still LOCKED). sync_op() takes over holding the lock and unlocking.
120
121 // See comment in Peer_socket::node_sync_send().
122
123 /* Operating on Server_sockets, returning Peer_socket::Ptr; Event_set socket set type is
124 * Server_sockets.
 125 * Object is serv; non-blocking operation is m_node->accept(...) -- or N/A in "reactor pattern" mode.
126 * Peer_socket::Ptr() is the "would-block" return value for this operation.
127 * S_SERVER_SOCKET_ACCEPTABLE is the type of event to watch for here. */
 128  return m_node
 129    ->sync_op<Server_socket, Peer_socket::Ptr>
 130      (serv,
 131       reactor_pattern
 132         ? Function<Peer_socket::Ptr ()>() // Reactor-pattern mode: no non-blocking op here; user will accept() themselves.
 133         : Function<Peer_socket::Ptr ()>([this, serv, err_code]() -> Peer_socket::Ptr
 134                                           { return m_node->accept(serv, err_code); }),
 135       Peer_socket::Ptr(), Event_set::Event_type::S_SERVER_SOCKET_ACCEPTABLE,
 136       wait_until, err_code);
137} // Server_socket::sync_accept_impl()
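/* [Editorial illustration; added to this listing, not part of the original server_socket.cpp.]
 * The two modes sync_accept_impl() serves, seen from the caller's side. The timeout value is
 * hypothetical, and the reactor-mode convention is inferred from the comments above (the call
 * waits for Acceptable-ness only; the caller then performs the non-blocking accept()):
 *
 *   flow::Error_code err;
 *
 *   // Blocking mode: wait (up to 5 sec) until a connection can be returned or an error occurs.
 *   auto sock = serv->sync_accept(boost::chrono::seconds(5), false, &err);
 *
 *   // Reactor-pattern mode: wait only until serv is Acceptable; then accept() it ourselves.
 *   serv->sync_accept(boost::chrono::seconds(5), true, &err);
 *   if (!err)
 *   {
 *     auto sock2 = serv->accept(&err);
 *   }
 */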
138
139// Node implementations (methods dealing with individual Server_sockets).
140
141Server_socket::Ptr Node::listen(flow_port_t local_port, Error_code* err_code,
 142                                const Peer_socket_options* child_sock_opts)
143{
144 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(Server_socket::Ptr, listen, local_port, _1, child_sock_opts);
145 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
146
147  using async::asio_exec_ctx_post;
 148  using async::Synchronicity;
 149  using boost::promise;
150 using boost::unique_future;
151
152 // We are in thread U != W.
153
154 if (!running())
155 {
156    FLOW_ERROR_EMIT_ERROR(error::Code::S_NODE_NOT_RUNNING);
 157    return Server_socket::Ptr();
158 }
159 // else
160
161 /* Basically we now need to do the following.
162 * -1- Reserve local_port in m_ports.
163 * -2- Create Server_socket serv.
164 * -3- Save serv in m_servs.
165 * (Both -1- and -3- can result in error.)
166 *
167 * -2- must be done in thread W, as m_servs is by design only to be accessed there. -1-, however,
168 * can be done in thread U, assuming m_ports is properly synchronized. So here are our choices:
169 *
170 * I. Perform -1- in U. If error, return. Perform -2-. Post callback to do -3- in W. Return
171 * serv in U. Meanwhile -2- will, at some point, be performed in W. If error in -3-, save it
172 * in serv. User will discover when trying to serv->accept().
173 *
174 * II. Perform -1- in U. If error, return. Perform -2-. Post callback to do -3- in W. Use a
175 * future/promise pair to wait for -3- to complete. Meanwhile -3- will, at some point, be performed
176 * in W. If error in -3-, save it in serv. Either way, U will wait for the result and return
177 * error or serv to the user.
178 *
179 * III. Post callback to do -1-, -2-, -3- in W. Use a future/promise pair to wait for -1-3- to complete.
180 * Meanwhile -1-3- will, at some point, be performed in W. If error in -1-3-, save it in
181 * serv. Either way, U will wait. If error, return error to the user. Else return serv to
182 * the user.
183 *
184 * Let's pick one. III > II due to simpler code; future used either way but most code is in one
185 * thread and one function (the W callback); the U part is a short wrapper. Also, Port_space need
186 * not be synchronized. So it's I vs. III. Advantage of I is speed; listen() will return in U
187 * faster, especially if thread W is loaded with work. (However III is still non-blocking, i.e.,
188 * no waiting for network events.) Advantage of III is simplicity of code (everything in one
189 * thread and callback, in W; no explicit locking); and any pre-networking error is immediately
190 * returned by listen() instead of being discovered later. I believe here III wins, especially
191 * because listen() should be fairly infrequent, so a split second speed difference should not be
192 * significant.
193 *
194 * So we choose III. We set up connect_worker() to run in W and wait for
195 * it to succeed or fail. asio_exec_ctx_post() does the promise/future stuff, or equivalent, so
196 * the code is really simple. */
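  /* [Editorial illustration; added to this listing, not part of the original server_socket.cpp.]
   * The "promise/future stuff, or equivalent" that asio_exec_ctx_post() performs in
   * S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION mode is conceptually just the following simplified
   * sketch (post() here stands for boost::asio::post(); the real helper is more general):
   *
   *   promise<void> done;
   *   unique_future<void> done_future = done.get_future();
   *   post(m_task_engine, [&]()
   *   {
   *     // ...steps -1-, -2-, -3- execute here, in thread W...
   *     done.set_value(); // Wake up thread U.
   *   });
   *   done_future.wait(); // Thread U sleeps -- no *network* wait -- until thread W finishes.
   */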
197
198 // Load this onto thread W boost.asio work queue. We won't return until it's done, so [&] is OK.
199  Server_socket::Ptr serv;
 200  asio_exec_ctx_post(get_logger(), &m_task_engine, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION,
201 [&]() { listen_worker(local_port, child_sock_opts, &serv); });
202 // If got here, the task has completed in thread W and signaled us to that effect.
203
204 // listen_worker() indicates success or failure through this data member.
205 if (serv->m_disconnect_cause)
206 {
207 *err_code = serv->m_disconnect_cause;
208 return Server_socket::Ptr(); // serv will go out of scope and thus will be destroyed.
209 }
210 // else
211 err_code->clear();
212 return serv;
213} // Node::listen()
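/* [Editorial illustration; added to this listing, not part of the original server_socket.cpp.]
 * listen() from the caller's side, given an already-constructed Node `node`; the port numbers and
 * the decision to tweak child options are hypothetical:
 *
 *   flow::Error_code err;
 *
 *   // Typical form: children will copy the Node's global per-socket option template.
 *   flow::net_flow::Server_socket::Ptr serv = node.listen(2299, &err);
 *   if (!serv)
 *   {
 *     // err holds the reason (e.g., error::Code::S_NODE_NOT_RUNNING or a port-reservation error).
 *   }
 *
 *   // Alternative form: supply a custom per-socket option set to be copied into each child Peer_socket.
 *   flow::net_flow::Peer_socket_options child_opts; // Defaults; adjust fields as desired before the call.
 *   auto serv2 = node.listen(3344, &err, &child_opts);
 */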
214
215void Node::listen_worker(flow_port_t local_port, const Peer_socket_options* child_sock_opts,
216 Server_socket::Ptr* serv_ptr)
217{
218 assert(serv_ptr);
219
220 // We are in thread W. listen() is waiting for us to set serv_promise in thread U.
221
222 // Create new socket and set all members that may be immediately accessed by user from thread U after we're done.
223
224 auto& serv = *serv_ptr;
225 if (child_sock_opts)
226 {
227 /* They provided custom per-socket options to distribute to any Peer_sockets created when people
228 * connect to this server. Before we give those to the new server socket, let's validate them
229 * (for proper values and internal consistency, etc.). */
230
231 Error_code err_code;
232 const bool opts_ok = sock_validate_options(*child_sock_opts, 0, &err_code);
233
234 // Due to the advertised interface of the current method, we must create a socket even on error.
235 serv.reset(serv_create(child_sock_opts));
236
237 // Now report error if indeed options were invalid. err_code is already set and logged in that case.
238 if (!opts_ok)
239 {
240 serv->m_disconnect_cause = err_code;
241 return;
242 }
243 // else
244 }
245 else
246 {
247 /* More typically, they did not provide per-socket options. So just pass null pointer into
248 * Peer_socket constructor; this will mean that when a Peer_socket is generated on connection,
249 * the code is to provide a copy of the global template for the per-socket options. That will
250 * happen later; we just pass in null. */
251 serv.reset(serv_create(0));
252 }
253
254 // Server socket created; set members.
255
256 serv->m_node = this;
257  serv_set_state(serv, Server_socket::State::S_LISTENING);
 258
259 // Allocate given port.
260 serv->m_local_port = m_ports.reserve_port(local_port, &serv->m_disconnect_cause);
261 if (serv->m_local_port == S_PORT_ANY)
262 {
263 // Error already logged and is in serv->m_disconnect_cause.
264 return;
265 }
266 // else
267 local_port = serv->m_local_port; // If they'd specified S_PORT_ANY, this is now a random port.
268
269 FLOW_LOG_INFO("NetFlow worker thread listening for passive-connects on [" << serv << "].");
270
271 if (util::key_exists(m_servs, local_port))
272 {
273 /* This is a passive connect (we're accepting future connections). Therefore in particular it
274 * should be impossible that our local_port() equals an already existing connection's
275 * local_port(); Port_space is supposed to prevent the same port from being handed out to more
276 * than one connection. Therefore this must be a programming error. */
277
278 FLOW_LOG_WARNING("Cannot set up [" << serv << "], because server at port [" << local_port << "] already exists! "
279 "This is a port reservation error and constitutes either a bug or an extremely "
280 "unlikely condition.");
281
282 // Mark/log error.
283 Error_code* err_code = &serv->m_disconnect_cause;
284    FLOW_ERROR_EMIT_ERROR(error::Code::S_INTERNAL_ERROR_PORT_COLLISION);
 285
286 // Return port.
287 Error_code return_err_code;
288 m_ports.return_port(serv->m_local_port, &return_err_code);
289 assert(!return_err_code);
290 return;
291 } // if (that server entry already exists)
292 // else
293
294 m_servs[local_port] = serv; // Now SYNs will be accepted.
295} // Node::listen_worker()
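/* [Editorial note; added to this listing, not part of the original server_socket.cpp.]
 * listen_worker()'s error discipline above, in outline (serv is always created, so the caller can
 * always read m_disconnect_cause; the reserved port is returned if a later step fails):
 *
 *   validate child options (if any) --fail--> set serv->m_disconnect_cause; return
 *   reserve_port(local_port)        --fail--> m_disconnect_cause already set by reserve_port(); return
 *   local_port already in m_servs?  --yes---> set m_disconnect_cause; return_port(); return
 *   otherwise:  m_servs[local_port] = serv   (incoming SYNs to local_port now reach serv)
 */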
296
297Peer_socket::Ptr Node::accept(Server_socket::Ptr serv, Error_code* err_code)
 298{
299 // We are in user thread U != W.
300
301 // IMPORTANT: The logic here must be consistent with serv_is_acceptable().
302
303 // serv->m_mutex already locked.
304
305 if (serv->m_state == Server_socket::State::S_CLOSING)
306 {
 307    /* To the user this looks the same as CLOSED -- the only difference is that serv has not been disowned by the Node yet.
308 * See rationale for this in the accept() documentation header. */
309 FLOW_ERROR_EMIT_ERROR_LOG_INFO(serv->m_disconnect_cause);
310
311 // Not listening anymore; pretend nothing on queue.
312 return Peer_socket::Ptr();
313 }
314 // else
315 assert(serv->m_state == Server_socket::State::S_LISTENING);
316
317 if (serv->m_unaccepted_socks.empty())
318 {
319 // Nothing on the queue. As advertised, this is not an error in LISTENING state.
320 err_code->clear();
321 return Peer_socket::Ptr();
322 }
323 // else
324
325 // Pop from queue. Linked_hash_set queues things up at the front (via insert()), so pop from the back.
326 Peer_socket::Ptr sock = serv->m_unaccepted_socks.const_back();
327 serv->m_unaccepted_socks.pop_back();
328
329 /* Now that it's accepted, remove reference to the server socket, so that when the server socket
330 * is closed, sock is not closed (since it's a fully functioning independent socket now). */
331 sock->m_originating_serv.reset(); // This is synchronized together with m_unaccepted_socks.
332
333 FLOW_LOG_INFO("Connection [" << sock << "] on [" << serv << "] accepted.");
334
335 err_code->clear();
336 return sock;
337} // Node::accept()
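/* [Editorial illustration; added to this listing, not part of the original server_socket.cpp.]
 * Why "insert at the front, pop from the back" yields FIFO accept order -- a tiny worked example:
 *
 *   connection A established:   m_unaccepted_socks = [A]
 *   connection B established:   m_unaccepted_socks = [B, A]      (insert() places B at the front)
 *   connection C established:   m_unaccepted_socks = [C, B, A]
 *   accept() #1 pops the back:  returns A; queue = [C, B]
 *   accept() #2 pops the back:  returns B; queue = [C]
 *
 * That is, the least recently established not-yet-accepted connection is returned first.
 */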
338
339bool Node::serv_is_acceptable(const boost::any& serv_as_any) const
340{
341 using boost::any_cast;
342
343 const Server_socket::Const_ptr serv = any_cast<Server_socket::Ptr>(serv_as_any);
344
345 Peer_socket::Lock_guard lock(serv->m_mutex); // Many threads can access/write below state.
346
 347  /* Our task here is to return true if and only if at this very moment calling serv->accept() would
348 * yield either a non-null return value OR a non-success *err_code. In other words,
349 * accept() would return "something." This is used for Event_set machinery.
350 *
351 * This should mirror accept()'s algorithm. @todo Should accept() call this, for code reuse?
352 * Maybe/maybe not. Consider performance when deciding.
353 *
354 * Basically, CLOSING/CLOSED => error (Acceptable); LISTENING + non-empty accept queue =>
355 * returns a socket (Acceptable); LISTENING + empty queue => returns null and no error (not
356 * Acceptable). So the latter is the only way (though quite common) it can be NOT Acceptable. */
357
358 return !((serv->m_state == Server_socket::State::S_LISTENING) && serv->m_unaccepted_socks.empty());
359} // Node::serv_is_acceptable()
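/* [Editorial note; added to this listing, not part of the original server_socket.cpp.]
 * The Acceptable predicate above as a decision table (the accept() column mirrors Node::accept()):
 *
 *   m_state    | m_unaccepted_socks | serv_is_acceptable() | accept() would return
 *   -----------+--------------------+----------------------+-----------------------------------------
 *   LISTENING  | non-empty          | true                 | a Peer_socket
 *   LISTENING  | empty              | false                | null, *err_code cleared (would-block)
 *   CLOSING    | empty by then      | true                 | null, *err_code = m_disconnect_cause
 *   CLOSED     | empty              | true                 | null, *err_code set (socket is closed)
 */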
360
361void Node::close_empty_server_immediately(const flow_port_t local_port, Server_socket::Ptr serv,
 362                                          const Error_code& err_code, bool defer_delta_check)
363{
364 // We are in thread W.
365
366 // Check explicitly documented pre-conditions.
367
368 assert(serv->m_state != Server_socket::State::S_CLOSED);
369 // Caller should have closed all the associated sockets already.
370 assert(serv->m_connecting_socks.empty());
371 {
372 Server_socket::Lock_guard lock(serv->m_mutex); // At least m_unaccepted_socks can be accessed by user.
373 assert(serv->m_unaccepted_socks.empty());
374 }
375
376 FLOW_ERROR_LOG_ERROR(err_code);
377 FLOW_LOG_INFO("Closing and destroying [" << serv << "].");
378
379 serv_close_detected(serv, err_code, true); // Sets S_CLOSED public state (and related data).
380
381 // Next, remove serv from our main server list.
382
383#ifndef NDEBUG
384 const bool erased = 1 ==
385#endif
386 m_servs.erase(local_port);
387 assert(erased); // Not S_CLOSED => it's in m_servs. Otherwise there's a serious bug somewhere.
388
389 // Return the port.
390 Error_code return_err_code;
391 m_ports.return_port(local_port, &return_err_code);
392 assert(!return_err_code);
393
394 /* serv has changed to CLOSED state. Performing serv->accept() would therefore
395 * certainly return an error. Returning an error from that method (as opposed to null but no
396 * error) is considered Acceptable (as we want to alert the user to the error, so her wait [if
397 * any] wakes up and notices the error). Therefore we should soon inform anyone waiting on any
398 * Event_sets for serv to become Acceptable
399 *
400 * Caveat: Similar to that in Node::handle_syn_ack_ack_to_syn_rcvd() at similar point in the
401 * code. */
402
403 // Accumulate the event into the Node store (note: not any Event_set yet).
404  if (m_sock_events[Event_set::Event_type::S_SERVER_SOCKET_ACCEPTABLE].insert(serv).second)
 405  {
406 // Possibly inform the user for any applicable Event_sets right now.
407 event_set_all_check_delta(defer_delta_check);
408 }
409} // Node::close_empty_server_immediately()
410
411void Node::serv_set_state(Server_socket::Ptr serv, Server_socket::State state)
 412{
413 Server_socket::Lock_guard lock(serv->m_mutex);
414
415 // @todo Add TRACE logging.
416
417 serv->m_state = state;
418  if (state == Server_socket::State::S_CLOSED)
 419  {
420 /* Important convention: S_CLOSED means socket is permanently incapable of accepting more
421 * connections. At this point the originating Node removes the
422 * socket from its internal structures. Therefore, the Node itself may even go away -- while
423 * this Server_socket still exists. Since we use shared_ptr when giving our socket objects,
424 * that's fine -- but we want to avoid returning an invalid Node* in node(). So, when
425 * S_CLOSED, serv->m_node = 0. */
426 serv->m_node = 0;
427 }
428}
429
430Peer_socket::Ptr Node::handle_syn_to_listening_server(Server_socket::Ptr serv,
 431                                                      boost::shared_ptr<const Syn_packet> syn,
432 const util::Udp_endpoint& low_lvl_remote_endpoint)
433{
434 using util::Blob;
435 using boost::random::uniform_int_distribution;
436
437 // We are in thread W.
438
439 /* We just got SYN (an overture from the other side). Create a peer-to-peer socket to track that
440 * connection being established. */
441
442 Peer_socket::Ptr sock;
443 if (serv->m_child_sock_opts)
444 {
445 /* They provided custom per-socket options in listen(), and we've been storing them in *serv.
446 * They're already validated at listen() time, so we just give them to Peer_socket constructor,
447 * which copies them. */
448 sock.reset(sock_create(*serv->m_child_sock_opts));
449 }
450 else
451 {
452 /* More typically, they did not provide per-socket options. So we just pass our global template
453 * for the per-socket options to the Peer_socket constructor. The only caveat is that template
454 * may be concurrently changed, so we must lock it. Could do it with opt(), but that introduces
455 * an extra copy of the entire struct, so just do it explicitly (read-only lock for
456 * performance).
457 *
458 * Note: no need to validate; global options (including per-socket ones) are validated
459 * elsewhere when set. */
460    Options_lock lock(m_opts_mutex);
 461    sock.reset(sock_create(m_opts.m_dyn_sock_opts));
 462  }
463
464 // Socket created; set members.
465
466 sock->m_active_connect = false;
467 sock->m_node = this;
468  sock_set_state(sock, Peer_socket::State::S_OPEN, Peer_socket::Open_sub_state::S_CONNECTING);
 469  sock->m_remote_endpoint = Remote_endpoint{ low_lvl_remote_endpoint, syn->m_packed.m_src_port };
470 sock->m_local_port = serv->m_local_port;
471 // Save it for user to be able to call sock->get_connect_metadata(). Add const to express we want copy, not move.
472 sock->m_serialized_metadata = static_cast<const Blob&>(syn->m_serialized_metadata);
473 sock->m_int_state = Peer_socket::Int_state::S_CLOSED; // Kind of pedantic. We'll set SYN_RCVD a bit later on.
474 // Save the start of the sequence number series based on their initial sequence number.
475 sock->m_rcv_init_seq_num = syn->m_init_seq_num;
476 sock->m_rcv_next_seq_num = sock->m_rcv_init_seq_num + 1;
477
478 /* Initialize the connection's send bandwidth estimator (object that estimates available
479 * outgoing bandwidth based on incoming acknowledgments). It may be used by m_snd_cong_ctl,
480 * depending on the strategy chosen, but may be useful in its own right. Hence it's a separate
481 * object, not inside *m_snd_cong_ctl. */
482 sock->m_snd_bandwidth_estimator.reset(new Send_bandwidth_estimator(get_logger(), sock));
483
484 // Initialize the connection's congestion control strategy based on the configured strategy.
485 sock->m_snd_cong_ctl.reset
486 (Congestion_control_selector::create_strategy(sock->m_opts.m_st_cong_ctl_strategy, get_logger(), sock));
487 // ^-- No need to use opt() yet: user doesn't have socket and cannot set_options() on it yet.
488
489 const Socket_id& socket_id = Node::socket_id(sock);
490 FLOW_LOG_INFO("NetFlow worker thread starting passive-connect of [" << sock << "] on [" << serv << "]. "
491 "Received [" << syn->m_type_ostream_manip << "] with ISN [" << syn->m_init_seq_num << "].");
492
493 // Ensure we can support the specified packet options.
494
495 if (syn->m_opt_rexmit_on != sock->rexmit_on())
496 {
497 FLOW_LOG_WARNING("NetFlow worker thread starting passive-connect of [" << sock << "] on [" << serv << "]. "
498 "Received [" << syn->m_type_ostream_manip << "] with "
499 "opt_rexmit_on [" << syn->m_opt_rexmit_on << "]; was configured otherwise on this side; "
500 "resetting connection.");
501 /* We'd inform the user here, but they didn't open the connection (it's a passive open, and they
502 * haven't yet called accept()). We can respond with RST, however, to tell the other side this
503 * connection isn't going to happen. We didn't place sock into m_socks, so just let it
504 * disappear via shared_ptr<> magic. */
505    async_no_sock_low_lvl_rst_send(syn, low_lvl_remote_endpoint);
 506    return Peer_socket::Ptr();
507 }
508 // else
509
510 // Ensure this socket pair does not yet exist in our peer-to-peer socket table.
511
512  if (util::key_exists(m_socks, socket_id))
 513  {
 514    /* This is a passive connect (they're initiating the connection). Therefore in particular it
515 * should be impossible that our local_port() equals an already existing connection's
516 * local_port(); Port_space is supposed to prevent the same ephemeral or service port from being
517 * handed out to more than one connection. Therefore this must be a programming error. */
518
519 FLOW_LOG_WARNING("Cannot add [" << sock << "], because such a connection already exists. "
520 "This is an ephemeral or service port collision and "
521 "constitutes either a bug or an extremely unlikely condition.");
522
523 // Same reasoning as above: send RST, and let sock disappear.
524 async_no_sock_low_lvl_rst_send(syn, low_lvl_remote_endpoint);
525 return Peer_socket::Ptr();
526 } // if (that socket pair already exists)
527 // else
528
529 /* Try the packet send (as just below) again if SYN_ACK not acknowledged within a certain amount of
530 * time. Give up if that happens too many times.
531 * Follow same order of ops (schedule, send) as in the SYN case elsewhere. */
532  setup_connection_timers(socket_id, sock, true);
 533
534 // Send SYN_ACK to continue the handshake. Save some *sock data first, as they are used in create_syn_ack().
535
536 /* Initial Sequence Number (the start of our own series).
537 * Remember it in case we must retransmit the SYN. (m_snd_next_seq_num may have been further increased by then.) */
538 Sequence_number& init_seq_num = sock->m_snd_init_seq_num;
539  init_seq_num = m_seq_num_generator.generate_init_seq_num();
 540  // Same comment as when calling sock->m_snd_init_seq_num.set_metadata() elsewhere. See that.
 541  init_seq_num.set_metadata('L', init_seq_num + 1, sock->max_block_size());
542 // Sequence number of first bit of actual data.
543 sock->m_snd_next_seq_num = init_seq_num + 1;
544 // Security token. Random number from entire numeric range. Remember it for later verification.
545 sock->m_security_token = m_rnd_security_tokens();
546 // Initial receive window is simply the entire empty Receive buffer.
547 sock->m_rcv_last_sent_rcv_wnd = sock_rcv_wnd(sock);
548
549 // Make a packet; fill out common fields in and asynchronously send it.
550 auto syn_ack = create_syn_ack(sock);
551  async_sock_low_lvl_packet_send_paced(sock, std::move(syn_ack));
 552
553 /* send will happen asynchronously, and the registered completion handler will execute in this
554 * thread when done (NO SOONER than this method finishes executing). */
555
556 // No more errors: Map socket pair to the socket data structure (kind of analogous to a TCP net-stack's TCB).
557 m_socks[socket_id] = sock;
558
559 // Also record it within the server socket (more comments inside this method).
560 serv_peer_socket_init(serv, sock);
561
562 // CLOSED -> SYN_RCVD.
563  sock_set_int_state(sock, Peer_socket::Int_state::S_SYN_RCVD);
 564
565 return sock;
566} // Node::handle_syn_to_listening_server()
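/* [Editorial illustration; added to this listing, not part of the original server_socket.cpp.]
 * The sequence-number bookkeeping above, with made-up numbers:
 *
 *   Their SYN carries ISN = 1000  =>  sock->m_rcv_init_seq_num = 1000
 *                                     sock->m_rcv_next_seq_num = 1001  (first byte of their data)
 *   We pick our own ISN = 5000    =>  sock->m_snd_init_seq_num = 5000
 *                                     sock->m_snd_next_seq_num = 5001  (first byte of our data)
 *
 * The ISNs number the SYN and SYN_ACK themselves; actual data starts one past each ISN.
 */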
567
568void Node::handle_syn_ack_ack_to_syn_rcvd(const Socket_id& socket_id,
 569                                          Peer_socket::Ptr sock,
570 boost::shared_ptr<const Syn_ack_ack_packet> syn_ack_ack)
571{
572 using boost::shared_ptr;
573
574 // We are in thread W.
575
576 /* We'd sent SYN_ACK and just got SYN_ACK_ACK. Assuming their SYN_ACK_ACK is valid, our side of
577 * connection can move to ESTABLISHED state, as theirs already has. */
578
579 FLOW_LOG_INFO("NetFlow worker thread continuing passive-connect of socket [" << sock << "]. "
580 "Received [" << syn_ack_ack->m_type_ostream_manip << "]; "
581 "security token [" << syn_ack_ack->m_packed.m_security_token << "].");
582
583 // First, validate their security token equals the one we've sent.
584 if (sock->m_security_token != syn_ack_ack->m_packed.m_security_token)
585 {
586 FLOW_LOG_WARNING("Received [" << syn_ack_ack->m_type_ostream_manip << "] targeted at state "
587 "[" << Peer_socket::Int_state::S_SYN_RCVD << "] socket [" << sock << "] "
588 "with mismatching security token "
589 "[" << syn_ack_ack->m_packed.m_security_token << "]; we had received and sent and expect "
590 "[" << sock->m_security_token << "]. Closing.");
591 /* Close connection in our structures (inform user if necessary as well). Pre-conditions
592 * assumed by call: sock in m_socks and sock->state() == S_OPEN (yes, since m_int_state ==
593 * S_SYN_RCVD); 3rd arg contains the reason for the close (yes). */
594    rst_and_close_connection_immediately(socket_id, sock, error::Code::S_CONN_RESET_BAD_PEER_BEHAVIOR, true);
 595    // ^-- defer_delta_check == true: for similar reason as at the end of this method.
596 return;
597 }
598 // else OK.
599
600 // No more errors.
601
602 // Move ourselves to connected state.
603
604 // The server socket to which the other side sent SYN to create the peer socket sock.
605 const Server_socket::Ptr serv = sock->m_originating_serv;
606
607 // Public state (thread-safe).
608  sock_set_state(sock, Peer_socket::State::S_OPEN, Peer_socket::Open_sub_state::S_CONNECTED);
 609  // Internal state. SYN_RCVD -> ESTABLISHED.
 610  sock_set_int_state(sock, Peer_socket::Int_state::S_ESTABLISHED);
611
612 // Got the acknowledgment to SYN_ACK, so cancel retransmits and the timeout for that SYN_ACK.
613 cancel_timers(sock);
614
615 // Setup the Drop Timeout engine (m_snd_drop_timer).
616  setup_drop_timer(socket_id, sock);
 617
618 // Add the peer socket to the server socket's accept queue (thread-safe)! accept() will return this.
619 serv_peer_socket_acceptable(serv, sock);
620 // BTW serv->m_originating_serv is now null.
621
622 // Record initial rcv_wnd; it should be the entire size of the other side's Receive buffer.
623 sock->m_snd_remote_rcv_wnd = syn_ack_ack->m_packed.m_rcv_wnd;
624
625 /* We may have queued up some DATA packets while we were SYN_RCVD (due to loss and/or
626 * re-ordering). See handle_data_to_syn_rcvd() for more information. So, handle the queued DATA
627 * packets as if they'd just arrived. */
628 for (shared_ptr<Data_packet> qd_packet : sock->m_rcv_syn_rcvd_data_q)
629 {
630 auto const logger_ptr = get_logger();
631 if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
632 {
633      FLOW_LOG_TRACE_WITHOUT_CHECKING
 634        ("Handling [" << qd_packet->m_type_ostream_manip << "] packet "
635 "received/queued in [" << Peer_socket::Int_state::S_SYN_RCVD << "] state; "
636 "packet data size = [" << qd_packet->m_data.size() << "].");
637
638 // Very verbose and CPU-intensive, especially DATA version!
639 if (logger_ptr->should_log(log::Sev::S_DATA, get_log_component()))
640 {
641 FLOW_LOG_DATA_WITHOUT_CHECKING("Readable representation is: "
642 "[\n" << qd_packet->m_verbose_ostream_manip << "].");
643 }
644 else
645 {
646 FLOW_LOG_TRACE_WITHOUT_CHECKING("Readable representation is: "
647 "[\n" << qd_packet->m_concise_ostream_manip << "].");
648 }
649 }
650
651 handle_data_to_established(socket_id, sock, qd_packet, true); // true <=> packet was queued during SYN_RCVD.
652 // qd_packet has probably been decimated for performance, so don't rely on qd_packet.m_data at this point!
653 }
654 if (!sock->m_rcv_syn_rcvd_data_q.empty())
655 {
656 FLOW_LOG_TRACE("Handled a total of [" << sock->m_rcv_syn_rcvd_data_q.size() << "] queued packets with "
657 "cumulative data size [" << sock->m_rcv_syn_rcvd_data_cumulative_size << "].");
658 }
659 sock->m_rcv_syn_rcvd_data_q.clear(); // Save memory.
660
661 /* Since we just added sock to serv's acceptable socket queue, certainly serv is now Acceptable.
662 * Therefore we should soon inform anyone waiting on any Event_sets for serv to become Acceptable.
663 *
664 * Caveat: The user could have called serv->accept() right after the previous statement in this
665 * method, which could indeed make serv not Acceptable again. That is OK. We only promise to
666 * inform the user of an event within a "non-blocking" amount of time of it occurring. If that
667 * same user decides to mess himself over by acting on these events prematurely, that is not our
668 * problem [assuming we don't crash things, which we do not]. Worst case is that the user will
669 * detect the event, try to accept() and get nothing [which is an eventuality for which any decent
670 * user code would prepare]. */
671
672 // Accumulate the event into the Node store (note: not any Event_set yet).
673  if (m_sock_events[Event_set::Event_type::S_SERVER_SOCKET_ACCEPTABLE].insert(serv).second)
 674  {
 675    // Possibly inform the user for any applicable Event_sets right now.
 676    event_set_all_check_delta(true);
 677    /* ^-- defer_delta_check == true: because the only way to get to this method is from
678 * async_low_lvl_recv(), which will perform event_set_all_check_delta(false) at the end of itself,
679 * before the boost.asio handler exits. See Node::m_sock_events doc header for details. */
680 }
681
682 /* Do not m_sock_events[S_PEER_SOCKET_WRITABLE].insert(sock), as sock has not been accept()ed and
683 * therefore cannot be waited on currently. */
684} // Node::handle_syn_ack_ack_to_syn_rcvd()
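/* [Editorial note; added to this listing, not part of the original server_socket.cpp.]
 * The passive-connect handshake that the two handlers above implement, end to end:
 *
 *   client                                     server (this Node)
 *     |----------------- SYN ----------------> handle_syn_to_listening_server(): create sock; SYN_RCVD
 *     |<-------------- SYN_ACK ---------------  (carries our ISN, security token, rcv_wnd)
 *     |------------- SYN_ACK_ACK ------------> handle_syn_ack_ack_to_syn_rcvd(): ESTABLISHED;
 *     |                                         sock joins serv's accept queue
 *
 * DATA arriving between our SYN_ACK and their SYN_ACK_ACK is queued by handle_data_to_syn_rcvd()
 * (below) and replayed once ESTABLISHED.
 */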
685
687 boost::shared_ptr<Data_packet> packet)
688{
689 // We are in thread W.
690
691 /* We'd sent SYN_ACK, were waiting for SYN_ACK_ACK, but instead we got a DATA.
692 * This seems wrong at a first glance but can be legitimate. One possibility is they sent
693 * SYN_ACK_ACK and then some DATA, but the SYN_ACK_ACK was dropped (recall that the SYN_ACK_ACK is
694 * not itself acknowledged in our scheme). Another possibility is they sent both, but then DATA
695 * got re-ordered to in front of SYN_ACK_ACK.
696 *
697 * What does TCP do here? Well, it doesn't really have this problem, because every segment must
698 * have an ACK in it. So if a TCP gets data in SYN_RCVD, it must also contain the ACK to the
699 * SYN_ACK (what we call SYN_ACK_ACK) (in TCP, any packet without ACK is simply considered
700 * corrupt/invalid and would not be sent in the first place). So it's impossible to send data
701 * without acknowledging the SYN_ACK at the same time.
702 *
 703 * For us, however, ACK packets are independent of DATA packets, as are SYN_ACK_ACK packets.
704 * Therefore we should either drop these DATAs and hope whatever reliability implementation is
705 * used restores them later, or we should queue them for consumption when ESTABLISHED arrives.
706 * Let's do the latter. It's hard enough to deal with actual loss; introducing loss when we
707 * actually have the stuff seems absurd.
708 *
709 * So we just save them in a packet queue, and when we're ESTABLISHED we feed all the packets to
710 * handle_incoming() as if they'd just arrived. The only caveat is size of this queue. Since
711 * we have a maximum on the Receive buffer (sock->m_rcv_buf) and the packets-with-gaps structure
712 * (sock->m_rcv_packets_with_gaps), we must have one here as well. Since Receive buffer is
713 * empty until ESTABLISHED, it seems natural to limit this queue's cumulative byte size
714 * according to the limit imposed on Receive buffer. (There is some extra overhead to store the
715 * packet header info, but it's close enough.) After that, as when the Receive buffer fills up,
716 * we drop packets. */
717
718 assert(sock->m_int_state == Peer_socket::Int_state::S_SYN_RCVD);
719 const bool first_time = sock->m_rcv_syn_rcvd_data_q.empty();
720
721 // Not a WARNING, because we didn't do anything wrong; could be network conditions; and avoid verbosity after 1st one.
722  FLOW_LOG_WITH_CHECKING(first_time ? log::Sev::S_INFO : log::Sev::S_TRACE,
 723                         "NetFlow worker thread received [" << packet->m_type_ostream_manip << "] packet while "
724 "in [" << Peer_socket::Int_state::S_SYN_RCVD << "] state for [" << sock << "]; "
725 "saving for processing later when in [" << Peer_socket::Int_state::S_ESTABLISHED << "] "
726 "state; packet data size = [" << packet->m_data.size() << "]; "
727 "first time? = [" << first_time << "].");
728
729 if (first_time)
730 {
731 sock->m_rcv_syn_rcvd_data_cumulative_size = 0; // It's garbage at the moment.
732 }
733 else if ((sock->m_rcv_syn_rcvd_data_cumulative_size + packet->m_data.size())
734 > sock->opt(sock->m_opts.m_st_snd_buf_max_size))
735 {
736 // Not a WARNING, because we didn't do anything wrong; could be network conditions.
737 FLOW_LOG_INFO("NetFlow worker thread received [" << packet->m_type_ostream_manip << "] packet while "
738 "in [" << Peer_socket::Int_state::S_SYN_RCVD << "] state for [" << sock << "]; "
739 "dropping because Receive queue full at [" << sock->m_rcv_syn_rcvd_data_cumulative_size << "].");
740 return;
741 }
742 // else
743
744 sock->m_rcv_syn_rcvd_data_cumulative_size += packet->m_data.size();
745 sock->m_rcv_syn_rcvd_data_q.push_back(packet); // Note that this is not a copy of the packet (just a pointer).
746
747 FLOW_LOG_TRACE("Receive queue now has [" << sock->m_rcv_syn_rcvd_data_q.size() << "] packets; "
748 "cumulative data size is [" << sock->m_rcv_syn_rcvd_data_cumulative_size << "].");
749} // Node::handle_data_to_syn_rcvd()
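/* [Editorial illustration; added to this listing, not part of the original server_socket.cpp.]
 * The cumulative-size guard above with made-up numbers: with a 64 KiB buffer limit and 60 KiB
 * already queued, an arriving 8 KiB DATA packet is dropped (60 + 8 > 64), while a 4 KiB packet
 * would still be queued, bringing the total to exactly 64 KiB. */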
750
751void Node::serv_close_detected(Server_socket::Ptr serv,
 752                               const Error_code& disconnect_cause, bool close)
753{
754 /* @todo Nothing calls this yet, as we don't support any way to close a Server_socket yet.
755 * Probably will reconsider this method when we do. */
756
757 Server_socket::Lock_guard lock(serv->m_mutex);
758 serv->m_disconnect_cause = disconnect_cause;
759 if (close)
760 {
761 // DONE.
762 serv_set_state(serv, Server_socket::State::S_CLOSED); // Reentrant mutex => OK.
763 }
764 else
765 {
766 // This socket is screwed but not yet out of the Node's system.
767 serv_set_state(serv, Server_socket::State::S_CLOSING); // Reentrant mutex => OK.
768 }
769}
770
771void Node::serv_peer_socket_closed(Server_socket::Ptr serv, Peer_socket::Ptr sock)
 772{
773 using std::list;
774
775 // We are in thread W.
776
777 /* sock is in one of two stages:
778 * - stage 1: serv->m_connecting_socks (not available via accept()), the earlier stage;
779 * - stage 2: serv->m_unaccepted_socks (available via accept()), the later stage.
780 *
781 * Try stage 1, then stage 2. */
782
783 // Stage 1.
784 const bool erased = serv->m_connecting_socks.erase(sock) == 1;
785 if (erased)
786 {
787 /* Maintain invariant. No need to lock mutex, because sock is in serv->m_connecting_socks, which
788 * means it is not in serv->m_unaccepted_socks yet, which means accept() cannot yield it, which means
789 * no non-W thread could be accessing m_originating_serv at the same time. */
790 sock->m_originating_serv.reset();
791 return;
792 }
793 // else
794
795 // Stage 2.
796
797 /* Remove from serv->m_unaccepted_socks. At this point accept() can access serv->m_unaccepted_socks and
798 * m_originating_serv, so we must lock. */
799 Server_socket::Lock_guard lock(serv->m_mutex);
800
801 sock->m_originating_serv.reset(); // Maintain invariant.
802
803 // O(1)ish.
804 serv->m_unaccepted_socks.erase(sock);
805
806 /* Notes:
807 *
808 * The unaccepted socket queue of serv can be accessed by accept()ing threads outside
809 * of thread W. So we must lock object at least to avoid corruption. We do that above.
810 *
811 * Now, let's think about the race. Suppose close_connection_immediately() starts and wins
812 * the race to lock *this; removes sock from serv->m_unaccepted_socks; unlocks *this; then the
813 * user immediately gets to call accept(). The user will not get sock as the result of the
814 * accept(), as we'd removed it in time. Good. Now suppose close_connection_immediately() starts
815 * but loses the race to lock *sock; user calls accept() first, and accept() yields sock (in
816 * S_ESTABLISHED state, though with empty Receive buffer, which is a pre-condition for
817 * close_connection_immediately()); then we lock sock and remove sock from
818 * serv->m_unaccepted_socks. Is this OK? Well, it is not different from this situation: they
819 * accept()ed, and then quickly there was an error on the resulting socket, so we closed it
820 * before any data came in. Therefore, yes, this is also OK. */
821} // Node::serv_peer_socket_closed()
822
823void Node::serv_peer_socket_acceptable(Server_socket::Ptr serv, Peer_socket::Ptr sock)
 824{
825 // We are in thread W.
826
827 {
828 Server_socket::Lock_guard lock(serv->m_mutex);
829 serv->m_unaccepted_socks.insert(sock); // Remember that Linked_hash_set<> insert()s at the *front*.
830 }
831 // This guy is only to be accessed from thread W (which we're in), so no lock needed.
832 serv->m_connecting_socks.erase(sock);
833}
834
835void Node::serv_peer_socket_init(Server_socket::Ptr serv, Peer_socket::Ptr sock)
 836{
837 // We are in thread W.
838
839 /* Add this connecting socket to the pool of such connecting sockets maintained by the
840 * Server_socket. Once it is fully established, it will move from here to the queue
841 * serv->m_unaccepted_socks, where users can claim it via accept(). The utility of serv->m_unaccepted_socks
842 * is obvious, but why keep serv->m_connecting_socks? After all we've already added sock to m_socks, so
843 * demultiplexing of those messages (like SYN_ACK_ACK) will work without any additional structure.
844 * Answer: we need serv->m_connecting_socks at least for the case where we or the user close the
845 * listening socket (serv). In this case all pending connections must be aborted via RST (to let
846 * the other side know), and we'd know which ones to contact via serv->m_connecting_socks.
847 * The disallowing of accept()s after the associated listen() has been canceled is discussed in
848 * Server_socket documentation, but in short that behavior is similar to the BSD sockets
849 * behavior. */
850 serv->m_connecting_socks.insert(sock);
851
852 // And vice versa: maintain the invariant.
853 sock->m_originating_serv = serv;
854
855 // We didn't lock, because socket not yet available via accept(), so not accessed from non-W threads.
856} // Node::serv_peer_socket_init()
857
858Server_socket* Node::serv_create(const Peer_socket_options* child_sock_opts) // Virtual.
859{
860 // Just make a regular net_flow::Server_socket.
861 return serv_create_forward_plus_ctor_args<Server_socket>(child_sock_opts);
862}
863
864// Free implementations.
865
866std::ostream& operator<<(std::ostream& os, const Server_socket* serv)
867{
868 return
869 serv
870 ? (os
871 << "NetFlow_server [NetFlow [:" << serv->local_port() << "]] @" << static_cast<const void*>(serv))
872 : (os << "NetFlow_server@null");
873}
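/* [Editorial illustration; added to this listing, not part of the original server_socket.cpp.]
 * Example output of the above (the pointer value and port are made up):
 *
 *   "NetFlow_server [NetFlow [:2299]] @0x55d3a1b42e10"  -- for a non-null serv listening on port 2299
 *   "NetFlow_server@null"                               -- for a null pointer
 */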
874
875/// @cond
876/* -^- Doxygen, please ignore the following. (Don't want docs generated for temp macro; this is more maintainable
877 * than specifying the macro name to omit it, in Doxygen-config EXCLUDE_SYMBOLS.) */
878
879// That's right, I did this. Wanna fight about it?
880#define STATE_TO_CASE_STATEMENT(ARG_state) \
881 case Server_socket::State::S_##ARG_state: \
882 return os << #ARG_state
883
884// -v- Doxygen, please stop ignoring.
885/// @endcond
886
887std::ostream& operator<<(std::ostream& os, Server_socket::State state)
888{
889 switch (state)
890 {
891 STATE_TO_CASE_STATEMENT(LISTENING);
892 STATE_TO_CASE_STATEMENT(CLOSING);
893 STATE_TO_CASE_STATEMENT(CLOSED);
894 }
895 return os;
896#undef STATE_TO_CASE_STATEMENT
897}
898
899} // namespace flow::net_flow