Flow 1.0.0
Flow project: Full implementation reference.
server_socket.cpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
23#include "flow/async/util.hpp"
24
25namespace flow::net_flow
26{
27
28// Server_socket implementation.
29
// Constructs a Server_socket in a placeholder (not yet listening) condition.  If child_sock_opts is non-null, a
// private heap copy of it is stored in m_child_sock_opts (freed in the destructor); otherwise null is stored.
30Server_socket::Server_socket(log::Logger* logger_ptr, const Peer_socket_options* child_sock_opts) :
31 Log_context(logger_ptr, Flow_log_component::S_NET_FLOW),
32 /* If they supplied a Peer_socket_options, store a copy of it. When new Peer_sockets are made
33 * (when people connect to us), each peer socket's per-socket options will be copies of this. If
34 * they did not supply a Peer_socket_options, the Node's global Peer_socket_options will be used
35 * for each subsequent Peer_socket. */
36 m_child_sock_opts(child_sock_opts ? new Peer_socket_options(*child_sock_opts) : nullptr),
37 m_state(State::S_CLOSED), // Placeholder value only; the real value is set explicitly after construction.
38 m_node(nullptr), // Placeholder value only; the real value is set explicitly after construction.
39 m_local_port(S_PORT_ANY) // Placeholder value only; the real value is set explicitly after construction.
40{
41 // Only print pointer value, because most members are garbage at this point.
42 FLOW_LOG_TRACE("Server_socket [" << static_cast<void*>(this) << "] created.");
43}
44
46{
  // NOTE(review): the signature line is absent from this listing; the trace message below indicates this is the
  // Server_socket destructor body.
  // Free the options copy, if any, that the constructor allocated with `new`; deleting null is a no-op.
47 delete m_child_sock_opts; // May be 0 (that's okay).
48
49 FLOW_LOG_TRACE("Server_socket [" << this << "] destroyed.");
50}
51
53{
  // NOTE(review): signature line is absent from this listing; presumably Server_socket::state().
  // Returns the current m_state, read while holding m_mutex.
54 Lock_guard lock(m_mutex); // State is liable to change at any time.
55 return m_state;
56}
57
59{
  // NOTE(review): signature line is absent from this listing; presumably Server_socket::node().
  // Returns m_node, read while holding m_mutex; the value may be null (see comment below).
60 Lock_guard lock(m_mutex); // m_node can simultaneously change to 0 if state changes to S_CLOSED.
61 return m_node;
62}
63
65{
  // NOTE(review): signature line is absent from this listing; presumably Server_socket::disconnect_cause().
  // Returns m_disconnect_cause, read while holding m_mutex.
66 Lock_guard lock(m_mutex);
67 return m_disconnect_cause;
68}
69
71{
  // NOTE(review): signature line is absent from this listing; presumably Server_socket::local_port().
  // Returns m_local_port; unlike the other accessors, this one takes no lock.
72 return m_local_port; // No need to lock (it never changes).
73}
74
76{
  /* Non-blocking accept.  Locks m_mutex, verifies via Node::ensure_sock_open() that this socket is open (returning a
   * null Peer_socket::Ptr if not), then delegates to m_node->accept().
   * NOTE(review): the signature and the first statement (to which the "^--" comment below refers) are absent from
   * this listing. */
78 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
79
80 // We are in user thread U != W.
81
82 Lock_guard lock(m_mutex);
83
84 const Ptr serv = shared_from_this();
85 if (!Node::ensure_sock_open(serv, err_code)) // Ensure it's open, so that we can access m_node.
86 {
87 return Peer_socket::Ptr();
88 }
89 // else m_node is valid.
90
91 // Forward the rest of the logic to Node, as is the general convention especially for logic affecting outside *this.
92 return m_node->accept(serv, err_code);
93} // Server_socket::accept()
94
96{
  // NOTE(review): signature line is absent from this listing; presumably a sync_accept() overload without a timeout.
  // Forwards to sync_accept_impl() with a default-constructed Fine_time_pt as the wait_until argument.
97 return sync_accept_impl(Fine_time_pt(), reactor_pattern, err_code);
98}
99
/* Blocking-accept implementation.  Locks m_mutex, verifies via Node::ensure_sock_open() that the socket is open
 * (returning a null Peer_socket::Ptr if not), then relinquishes ownership of the still-locked mutex and hands the
 * wait to a call on m_node.  The non-blocking operation passed to that call is m_node->accept(), except in
 * reactor-pattern mode.
 * NOTE(review): several lines (the error-wrapper macro start, the name of the m_node member being invoked, and the
 * reactor-pattern branch of the conditional) are absent from this listing. */
100Peer_socket::Ptr Server_socket::sync_accept_impl(const Fine_time_pt& wait_until, bool reactor_pattern,
101 Error_code* err_code)
102{
103 namespace bind_ns = util::bind_ns;
104 using bind_ns::bind;
105 using boost::adopt_lock;
106
108 bind_ns::cref(wait_until), reactor_pattern, _1);
109 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
110
111 // We are in user thread U != W.
112
113 Lock_guard lock(m_mutex);
114
115 const Ptr serv = shared_from_this();
116 if (!Node::ensure_sock_open(serv, err_code)) // Ensure it's open, so that we can access m_node.
117 {
118 return Peer_socket::Ptr();
119 }
120 // else m_node is valid.
121
122 lock.release(); // Release lock (mutex is still LOCKED). sync_op() takes over holding the lock and unlocking.
123
124 // See comment in Peer_socket::node_sync_send().
125
126 /* Operating on Server_sockets, returning Peer_socket::Ptr; Event_set socket set type is
127 * Server_sockets.
128 * Object is serv; non-blocking operation is m_node->accept(...) -- or N/A in "reactor pattern" mode.
129 * Peer_socket::Ptr() is the "would-block" return value for this operation.
130 * S_SERVER_SOCKET_ACCEPTABLE is the type of event to watch for here. */
131 return m_node
133 (serv,
134 reactor_pattern
136 : Function<Peer_socket::Ptr ()>([this, serv, err_code]() -> Peer_socket::Ptr
137 { return m_node->accept(serv, err_code); }),
139 wait_until, err_code);
140} // Server_socket::sync_accept_impl()
141
142// Node implementations (methods dealing with individual Server_sockets).
143
145 const Peer_socket_options* child_sock_opts)
146{
  /* Sets up a listening Server_socket on local_port.  The real work is done by listen_worker(), which is posted onto
   * m_task_engine and awaited; on return, serv->m_disconnect_cause tells us whether it succeeded.  On failure
   * *err_code is set and a null Server_socket::Ptr is returned; on success *err_code is cleared and serv is returned.
   * NOTE(review): the first signature line, the declaration of `serv`, and a few other statements are absent from
   * this listing. */
147 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(Server_socket::Ptr, Node::listen, local_port, _1, child_sock_opts);
148 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
149
152 using boost::promise;
153 using boost::unique_future;
154
155 // We are in thread U != W.
156
157 if (!running())
158 {
160 return Server_socket::Ptr();
161 }
162 // else
163
164 /* Basically we now need to do the following.
165 * -1- Reserve local_port in m_ports.
166 * -2- Create Server_socket serv.
167 * -3- Save serv in m_servs.
168 * (Both -1- and -3- can result in error.)
169 *
170 * -3- must be done in thread W, as m_servs is by design only to be accessed there. -1-, however,
171 * can be done in thread U, assuming m_ports is properly synchronized. So here are our choices:
172 *
173 * I. Perform -1- in U. If error, return. Perform -2-. Post callback to do -3- in W. Return
174 * serv in U. Meanwhile -3- will, at some point, be performed in W. If error in -3-, save it
175 * in serv. User will discover when trying to serv->accept().
176 *
177 * II. Perform -1- in U. If error, return. Perform -2-. Post callback to do -3- in W. Use a
178 * future/promise pair to wait for -3- to complete. Meanwhile -3- will, at some point, be performed
179 * in W. If error in -3-, save it in serv. Either way, U will wait for the result and return
180 * error or serv to the user.
181 *
182 * III. Post callback to do -1-, -2-, -3- in W. Use a future/promise pair to wait for -1-3- to complete.
183 * Meanwhile -1-3- will, at some point, be performed in W. If error in -1-3-, save it in
184 * serv. Either way, U will wait. If error, return error to the user. Else return serv to
185 * the user.
186 *
187 * Let's pick one. III > II due to simpler code; future used either way but most code is in one
188 * thread and one function (the W callback); the U part is a short wrapper. Also, Port_space need
189 * not be synchronized. So it's I vs. III. Advantage of I is speed; listen() will return in U
190 * faster, especially if thread W is loaded with work. (However III is still non-blocking, i.e.,
191 * no waiting for network events.) Advantage of III is simplicity of code (everything in one
192 * thread and callback, in W; no explicit locking); and any pre-networking error is immediately
193 * returned by listen() instead of being discovered later. I believe here III wins, especially
194 * because listen() should be fairly infrequent, so a split second speed difference should not be
195 * significant.
196 *
197 * So we choose III. We set up listen_worker() to run in W and wait for
198 * it to succeed or fail. asio_exec_ctx_post() does the promise/future stuff, or equivalent, so
199 * the code is really simple. */
200
201 // Load this onto thread W boost.asio work queue. We won't return until it's done, so [&] is OK.
203 asio_exec_ctx_post(get_logger(), &m_task_engine, Synchronicity::S_ASYNC_AND_AWAIT_CONCURRENT_COMPLETION,
204 [&]() { listen_worker(local_port, child_sock_opts, &serv); });
205 // If got here, the task has completed in thread W and signaled us to that effect.
206
207 // listen_worker() indicates success or failure through this data member.
208 if (serv->m_disconnect_cause)
209 {
210 *err_code = serv->m_disconnect_cause;
211 return Server_socket::Ptr(); // serv will go out of scope and thus will be destroyed.
212 }
213 // else
214 err_code->clear();
215 return serv;
216} // Node::listen()
217
/* Thread-W half of Node::listen().  Always creates a Server_socket into *serv_ptr (even on failure), validates
 * child_sock_opts if given, reserves local_port in m_ports, and registers the socket in m_servs.  Failure is
 * reported by leaving a truthy Error_code in (*serv_ptr)->m_disconnect_cause; the caller checks that member.
 * NOTE(review): two statements (after `serv->m_node = this;` and after the `err_code` pointer is taken in the
 * collision branch) are absent from this listing. */
218void Node::listen_worker(flow_port_t local_port, const Peer_socket_options* child_sock_opts,
219 Server_socket::Ptr* serv_ptr)
220{
221 assert(serv_ptr);
222
223 // We are in thread W. listen() is blocked in thread U until we return; it then inspects *serv_ptr.
224
225 // Create new socket and set all members that may be immediately accessed by user from thread U after we're done.
226
227 auto& serv = *serv_ptr;
228 if (child_sock_opts)
229 {
230 /* They provided custom per-socket options to distribute to any Peer_sockets created when people
231 * connect to this server. Before we give those to the new server socket, let's validate them
232 * (for proper values and internal consistency, etc.). */
233
234 Error_code err_code;
235 const bool opts_ok = sock_validate_options(*child_sock_opts, 0, &err_code);
236
237 // Due to the advertised interface of the current method, we must create a socket even on error.
238 serv.reset(serv_create(child_sock_opts));
239
240 // Now report error if indeed options were invalid. err_code is already set and logged in that case.
241 if (!opts_ok)
242 {
243 serv->m_disconnect_cause = err_code;
244 return;
245 }
246 // else
247 }
248 else
249 {
250 /* More typically, they did not provide per-socket options. So just pass null pointer into
251 * Peer_socket constructor; this will mean that when a Peer_socket is generated on connection,
252 * the code is to provide a copy of the global template for the per-socket options. That will
253 * happen later; we just pass in null. */
254 serv.reset(serv_create(0));
255 }
256
257 // Server socket created; set members.
258
259 serv->m_node = this;
261
262 // Allocate given port.
263 serv->m_local_port = m_ports.reserve_port(local_port, &serv->m_disconnect_cause);
264 if (serv->m_local_port == S_PORT_ANY)
265 {
266 // Error already logged and is in serv->m_disconnect_cause.
267 return;
268 }
269 // else
270 local_port = serv->m_local_port; // If they'd specified S_PORT_ANY, this is now a random port.
271
272 FLOW_LOG_INFO("NetFlow worker thread listening for passive-connects on [" << serv << "].");
273
274 if (util::key_exists(m_servs, local_port))
275 {
276 /* This is a passive connect (we're accepting future connections). Therefore in particular it
277 * should be impossible that our local_port() equals an already existing connection's
278 * local_port(); Port_space is supposed to prevent the same port from being handed out to more
279 * than one connection. Therefore this must be a programming error. */
280
281 FLOW_LOG_WARNING("Cannot set up [" << serv << "], because server at port [" << local_port << "] already exists! "
282 "This is a port reservation error and constitutes either a bug or an extremely "
283 "unlikely condition.");
284
285 // Mark/log error.
286 Error_code* err_code = &serv->m_disconnect_cause;
288
289 // Return port.
290 Error_code return_err_code;
291 m_ports.return_port(serv->m_local_port, &return_err_code);
292 assert(!return_err_code);
293 return;
294 } // if (that server entry already exists)
295 // else
296
297 m_servs[local_port] = serv; // Now SYNs will be accepted.
298} // Node::listen_worker()
299
301{
  /* Node-side non-blocking accept on serv (the caller already holds serv->m_mutex).  Outcomes:
   *   - S_CLOSING: serv->m_disconnect_cause is emitted as the error and a null Peer_socket::Ptr is returned;
   *   - S_LISTENING with empty m_unaccepted_socks: *err_code is cleared and a null Peer_socket::Ptr is returned;
   *   - S_LISTENING otherwise: the back element of m_unaccepted_socks is popped, its m_originating_serv is reset,
   *     *err_code is cleared, and the socket is returned.
   * NOTE(review): the signature line is absent from this listing. */
302 // We are in user thread U != W.
303
304 // IMPORTANT: The logic here must be consistent with serv_is_acceptable().
305
306 // serv->m_mutex already locked.
307
308 if (serv->m_state == Server_socket::State::S_CLOSING)
309 {
310 /* This is the same to the user as CLOSED -- only different in that serv has not been disowned by the Node yet.
311 * See rationale for this in the accept() documentation header. */
312 FLOW_ERROR_EMIT_ERROR_LOG_INFO(serv->m_disconnect_cause);
313
314 // Not listening anymore; pretend nothing on queue.
315 return Peer_socket::Ptr();
316 }
317 // else
318 assert(serv->m_state == Server_socket::State::S_LISTENING);
319
320 if (serv->m_unaccepted_socks.empty())
321 {
322 // Nothing on the queue. As advertised, this is not an error in LISTENING state.
323 err_code->clear();
324 return Peer_socket::Ptr();
325 }
326 // else
327
328 // Pop from queue. Linked_hash_set queues things up at the front (via insert()), so pop from the back.
329 Peer_socket::Ptr sock = serv->m_unaccepted_socks.const_back();
330 serv->m_unaccepted_socks.pop_back();
331
332 /* Now that it's accepted, remove reference to the server socket, so that when the server socket
333 * is closed, sock is not closed (since it's a fully functioning independent socket now). */
334 sock->m_originating_serv.reset(); // This is synchronized together with m_unaccepted_socks.
335
336 FLOW_LOG_INFO("Connection [" << sock << "] on [" << serv << "] accepted.");
337
338 err_code->clear();
339 return sock;
340} // Node::accept()
341
/* Returns whether the Server_socket wrapped in serv_as_any is currently "Acceptable": true unless it is in
 * S_LISTENING state with an empty m_unaccepted_socks queue.  serv_as_any must hold a Server_socket::Ptr (it is
 * extracted with any_cast).  The check is made while holding serv->m_mutex. */
342bool Node::serv_is_acceptable(const boost::any& serv_as_any) const
343{
344 using boost::any_cast;
345
346 const Server_socket::Const_ptr serv = any_cast<Server_socket::Ptr>(serv_as_any);
347
 // Use the Server_socket lock-guard alias, as every other place in this file that locks serv->m_mutex does.
348 Server_socket::Lock_guard lock(serv->m_mutex); // Many threads can access/write below state.
349
350 /* Our task here is to return true if and only if at this very moment calling serv->accept() would
351 * yield either a non-null return value OR a non-success *err_code. In other words,
352 * accept() would return "something." This is used for Event_set machinery.
353 *
354 * This should mirror accept()'s algorithm. @todo Should accept() call this, for code reuse?
355 * Maybe/maybe not. Consider performance when deciding.
356 *
357 * Basically, CLOSING/CLOSED => error (Acceptable); LISTENING + non-empty accept queue =>
358 * returns a socket (Acceptable); LISTENING + empty queue => returns null and no error (not
359 * Acceptable). So the latter is the only way (though quite common) it can be NOT Acceptable. */
360
361 return !((serv->m_state == Server_socket::State::S_LISTENING) && serv->m_unaccepted_socks.empty());
362} // Node::serv_is_acceptable()
363
365 const Error_code& err_code, bool defer_delta_check)
366{
  /* Closes serv, which must have no connecting or unaccepted peer sockets left: marks it closed via
   * serv_close_detected(), erases local_port from m_servs, returns the port to m_ports, and then performs the
   * Event_set bookkeeping at the bottom.
   * NOTE(review): the first signature line and the condition guarding the final braces are absent from this
   * listing. */
367 // We are in thread W.
368
369 // Check explicitly documented pre-conditions.
370
371 assert(serv->m_state != Server_socket::State::S_CLOSED);
372 // Caller should have closed all the associated sockets already.
373 assert(serv->m_connecting_socks.empty());
374 {
375 Server_socket::Lock_guard lock(serv->m_mutex); // At least m_unaccepted_socks can be accessed by user.
376 assert(serv->m_unaccepted_socks.empty());
377 }
378
379 FLOW_ERROR_LOG_ERROR(err_code);
380 FLOW_LOG_INFO("Closing and destroying [" << serv << "].");
381
382 serv_close_detected(serv, err_code, true); // Sets S_CLOSED public state (and related data).
383
384 // Next, remove serv from our main server list.
385
 // `erased` exists only when assert() is active; with NDEBUG defined the erase() call stands alone.
386#ifndef NDEBUG
387 const bool erased = 1 ==
388#endif
389 m_servs.erase(local_port);
390 assert(erased); // Not S_CLOSED => it's in m_servs. Otherwise there's a serious bug somewhere.
391
392 // Return the port.
393 Error_code return_err_code;
394 m_ports.return_port(local_port, &return_err_code);
395 assert(!return_err_code);
396
397 /* serv has changed to CLOSED state. Performing serv->accept() would therefore
398 * certainly return an error. Returning an error from that method (as opposed to null but no
399 * error) is considered Acceptable (as we want to alert the user to the error, so her wait [if
400 * any] wakes up and notices the error). Therefore we should soon inform anyone waiting on any
401 * Event_sets for serv to become Acceptable.
402 *
403 * Caveat: Similar to that in Node::handle_syn_ack_ack_to_syn_rcvd() at similar point in the
404 * code. */
405
406 // Accumulate the event into the Node store (note: not any Event_set yet).
408 {
409 // Possibly inform the user for any applicable Event_sets right now.
410 event_set_all_check_delta(defer_delta_check);
411 }
412} // Node::close_empty_server_immediately()
413
415{
  /* Sets serv->m_state to `state` while holding serv->m_mutex.
   * NOTE(review): the signature line and the condition guarding the braces below are absent from this listing;
   * per the comment inside, the guarded branch applies when the new state is S_CLOSED. */
416 Server_socket::Lock_guard lock(serv->m_mutex);
417
418 // @todo Add TRACE logging.
419
420 serv->m_state = state;
422 {
423 /* Important convention: S_CLOSED means socket is permanently incapable of accepting more
424 * connections. At this point the originating Node removes the
425 * socket from its internal structures. Therefore, the Node itself may even go away -- while
426 * this Server_socket still exists. Since we use shared_ptr when giving our socket objects,
427 * that's fine -- but we want to avoid returning an invalid Node* in node(). So, when
428 * S_CLOSED, serv->m_node = 0. */
429 serv->m_node = 0;
430 }
431}
432
434 boost::shared_ptr<const Syn_packet> syn,
435 const util::Udp_endpoint& low_lvl_remote_endpoint)
436{
  /* Handles a SYN arriving at listening server socket serv: creates and initializes a new Peer_socket for the
   * would-be connection, rejects it (returning a null Peer_socket::Ptr) if the SYN's retransmission option
   * disagrees with ours, if the socket pair already exists, or if the SYN_ACK cannot be sent; otherwise records the
   * socket in m_socks and in serv, and returns it.
   * NOTE(review): the first signature line and several statements (socket creation from global options, the RST
   * send in the first rejection branch, the socket-pair existence check, the retransmit-timer setup, the ISN
   * generation, the SYN_ACK send call, and the final state change) are absent from this listing. */
437 using util::Blob;
438 using boost::random::uniform_int_distribution;
439
440 // We are in thread W.
441
442 /* We just got SYN (an overture from the other side). Create a peer-to-peer socket to track that
443 * connection being established. */
444
445 Peer_socket::Ptr sock;
446 if (serv->m_child_sock_opts)
447 {
448 /* They provided custom per-socket options in listen(), and we've been storing them in *serv.
449 * They're already validated at listen() time, so we just give them to Peer_socket constructor,
450 * which copies them. */
451 sock.reset(sock_create(*serv->m_child_sock_opts));
452 }
453 else
454 {
455 /* More typically, they did not provide per-socket options. So we just pass our global template
456 * for the per-socket options to the Peer_socket constructor. The only caveat is that template
457 * may be concurrently changed, so we must lock it. Could do it with opt(), but that introduces
458 * an extra copy of the entire struct, so just do it explicitly (read-only lock for
459 * performance).
460 *
461 * Note: no need to validate; global options (including per-socket ones) are validated
462 * elsewhere when set. */
465 }
466
467 // Socket created; set members.
468
469 sock->m_active_connect = false;
470 sock->m_node = this;
472 sock->m_remote_endpoint = Remote_endpoint{ low_lvl_remote_endpoint, syn->m_packed.m_src_port };
473 sock->m_local_port = serv->m_local_port;
474 // Save it for user to be able to call sock->get_connect_metadata(). Add const to express we want copy, not move.
475 sock->m_serialized_metadata = static_cast<const Blob&>(syn->m_serialized_metadata);
476 sock->m_int_state = Peer_socket::Int_state::S_CLOSED; // Kind of pedantic. We'll set SYN_RCVD a bit later on.
477 // Save the start of the sequence number series based on their initial sequence number.
478 sock->m_rcv_init_seq_num = syn->m_init_seq_num;
479 sock->m_rcv_next_seq_num = sock->m_rcv_init_seq_num + 1;
480
481 /* Initialize the connection's send bandwidth estimator (object that estimates available
482 * outgoing bandwidth based on incoming acknowledgments). It may be used by m_snd_cong_ctl,
483 * depending on the strategy chosen, but may be useful in its own right. Hence it's a separate
484 * object, not inside *m_snd_cong_ctl. */
485 sock->m_snd_bandwidth_estimator.reset(new Send_bandwidth_estimator(get_logger(), sock));
486
487 // Initialize the connection's congestion control strategy based on the configured strategy.
488 sock->m_snd_cong_ctl.reset
489 (Congestion_control_selector::create_strategy(sock->m_opts.m_st_cong_ctl_strategy, get_logger(), sock));
490 // ^-- No need to use opt() yet: user doesn't have socket and cannot set_options() on it yet.
491
492 const Socket_id& socket_id = Node::socket_id(sock);
493 FLOW_LOG_INFO("NetFlow worker thread starting passive-connect of [" << sock << "] on [" << serv << "]. "
494 "Received [" << syn->m_type_ostream_manip << "] with ISN [" << syn->m_init_seq_num << "].");
495
496 // Ensure we can support the specified packet options.
497
498 if (syn->m_opt_rexmit_on != sock->rexmit_on())
499 {
500 FLOW_LOG_WARNING("NetFlow worker thread starting passive-connect of [" << sock << "] on [" << serv << "]. "
501 "Received [" << syn->m_type_ostream_manip << "] with "
502 "opt_rexmit_on [" << syn->m_opt_rexmit_on << "]; was configured otherwise on this side; "
503 "resetting connection.");
504 /* We'd inform the user here, but they didn't open the connection (it's a passive open, and they
505 * haven't yet called accept()). We can respond with RST, however, to tell the other side this
506 * connection isn't going to happen. We didn't place sock into m_socks, so just let it
507 * disappear via shared_ptr<> magic. */
509 return Peer_socket::Ptr();
510 }
511 // else
512
513 // Ensure this socket pair does not yet exist in our peer-to-peer socket table.
514
516 {
517 /* This is a passive connect (they're initiating the connection). Therefore in particular it
518 * should be impossible that our local_port() equals an already existing connection's
519 * local_port(); Port_space is supposed to prevent the same ephemeral or service port from being
520 * handed out to more than one connection. Therefore this must be a programming error. */
521
522 FLOW_LOG_WARNING("Cannot add [" << sock << "], because such a connection already exists. "
523 "This is an ephemeral or service port collision and "
524 "constitutes either a bug or an extremely unlikely condition.");
525
526 // Same reasoning as above: send RST, and let sock disappear.
527 async_no_sock_low_lvl_rst_send(syn, low_lvl_remote_endpoint);
528 return Peer_socket::Ptr();
529 } // if (that socket pair already exists)
530 // else
531
532 /* Try the packet send (as just below) again if SYN_ACK not acknowledged within a certain amount of
533 * time. Give up if that happens too many times.
534 * Follow same order of ops (schedule, send) as in the SYN case elsewhere. */
536
537 // Send SYN_ACK to continue the handshake. Save some *sock data first, as they are used in create_syn_ack().
538
539 /* Initial Sequence Number (the start of our own series).
540 * Remember it in case we must retransmit the SYN. (m_snd_next_seq_num may have been further increased by then.) */
541 Sequence_number& init_seq_num = sock->m_snd_init_seq_num;
543 // Same comment as when calling sock->m_snd_init_seq_num.set_metadata() elsewhere. See that.
544 init_seq_num.set_metadata('L',init_seq_num + 1, sock->max_block_size());
545 // Sequence number of first bit of actual data.
546 sock->m_snd_next_seq_num = init_seq_num + 1;
547 // Security token. Random number from entire numeric range. Remember it for later verification.
548 sock->m_security_token = m_rnd_security_tokens();
549 // Initial receive window is simply the entire empty Receive buffer.
550 sock->m_rcv_last_sent_rcv_wnd = sock_rcv_wnd(sock);
551
552 // Make a packet; fill out common fields in and asynchronously send it.
553 auto syn_ack = create_syn_ack(sock);
554 Error_code dummy;
557 &dummy)) // Warns on error.
558 {
559 /* Serialization error. Very unlikely. We'd inform the user here, but they didn't open the
560 * connection (it's a passive open, and they haven't yet called accept()). We'd send RST to the
561 * other side, but we couldn't even serialize a SYN_ACK, so nothing to do except give up
562 * silently. We didn't place sock into m_socks, so just let it disappear via shared_ptr<>
563 * magic. */
564
565 cancel_timers(sock); // Cancel timers set up above.
566 return Peer_socket::Ptr();
567 }
568 /* send will happen asynchronously, and the registered completion handler will execute in this
569 * thread when done (NO SOONER than this method finishes executing). */
570
571 // No more errors: Map socket pair to the socket data structure (kind of analogous to a TCP net-stack's TCB).
572 m_socks[socket_id] = sock;
573
574 // Also record it within the server socket (more comments inside this method).
575 serv_peer_socket_init(serv, sock);
576
577 // CLOSED -> SYN_RCVD.
579
580 return sock;
581} // Node::handle_syn_to_listening_server()
582
584 Peer_socket::Ptr sock,
585 boost::shared_ptr<const Syn_ack_ack_packet> syn_ack_ack)
586{
  /* Handles a SYN_ACK_ACK arriving for sock while it awaits one: verifies the security token (bailing out on
   * mismatch), cancels the SYN_ACK timers, makes sock available through its originating server socket's accept
   * queue, records the peer's advertised receive window, and replays any DATA packets queued by
   * handle_data_to_syn_rcvd().
   * NOTE(review): the first signature line (which declares `socket_id`, used below) and several statements (the
   * close call on token mismatch, the two state changes, the Drop Timer setup, the TRACE log macro name, and the
   * Event_set bookkeeping calls) are absent from this listing. */
587 using boost::shared_ptr;
588
589 // We are in thread W.
590
591 /* We'd sent SYN_ACK and just got SYN_ACK_ACK. Assuming their SYN_ACK_ACK is valid, our side of
592 * connection can move to ESTABLISHED state, as theirs already has. */
593
594 FLOW_LOG_INFO("NetFlow worker thread continuing passive-connect of socket [" << sock << "]. "
595 "Received [" << syn_ack_ack->m_type_ostream_manip << "]; "
596 "security token [" << syn_ack_ack->m_packed.m_security_token << "].");
597
598 // First, validate their security token equals the one we've sent.
599 if (sock->m_security_token != syn_ack_ack->m_packed.m_security_token)
600 {
601 FLOW_LOG_WARNING("Received [" << syn_ack_ack->m_type_ostream_manip << "] targeted at state "
602 "[" << Peer_socket::Int_state::S_SYN_RCVD << "] socket [" << sock << "] "
603 "with mismatching security token "
604 "[" << syn_ack_ack->m_packed.m_security_token << "]; we had received and sent and expect "
605 "[" << sock->m_security_token << "]. Closing.");
606 /* Close connection in our structures (inform user if necessary as well). Pre-conditions
607 * assumed by call: sock in m_socks and sock->state() == S_OPEN (yes, since m_int_state ==
608 * S_SYN_RCVD); 3rd arg contains the reason for the close (yes). */
610 // ^-- defer_delta_check == true: for similar reason as at the end of this method.
611 return;
612 }
613 // else OK.
614
615 // No more errors.
616
617 // Move ourselves to connected state.
618
619 // The server socket to which the other side sent SYN to create the peer socket sock.
620 const Server_socket::Ptr serv = sock->m_originating_serv;
621
622 // Public state (thread-safe).
624 // Internal state. SYN_RCVD -> ESTABLISHED.
626
627 // Got the acknowledgment to SYN_ACK, so cancel retransmits and the timeout for that SYN_ACK.
628 cancel_timers(sock);
629
630 // Setup the Drop Timeout engine (m_snd_drop_timer).
632
633 // Add the peer socket to the server socket's accept queue (thread-safe)! accept() will return this.
634 serv_peer_socket_acceptable(serv, sock);
635 // BTW serv->m_originating_serv is now null.
 // NOTE(review): m_originating_serv is read from sock (not serv) above, and Node::accept() resets it on sock at
 // accept time; the preceding remark may be inaccurate -- confirm against serv_peer_socket_acceptable().
636
637 // Record initial rcv_wnd; it should be the entire size of the other side's Receive buffer.
638 sock->m_snd_remote_rcv_wnd = syn_ack_ack->m_packed.m_rcv_wnd;
639
640 /* We may have queued up some DATA packets while we were SYN_RCVD (due to loss and/or
641 * re-ordering). See handle_data_to_syn_rcvd() for more information. So, handle the queued DATA
642 * packets as if they'd just arrived. */
643 for (shared_ptr<Data_packet> qd_packet : sock->m_rcv_syn_rcvd_data_q)
644 {
645 auto const logger_ptr = get_logger();
646 if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
647 {
649 ("Handling [" << qd_packet->m_type_ostream_manip << "] packet "
650 "received/queued in [" << Peer_socket::Int_state::S_SYN_RCVD << "] state; "
651 "packet data size = [" << qd_packet->m_data.size() << "].");
652
653 // Very verbose and CPU-intensive, especially DATA version!
654 if (logger_ptr->should_log(log::Sev::S_DATA, get_log_component()))
655 {
656 FLOW_LOG_DATA_WITHOUT_CHECKING("Readable representation is: "
657 "[\n" << qd_packet->m_verbose_ostream_manip << "].");
658 }
659 else
660 {
661 FLOW_LOG_TRACE_WITHOUT_CHECKING("Readable representation is: "
662 "[\n" << qd_packet->m_concise_ostream_manip << "].");
663 }
664 }
665
666 handle_data_to_established(socket_id, sock, qd_packet, true); // true <=> packet was queued during SYN_RCVD.
667 // qd_packet has probably been decimated for performance, so don't rely on qd_packet.m_data at this point!
668 }
669 if (!sock->m_rcv_syn_rcvd_data_q.empty())
670 {
671 FLOW_LOG_TRACE("Handled a total of [" << sock->m_rcv_syn_rcvd_data_q.size() << "] queued packets with "
672 "cumulative data size [" << sock->m_rcv_syn_rcvd_data_cumulative_size << "].");
673 }
674 sock->m_rcv_syn_rcvd_data_q.clear(); // Save memory.
675
676 /* Since we just added sock to serv's acceptable socket queue, certainly serv is now Acceptable.
677 * Therefore we should soon inform anyone waiting on any Event_sets for serv to become Acceptable.
678 *
679 * Caveat: The user could have called serv->accept() right after the previous statement in this
680 * method, which could indeed make serv not Acceptable again. That is OK. We only promise to
681 * inform the user of an event within a "non-blocking" amount of time of it occurring. If that
682 * same user decides to mess himself over by acting on these events prematurely, that is not our
683 * problem [assuming we don't crash things, which we do not]. Worst case is that the user will
684 * detect the event, try to accept() and get nothing [which is an eventuality for which any decent
685 * user code would prepare]. */
686
687 // Accumulate the event into the Node store (note: not any Event_set yet).
689 {
690 // Possibly inform the user for any applicable Event_sets right now.
692 /* ^-- defer_delta_check == true: because the only way to get to this method is from
693 * async_low_lvl_recv(), which will perform event_set_all_check_delta(false) at the end of itself,
694 * before the boost.asio handler exits. See Node::m_sock_events doc header for details. */
695 }
696
697 /* Do not m_sock_events[S_PEER_SOCKET_WRITABLE].insert(sock), as sock has not been accept()ed and
698 * therefore cannot be waited on currently. */
699} // Node::handle_syn_ack_ack_to_syn_rcvd()
700
702 boost::shared_ptr<Data_packet> packet)
703{
  /* Handles a DATA packet that arrives for sock while it is still in SYN_RCVD state: the packet pointer is appended
   * to sock->m_rcv_syn_rcvd_data_q for replay once the connection is established, unless doing so would push the
   * queue's cumulative data size past the Receive buffer size limit, in which case the packet is dropped.
   * The very first queued packet resets the cumulative counter and is not size-checked.
   * NOTE(review): the first signature line and the name of the logging macro preceding the first log message are
   * absent from this listing. */
704 // We are in thread W.
705
706 /* We'd sent SYN_ACK, were waiting for SYN_ACK_ACK, but instead we got a DATA.
707 * This seems wrong at a first glance but can be legitimate. One possibility is they sent
708 * SYN_ACK_ACK and then some DATA, but the SYN_ACK_ACK was dropped (recall that the SYN_ACK_ACK is
709 * not itself acknowledged in our scheme). Another possibility is they sent both, but then DATA
710 * got re-ordered to in front of SYN_ACK_ACK.
711 *
712 * What does TCP do here? Well, it doesn't really have this problem, because every segment must
713 * have an ACK in it. So if a TCP gets data in SYN_RCVD, it must also contain the ACK to the
714 * SYN_ACK (what we call SYN_ACK_ACK) (in TCP, any packet without ACK is simply considered
715 * corrupt/invalid and would not be sent in the first place). So it's impossible to send data
716 * without acknowledging the SYN_ACK at the same time.
717 *
718 * For us, however ACK packets are independent of DATA packets, as are SYN_ACK_ACK packets.
719 * Therefore we should either drop these DATAs and hope whatever reliability implementation is
720 * used restores them later, or we should queue them for consumption when ESTABLISHED arrives.
721 * Let's do the latter. It's hard enough to deal with actual loss; introducing loss when we
722 * actually have the stuff seems absurd.
723 *
724 * So we just save them in a packet queue, and when we're ESTABLISHED we feed all the packets to
725 * handle_incoming() as if they'd just arrived. The only caveat is size of this queue. Since
726 * we have a maximum on the Receive buffer (sock->m_rcv_buf) and the packets-with-gaps structure
727 * (sock->m_rcv_packets_with_gaps), we must have one here as well. Since Receive buffer is
728 * empty until ESTABLISHED, it seems natural to limit this queue's cumulative byte size
729 * according to the limit imposed on Receive buffer. (There is some extra overhead to store the
730 * packet header info, but it's close enough.) After that, as when the Receive buffer fills up,
731 * we drop packets. */
732
733 assert(sock->m_int_state == Peer_socket::Int_state::S_SYN_RCVD);
734 const bool first_time = sock->m_rcv_syn_rcvd_data_q.empty();
735
736 // Not a WARNING, because we didn't do anything wrong; could be network conditions; and avoid verbosity after 1st one.
738 "NetFlow worker thread received [" << packet->m_type_ostream_manip << "] packet while "
739 "in [" << Peer_socket::Int_state::S_SYN_RCVD << "] state for [" << sock << "]; "
740 "saving for processing later when in [" << Peer_socket::Int_state::S_ESTABLISHED << "] "
741 "state; packet data size = [" << packet->m_data.size() << "]; "
742 "first time? = [" << first_time << "].");
743
744 if (first_time)
745 {
746 sock->m_rcv_syn_rcvd_data_cumulative_size = 0; // It's garbage at the moment.
747 }
 // Cap the queue by the *Receive* buffer limit, as the rationale above and the log message below both state.
748 else if ((sock->m_rcv_syn_rcvd_data_cumulative_size + packet->m_data.size())
749 > sock->opt(sock->m_opts.m_st_rcv_buf_max_size))
750 {
751 // Not a WARNING, because we didn't do anything wrong; could be network conditions.
752 FLOW_LOG_INFO("NetFlow worker thread received [" << packet->m_type_ostream_manip << "] packet while "
753 "in [" << Peer_socket::Int_state::S_SYN_RCVD << "] state for [" << sock << "]; "
754 "dropping because Receive queue full at [" << sock->m_rcv_syn_rcvd_data_cumulative_size << "].");
755 return;
756 }
757 // else
758
759 sock->m_rcv_syn_rcvd_data_cumulative_size += packet->m_data.size();
760 sock->m_rcv_syn_rcvd_data_q.push_back(packet); // Note that this is not a copy of the packet (just a pointer).
761
762 FLOW_LOG_TRACE("Receive queue now has [" << sock->m_rcv_syn_rcvd_data_q.size() << "] packets; "
763 "cumulative data size is [" << sock->m_rcv_syn_rcvd_data_cumulative_size << "].");
764} // Node::handle_data_to_syn_rcvd()
765
767 const Error_code& disconnect_cause, bool close)
768{
769 /* @todo Nothing calls this yet, as we don't support any way to close a Server_socket yet.
770 * Probably will reconsider this method when we do. */
771
772 Server_socket::Lock_guard lock(serv->m_mutex);
773 serv->m_disconnect_cause = disconnect_cause;
774 if (close)
775 {
776 // DONE.
777 serv_set_state(serv, Server_socket::State::S_CLOSED); // Reentrant mutex => OK.
778 }
779 else
780 {
781 // This socket is screwed but not yet out of the Node's system.
782 serv_set_state(serv, Server_socket::State::S_CLOSING); // Reentrant mutex => OK.
783 }
784}
785
787{
788 using std::list;
789
790 // We are in thread W.
791
792 /* sock is in one of two stages:
793 * - stage 1: serv->m_connecting_socks (not available via accept()), the earlier stage;
794 * - stage 2: serv->m_unaccepted_socks (available via accept()), the later stage.
795 *
796 * Try stage 1, then stage 2. */
797
798 // Stage 1.
799 const bool erased = serv->m_connecting_socks.erase(sock) == 1;
800 if (erased)
801 {
802 /* Maintain invariant. No need to lock mutex, because sock is in serv->m_connecting_socks, which
803 * means it is not in serv->m_unaccepted_socks yet, which means accept() cannot yield it, which means
804 * no non-W thread could be accessing m_originating_serv at the same time. */
805 sock->m_originating_serv.reset();
806 return;
807 }
808 // else
809
810 // Stage 2.
811
812 /* Remove from serv->m_unaccepted_socks. At this point accept() can access serv->m_unaccepted_socks and
813 * m_originating_serv, so we must lock. */
814 Server_socket::Lock_guard lock(serv->m_mutex);
815
816 sock->m_originating_serv.reset(); // Maintain invariant.
817
818 // O(1)ish.
819 serv->m_unaccepted_socks.erase(sock);
820
821 /* Notes:
822 *
823 * The unaccepted socket queue of serv can be accessed by accept()ing threads outside
824 * of thread W. So we must lock object at least to avoid corruption. We do that above.
825 *
826 * Now, let's think about the race. Suppose close_connection_immediately() starts and wins
827 * the race to lock *this; removes sock from serv->m_unaccepted_socks; unlocks *this; then the
828 * user immediately gets to call accept(). The user will not get sock as the result of the
829 * accept(), as we'd removed it in time. Good. Now suppose close_connection_immediately() starts
830 * but loses the race to lock *sock; user calls accept() first, and accept() yields sock (in
831 * S_ESTABLISHED state, though with empty Receive buffer, which is a pre-condition for
832 * close_connection_immediately()); then we lock sock and remove sock from
833 * serv->m_unaccepted_socks. Is this OK? Well, it is not different from this situation: they
834 * accept()ed, and then quickly there was an error on the resulting socket, so we closed it
835 * before any data came in. Therefore, yes, this is also OK. */
836} // Node::serv_peer_socket_closed()
837
839{
840 // We are in thread W.
841
842 {
843 Server_socket::Lock_guard lock(serv->m_mutex);
844 serv->m_unaccepted_socks.insert(sock); // Remember that Linked_hash_set<> insert()s at the *front*.
845 }
846 // This guy is only to be accessed from thread W (which we're in), so no lock needed.
847 serv->m_connecting_socks.erase(sock);
848}
849
851{
852 // We are in thread W.
853
854 /* Add this connecting socket to the pool of such connecting sockets maintained by the
855 * Server_socket. Once it is fully established, it will move from here to the queue
856 * serv->m_unaccepted_socks, where users can claim it via accept(). The utility of serv->m_unaccepted_socks
857 * is obvious, but why keep serv->m_connecting_socks? After all we've already added sock to m_socks, so
858 * demultiplexing of those messages (like SYN_ACK_ACK) will work without any additional structure.
859 * Answer: we need serv->m_connecting_socks at least for the case where we or the user close the
860 * listening socket (serv). In this case all pending connections must be aborted via RST (to let
861 * the other side know), and we'd know which ones to contact via serv->m_connecting_socks.
862 * The disallowing of accept()s after the associated listen() has been canceled is discussed in
863 * Server_socket documentation, but in short that behavior is similar to the BSD sockets
864 * behavior. */
865 serv->m_connecting_socks.insert(sock);
866
867 // And vice versa: maintain the invariant.
868 sock->m_originating_serv = serv;
869
870 // We didn't lock, because socket not yet available via accept(), so not accessed from non-W threads.
871} // Node::serv_peer_socket_init()
872
873Server_socket* Node::serv_create(const Peer_socket_options* child_sock_opts) // Virtual.
874{
875 // Just make a regular net_flow::Server_socket.
876 return serv_create_forward_plus_ctor_args<Server_socket>(child_sock_opts);
877}
878
879// Free implementations.
880
881std::ostream& operator<<(std::ostream& os, const Server_socket* serv)
882{
883 return
884 serv
885 ? (os
886 << "NetFlow_server [NetFlow [:" << serv->local_port() << "]] @" << static_cast<const void*>(serv))
887 : (os << "NetFlow_server@null");
888}
889
890/// @cond
891/* -^- Doxygen, please ignore the following. (Don't want docs generated for temp macro; this is more maintainable
892 * than specifying the macro name to omit it, in Doxygen-config EXCLUDE_SYMBOLS.) */
893
894// That's right, I did this. Wanna fight about it?
895#define STATE_TO_CASE_STATEMENT(ARG_state) \
896 case Server_socket::State::S_##ARG_state: \
897 return os << #ARG_state
898
899// -v- Doxygen, please stop ignoring.
900/// @endcond
901
902std::ostream& operator<<(std::ostream& os, Server_socket::State state)
903{
904 switch (state)
905 {
906 STATE_TO_CASE_STATEMENT(LISTENING);
907 STATE_TO_CASE_STATEMENT(CLOSING);
908 STATE_TO_CASE_STATEMENT(CLOSED);
909 }
910 return os;
911#undef STATE_TO_CASE_STATEMENT
912}
913
914} // namespace flow::net_flow
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:229
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:224
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1291
static Congestion_control_strategy * create_strategy(Strategy_choice strategy_choice, log::Logger *logger_ptr, Peer_socket::Const_ptr sock)
Factory method that, given an enum identifying the desired strategy, allocates the appropriate Conges...
Definition: cong_ctl.cpp:101
@ S_SERVER_SOCKET_ACCEPTABLE
Event type specifying the condition of interest wherein a target Server_socket serv is such that call...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
Definition: node.hpp:937
void serv_peer_socket_init(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records a new (just received SYN) peer socket from the given server socket.
void handle_data_to_established(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet, bool syn_rcvd_qd_packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
virtual Server_socket * serv_create(const Peer_socket_options *child_sock_opts)
Internal factory used for ALL Server_socket objects created by this Node (including subclasses).
void serv_set_state(Server_socket::Ptr serv, Server_socket::State state)
Sets Server_socket::m_state.
static bool ensure_sock_open(Socket_ptr sock, Error_code *err_code)
Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so,...
Definition: node.hpp:4141
void handle_syn_ack_ack_to_syn_rcvd(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_ack_packet > syn_ack_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK_ACK packet delivered to the given p...
Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock, const Function< Non_blocking_func_ret_type()> &non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Error_code *err_code)
Implementation of core blocking transfer methods, namely Peer_socket::sync_send(),...
Definition: node.hpp:3977
void serv_close_detected(Server_socket::Ptr serv, const Error_code &disconnect_cause, bool close)
Records that thread W shows this socket is not to listen to incoming connections and is to abort any ...
bool async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet, Error_code *err_code)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
Definition: low_lvl_io.cpp:605
Server_socket::Ptr listen(flow_port_t local_port, Error_code *err_code=0, const Peer_socket_options *child_sock_opts=0)
Sets up a server on the given local Flow port and returns Server_socket which can be used to accept s...
void cancel_timers(Peer_socket::Ptr sock)
Cancel any timers and scheduled tasks active in the given socket.
bool sock_validate_options(const Peer_socket_options &opts, const Peer_socket_options *prev_opts, Error_code *err_code) const
Analogous to validate_options() but checks per-socket options instead of per-Node options.
Options_mutex m_opts_mutex
The mutex protecting m_opts.
Definition: node.hpp:3707
bool serv_is_acceptable(const boost::any &serv_as_any) const
Returns true if and only if calling serv->accept() with at least some arguments would return either n...
Syn_ack_packet::Ptr create_syn_ack(Peer_socket::Const_ptr sock)
Like create_syn() but for SYN_ACK.
virtual Peer_socket * sock_create(const Peer_socket_options &opts)
Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
Definition: low_lvl_io.cpp:586
Sequence_number::Generator m_seq_num_generator
Sequence number generator (at least to generate ISNs). Only thread W can access this.
Definition: node.hpp:3780
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
Definition: event_set.cpp:1129
void serv_peer_socket_closed(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that a Server_socket-contained (i.e., currently un-established, or established but not yet ac...
Socket_id_to_socket_map m_socks
The peer-to-peer connections this Node is currently tracking.
Definition: node.hpp:3792
Peer_socket::Options_lock Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
Definition: node.hpp:1439
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
void sock_set_int_state(Peer_socket::Ptr sock, Peer_socket::Int_state new_state)
Sets internal state of given socket to the given state and logs a TRACE message about it.
Peer_socket::Ptr handle_syn_to_listening_server(Server_socket::Ptr serv, boost::shared_ptr< const Syn_packet > syn, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-deserialized, just-demultiplexed low-level SYN packet delivered to the given server so...
bool running() const
Returns true if and only if the Node is operating.
Definition: node.cpp:420
Port_to_server_map m_servs
The server sockets this Node is currently tracking.
Definition: node.hpp:3798
Event_set::Ev_type_to_socks_map m_sock_events
All sockets that have been detected to be "ready" (by the Event_set doc header definition) at any poi...
Definition: node.hpp:3830
void setup_drop_timer(const Socket_id &socket_id, Peer_socket::Ptr sock)
Creates a new Drop Timer and saves it to sock->m_snd_drop_timer.
void listen_worker(flow_port_t local_port, const Peer_socket_options *child_sock_opts, Server_socket::Ptr *serv)
Thread W implementation of listen().
void serv_peer_socket_acceptable(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that an unestablished socket sock (Peer_socket::Int_state::S_SYN_RCVD) has just become establ...
void handle_data_to_syn_rcvd(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
void setup_connection_timers(const Socket_id &socket_id, Peer_socket::Ptr sock, bool initial)
Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some a...
size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const
Computes and returns the currently correct rcv_wnd value; that is the amount of space free in Receive...
Peer_socket::Ptr accept(Server_socket::Ptr serv, Error_code *err_code)
Implementation of non-blocking serv->accept() for server socket serv in all cases except when serv->s...
void sock_set_state(Peer_socket::Ptr sock, Peer_socket::State state, Peer_socket::Open_sub_state open_sub_state=Peer_socket::Open_sub_state::S_CONNECTED)
Sets Peer_socket::m_state and Peer_socket::m_open_sub_state.
Node_options m_opts
This Node's global set of options.
Definition: node.hpp:3704
void close_empty_server_immediately(const flow_port_t local_port, Server_socket::Ptr serv, const Error_code &err_code, bool defer_delta_check)
Handles the transition of the given server socket from S_LISTENING/S_CLOSING to S_CLOSED (including e...
util::Rnd_gen_uniform_range< Peer_socket::security_token_t > m_rnd_security_tokens
Random number generator for picking security tokens; seeded on time at Node construction and generate...
Definition: node.hpp:3786
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
Definition: node.hpp:3739
Port_space m_ports
Flow port space for both client and server sockets. All threads may access this.
Definition: node.hpp:3777
void rst_and_close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
Asynchronously send RST to the other side of the given socket and close_connection_immediately().
@ S_OPEN
Future reads or writes may be possible. A socket in this state may be Writable or Readable.
@ S_CONNECTED
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
@ S_CONNECTING
This Peer_socket was created through an active connect (Node::connect() and the like),...
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex.
@ S_ESTABLISHED
Public state is OPEN+CONNECTED; in our opinion the connection is established.
@ S_CLOSED
Closed (dead or new) socket.
@ S_SYN_RCVD
Public state is OPEN+CONNECTING; other side requested passive connect via SYN; we sent SYN_ACK and ar...
flow_port_t reserve_port(flow_port_t port, Error_code *err_code)
Reserve the specified service port, or reserve_ephemeral_port() if the specified port is S_PORT_ANY.
Definition: port_space.cpp:75
void return_port(flow_port_t port, Error_code *err_code)
Return a previously reserved port (of any type).
Definition: port_space.cpp:175
A per-Peer_socket module that tries to estimate the bandwidth available to the outgoing flow.
Definition: bandwidth.hpp:125
Sequence_number generate_init_seq_num()
Returns an initial sequence number (ISN) for use in a new connection.
Definition: seq_num.cpp:48
An internal net_flow sequence number identifying a piece of data.
Definition: seq_num.hpp:126
void set_metadata(char num_line_id=0, const Sequence_number &zero_point=Sequence_number(), seq_num_delta_t multiple_size=0)
Updates the full set of metadata (used at least for convenient convention-based logging but not actua...
Definition: seq_num.cpp:268
A server socket able to listen on a single Flow port for incoming connections and return peer sockets...
Peer_socket_ptr sync_accept(const boost::chrono::duration< Rep, Period > &max_wait, bool reactor_pattern=false, Error_code *err_code=0)
Blocking (synchronous) version of accept().
Server_socket(log::Logger *logger, const Peer_socket_options *child_sock_opts)
Constructs object; initializes most values to well-defined (0, empty, etc.) but not necessarily meani...
Mutex m_mutex
This object's mutex.
Error_code disconnect_cause() const
The error code that previously caused state() to become State::S_CLOSED, or success code if state is ...
Peer_socket_options const *const m_child_sock_opts
Either null or the pointer to a copy of the template Peer_socket_options intended for resulting Peer_...
State m_state
See state(). Should be set before user gets access to *this. Must not be modified by non-W threads.
Peer_socket_ptr sync_accept_impl(const Fine_time_pt &wait_until, bool reactor_pattern, Error_code *err_code)
Same as sync_accept() but uses a Fine_clock-based Fine_duration non-template type for implementation ...
State
State of a Server_socket.
@ S_CLOSED
No accept()s are or will be possible, AND Node has disowned the Server_socket.
@ S_CLOSING
No accept()s are or will be possible, but Node is still finishing up the closing operation.
@ S_LISTENING
Future or current accept()s may be possible. A socket in this state may be Acceptable.
State state() const
Current State of the socket.
flow_port_t m_local_port
See local_port().
~Server_socket() override
Boring virtual destructor. Note that deletion is to be handled exclusively via shared_ptr,...
flow_port_t local_port() const
The local Flow-protocol port on which this server is or was listening.
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex.
Node * node() const
Node that produced this Server_socket.
Error_code m_disconnect_cause
The Error_code causing this server's move from LISTENING state (if this has occurred); otherwise a cl...
Peer_socket_ptr accept(Error_code *err_code=0)
Non-blocking accept: obtain socket for the least recently established not-yet-obtained peer connectio...
static Const_ptr const_ptr_cast(const From_ptr &ptr_to_cast)
Identical to ptr_cast() but adds const-ness (immutability) to the pointed-to type.
static Ptr ptr_cast(const From_ptr &ptr_to_cast)
Provides syntactic-sugary way to perform a static_pointer_cast<> from a compatible smart pointer type...
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(ARG_ret_type, ARG_method_name,...)
Narrow-use macro that implements the error code/exception semantics expected of most public-facing Fl...
Definition: error.hpp:357
#define FLOW_ERROR_LOG_ERROR(ARG_val)
Logs a warning about the given error code using FLOW_LOG_WARNING().
Definition: error.hpp:233
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
Definition: error.hpp:202
#define FLOW_ERROR_EMIT_ERROR_LOG_INFO(ARG_val)
Identical to FLOW_ERROR_EMIT_ERROR(), but the message logged has flow::log::Sev::S_INFO severity inst...
Definition: error.hpp:218
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:197
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
Definition: log.hpp:152
#define FLOW_LOG_WITH_CHECKING(ARG_sev, ARG_stream_fragment)
Logs a message of the specified severity into flow::log::Logger *get_logger() with flow::log::Compone...
Definition: log.hpp:489
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:354
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
Definition: log.hpp:372
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
Definition: async_fwd.hpp:223
void asio_exec_ctx_post(log::Logger *logger_ptr, Execution_context *exec_ctx, Synchronicity synchronicity, Task &&task)
An extension of boost.asio's post() and dispatch() free function templates, this free function templa...
Definition: util.hpp:31
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
@ S_INFO
Message indicates a not-"bad" condition that is not frequent enough to be of severity Sev::S_TRACE.
@ S_CONN_RESET_BAD_PEER_BEHAVIOR
Connection reset because of unexpected/illegal behavior by the other side.
@ S_INTERNAL_ERROR_PORT_COLLISION
Internal error: Ephemeral port double reservation allowed.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Definition: node.cpp:25
uint16_t flow_port_t
Logical Flow port type (analogous to a UDP/TCP port in spirit but in no way relevant to UDP/TCP).
const flow_port_t S_PORT_ANY
Special Flow port value used to indicate "invalid port" or "please pick a random available ephemeral ...
Definition: port_space.cpp:33
std::ostream & operator<<(std::ostream &os, const Congestion_control_selector::Strategy_choice &strategy_choice)
Serializes a Peer_socket_options::Congestion_control_strategy_choice enum to a standard ostream – the...
Definition: cong_ctl.cpp:146
bool key_exists(const Container &container, const typename Container::key_type &key)
Returns true if and only if the given key is present at least once in the given associative container...
Definition: util.hpp:276
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
Definition: blob_fwd.hpp:60
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
Definition: util_fwd.hpp:208
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:502
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Definition: common.hpp:632
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Definition: common.hpp:407
The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to a port in th...
Definition: node.hpp:3904
Peer_socket_options m_dyn_sock_opts
The set of per-Peer_socket options in this per-Node set of options.
Definition: options.hpp:580
A set of low-level options affecting a single Peer_socket.
Definition: options.hpp:36
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...
Definition: endpoint.hpp:93