37 m_state(
State::S_CLOSED),
42 FLOW_LOG_TRACE(
"Server_socket [" <<
static_cast<void*
>(
this) <<
"] created.");
84 const Ptr serv = shared_from_this();
103 namespace bind_ns = util::bind_ns;
105 using boost::adopt_lock;
108 bind_ns::cref(wait_until), reactor_pattern, _1);
115 const Ptr serv = shared_from_this();
139 wait_until, err_code);
152 using boost::promise;
153 using boost::unique_future;
204 [&]() {
listen_worker(local_port, child_sock_opts, &serv); });
208 if (serv->m_disconnect_cause)
210 *err_code = serv->m_disconnect_cause;
227 auto& serv = *serv_ptr;
243 serv->m_disconnect_cause = err_code;
270 local_port = serv->m_local_port;
272 FLOW_LOG_INFO(
"NetFlow worker thread listening for passive-connects on [" << serv <<
"].");
281 FLOW_LOG_WARNING(
"Cannot set up [" << serv <<
"], because server at port [" << local_port <<
"] already exists! "
282 "This is a port reservation error and constitutes either a bug or an extremely "
283 "unlikely condition.");
286 Error_code* err_code = &serv->m_disconnect_cause;
292 assert(!return_err_code);
320 if (serv->m_unaccepted_socks.empty())
330 serv->m_unaccepted_socks.pop_back();
334 sock->m_originating_serv.reset();
336 FLOW_LOG_INFO(
"Connection [" << sock <<
"] on [" << serv <<
"] accepted.");
344 using boost::any_cast;
365 const Error_code& err_code,
bool defer_delta_check)
373 assert(serv->m_connecting_socks.empty());
376 assert(serv->m_unaccepted_socks.empty());
387 const bool erased = 1 ==
395 assert(!return_err_code);
420 serv->m_state = state;
434 boost::shared_ptr<const Syn_packet> syn,
438 using boost::random::uniform_int_distribution;
446 if (serv->m_child_sock_opts)
469 sock->m_active_connect =
false;
472 sock->m_remote_endpoint =
Remote_endpoint{ low_lvl_remote_endpoint, syn->m_packed.m_src_port };
473 sock->m_local_port = serv->m_local_port;
475 sock->m_serialized_metadata =
static_cast<const Blob&
>(syn->m_serialized_metadata);
478 sock->m_rcv_init_seq_num = syn->m_init_seq_num;
479 sock->m_rcv_next_seq_num = sock->m_rcv_init_seq_num + 1;
488 sock->m_snd_cong_ctl.reset
493 FLOW_LOG_INFO(
"NetFlow worker thread starting passive-connect of [" << sock <<
"] on [" << serv <<
"]. "
494 "Received [" << syn->m_type_ostream_manip <<
"] with ISN [" << syn->m_init_seq_num <<
"].");
498 if (syn->m_opt_rexmit_on != sock->rexmit_on())
500 FLOW_LOG_WARNING(
"NetFlow worker thread starting passive-connect of [" << sock <<
"] on [" << serv <<
"]. "
501 "Received [" << syn->m_type_ostream_manip <<
"] with "
502 "opt_rexmit_on [" << syn->m_opt_rexmit_on <<
"]; was configured otherwise on this side; "
503 "resetting connection.");
522 FLOW_LOG_WARNING(
"Cannot add [" << sock <<
"], because such a connection already exists. "
523 "This is an ephemeral or service port collision and "
524 "constitutes either a bug or an extremely unlikely condition.");
544 init_seq_num.
set_metadata(
'L',init_seq_num + 1, sock->max_block_size());
546 sock->m_snd_next_seq_num = init_seq_num + 1;
585 boost::shared_ptr<const Syn_ack_ack_packet> syn_ack_ack)
587 using boost::shared_ptr;
594 FLOW_LOG_INFO(
"NetFlow worker thread continuing passive-connect of socket [" << sock <<
"]. "
595 "Received [" << syn_ack_ack->m_type_ostream_manip <<
"]; "
596 "security token [" << syn_ack_ack->m_packed.m_security_token <<
"].");
599 if (sock->m_security_token != syn_ack_ack->m_packed.m_security_token)
601 FLOW_LOG_WARNING(
"Received [" << syn_ack_ack->m_type_ostream_manip <<
"] targeted at state "
603 "with mismatching security token "
604 "[" << syn_ack_ack->m_packed.m_security_token <<
"]; we had received and sent and expect "
605 "[" << sock->m_security_token <<
"]. Closing.");
638 sock->m_snd_remote_rcv_wnd = syn_ack_ack->m_packed.m_rcv_wnd;
643 for (shared_ptr<Data_packet> qd_packet : sock->m_rcv_syn_rcvd_data_q)
649 (
"Handling [" << qd_packet->m_type_ostream_manip <<
"] packet "
651 "packet data size = [" << qd_packet->m_data.size() <<
"].");
657 "[\n" << qd_packet->m_verbose_ostream_manip <<
"].");
662 "[\n" << qd_packet->m_concise_ostream_manip <<
"].");
669 if (!sock->m_rcv_syn_rcvd_data_q.empty())
671 FLOW_LOG_TRACE(
"Handled a total of [" << sock->m_rcv_syn_rcvd_data_q.size() <<
"] queued packets with "
672 "cumulative data size [" << sock->m_rcv_syn_rcvd_data_cumulative_size <<
"].");
674 sock->m_rcv_syn_rcvd_data_q.clear();
702 boost::shared_ptr<Data_packet> packet)
734 const bool first_time = sock->m_rcv_syn_rcvd_data_q.empty();
738 "NetFlow worker thread received [" << packet->m_type_ostream_manip <<
"] packet while "
741 "state; packet data size = [" << packet->m_data.size() <<
"]; "
742 "first time? = [" << first_time <<
"].");
746 sock->m_rcv_syn_rcvd_data_cumulative_size = 0;
748 else if ((sock->m_rcv_syn_rcvd_data_cumulative_size + packet->m_data.size())
749 > sock->opt(sock->m_opts.m_st_snd_buf_max_size))
752 FLOW_LOG_INFO(
"NetFlow worker thread received [" << packet->m_type_ostream_manip <<
"] packet while "
754 "dropping because Receive queue full at [" << sock->m_rcv_syn_rcvd_data_cumulative_size <<
"].");
759 sock->m_rcv_syn_rcvd_data_cumulative_size += packet->m_data.size();
760 sock->m_rcv_syn_rcvd_data_q.push_back(packet);
762 FLOW_LOG_TRACE(
"Receive queue now has [" << sock->m_rcv_syn_rcvd_data_q.size() <<
"] packets; "
763 "cumulative data size is [" << sock->m_rcv_syn_rcvd_data_cumulative_size <<
"].");
767 const Error_code& disconnect_cause,
bool close)
773 serv->m_disconnect_cause = disconnect_cause;
799 const bool erased = serv->m_connecting_socks.erase(sock) == 1;
805 sock->m_originating_serv.reset();
816 sock->m_originating_serv.reset();
819 serv->m_unaccepted_socks.erase(sock);
844 serv->m_unaccepted_socks.insert(sock);
847 serv->m_connecting_socks.erase(sock);
865 serv->m_connecting_socks.insert(sock);
868 sock->m_originating_serv = serv;
876 return serv_create_forward_plus_ctor_args<Server_socket>(child_sock_opts);
886 <<
"NetFlow_server [NetFlow [:" << serv->
local_port() <<
"]] @" <<
static_cast<const void*
>(serv))
887 : (os <<
"NetFlow_server@null");
895#define STATE_TO_CASE_STATEMENT(ARG_state) \
896 case Server_socket::State::S_##ARG_state: \
897 return os << #ARG_state
906 STATE_TO_CASE_STATEMENT(LISTENING);
907 STATE_TO_CASE_STATEMENT(CLOSING);
908 STATE_TO_CASE_STATEMENT(CLOSED);
911#undef STATE_TO_CASE_STATEMENT
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
static Congestion_control_strategy * create_strategy(Strategy_choice strategy_choice, log::Logger *logger_ptr, Peer_socket::Const_ptr sock)
Factory method that, given an enum identifying the desired strategy, allocates the appropriate Conges...
@ S_SERVER_SOCKET_ACCEPTABLE
Event type specifying the condition of interest wherein a target Server_socket serv is such that call...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
void serv_peer_socket_init(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records a new (just received SYN) peer socket from the given server socket.
void handle_data_to_established(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet, bool syn_rcvd_qd_packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
virtual Server_socket * serv_create(const Peer_socket_options *child_sock_opts)
Internal factory used for ALL Server_socket objects created by this Node (including subclasses).
void serv_set_state(Server_socket::Ptr serv, Server_socket::State state)
Sets Server_socket::m_state.
static bool ensure_sock_open(Socket_ptr sock, Error_code *err_code)
Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so,...
void handle_syn_ack_ack_to_syn_rcvd(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_ack_packet > syn_ack_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK_ACK packet delivered to the given p...
Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock, const Function< Non_blocking_func_ret_type()> &non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Error_code *err_code)
Implementation of core blocking transfer methods, namely Peer_socket::sync_send(),...
void serv_close_detected(Server_socket::Ptr serv, const Error_code &disconnect_cause, bool close)
Records that thread W shows this socket is not to listen to incoming connections and is to abort any ...
bool async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet, Error_code *err_code)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
Server_socket::Ptr listen(flow_port_t local_port, Error_code *err_code=0, const Peer_socket_options *child_sock_opts=0)
Sets up a server on the given local Flow port and returns Server_socket which can be used to accept s...
void cancel_timers(Peer_socket::Ptr sock)
Cancel any timers and scheduled tasks active in the given socket.
bool sock_validate_options(const Peer_socket_options &opts, const Peer_socket_options *prev_opts, Error_code *err_code) const
Analogous to validate_options() but checks per-socket options instead of per-Node options.
Options_mutex m_opts_mutex
The mutex protecting m_opts.
bool serv_is_acceptable(const boost::any &serv_as_any) const
Returns true if and only if calling serv->accept() with at least some arguments would return either n...
Syn_ack_packet::Ptr create_syn_ack(Peer_socket::Const_ptr sock)
Like create_syn() but for SYN_ACK.
virtual Peer_socket * sock_create(const Peer_socket_options &opts)
Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
Sequence_number::Generator m_seq_num_generator
Sequence number generator (at least to generate ISNs). Only thread W can access this.
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
void serv_peer_socket_closed(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that a Server_socket-contained (i.e., currently un-established, or established but not yet ac...
Socket_id_to_socket_map m_socks
The peer-to-peer connections this Node is currently tracking.
Peer_socket::Options_lock Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
void sock_set_int_state(Peer_socket::Ptr sock, Peer_socket::Int_state new_state)
Sets internal state of given socket to the given state and logs a TRACE message about it.
Peer_socket::Ptr handle_syn_to_listening_server(Server_socket::Ptr serv, boost::shared_ptr< const Syn_packet > syn, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-deserialized, just-demultiplexed low-level SYN packet delivered to the given server so...
bool running() const
Returns true if and only if the Node is operating.
Port_to_server_map m_servs
The server sockets this Node is currently tracking.
Event_set::Ev_type_to_socks_map m_sock_events
All sockets that have been detected to be "ready" (by the Event_set doc header definition) at any poi...
void setup_drop_timer(const Socket_id &socket_id, Peer_socket::Ptr sock)
Creates a new Drop Timer and saves it to sock->m_snd_drop_timer.
void listen_worker(flow_port_t local_port, const Peer_socket_options *child_sock_opts, Server_socket::Ptr *serv)
Thread W implementation of listen().
void serv_peer_socket_acceptable(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that an unestablished socket sock (Peer_socket::Int_state::S_SYN_RCVD) has just become establ...
void handle_data_to_syn_rcvd(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
void setup_connection_timers(const Socket_id &socket_id, Peer_socket::Ptr sock, bool initial)
Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some a...
size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const
Computes and returns the currently correct rcv_wnd value; that is the amount of space free in Receive...
Peer_socket::Ptr accept(Server_socket::Ptr serv, Error_code *err_code)
Implementation of non-blocking serv->accept() for server socket serv in all cases except when serv->s...
void sock_set_state(Peer_socket::Ptr sock, Peer_socket::State state, Peer_socket::Open_sub_state open_sub_state=Peer_socket::Open_sub_state::S_CONNECTED)
Sets Peer_socket::m_state and Peer_socket::m_open_sub_state.
Node_options m_opts
This Node's global set of options.
void close_empty_server_immediately(const flow_port_t local_port, Server_socket::Ptr serv, const Error_code &err_code, bool defer_delta_check)
Handles the transition of the given server socket from S_LISTENING/S_CLOSING to S_CLOSED (including e...
util::Rnd_gen_uniform_range< Peer_socket::security_token_t > m_rnd_security_tokens
Random number generator for picking security tokens; seeded on time at Node construction and generate...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
Port_space m_ports
Flow port space for both client and server sockets. All threads may access this.
void rst_and_close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
Asynchronously send RST to the other side of the given socket and close_connection_immediately().
@ S_OPEN
Future reads or writes may be possible. A socket in this state may be Writable or Readable.
@ S_CONNECTED
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
@ S_CONNECTING
This Peer_socket was created through an active connect (Node::connect() and the like),...
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex.
@ S_ESTABLISHED
Public state is OPEN+CONNECTED; in our opinion the connection is established.
@ S_CLOSED
Closed (dead or new) socket.
@ S_SYN_RCVD
Public state is OPEN+CONNECTING; other side requested passive connect via SYN; we sent SYN_ACK and ar...
flow_port_t reserve_port(flow_port_t port, Error_code *err_code)
Reserve the specified service port, or reserve_ephemeral_port() if the specified port is S_PORT_ANY.
void return_port(flow_port_t port, Error_code *err_code)
Return a previously reserved port (of any type).
A per-Peer_socket module that tries to estimate the bandwidth available to the outgoing flow.
Sequence_number generate_init_seq_num()
Returns an initial sequence number (ISN) for use in a new connection.
An internal net_flow sequence number identifying a piece of data.
void set_metadata(char num_line_id=0, const Sequence_number &zero_point=Sequence_number(), seq_num_delta_t multiple_size=0)
Updates the full set of metadata (used at least for convenient convention-based logging but not actua...
A server socket able to listen on a single Flow port for incoming connections and return peer sockets...
Peer_socket_ptr sync_accept(const boost::chrono::duration< Rep, Period > &max_wait, bool reactor_pattern=false, Error_code *err_code=0)
Blocking (synchronous) version of accept().
Server_socket(log::Logger *logger, const Peer_socket_options *child_sock_opts)
Constructs object; initializes most values to well-defined (0, empty, etc.) but not necessarily meani...
Mutex m_mutex
This object's mutex.
Error_code disconnect_cause() const
The error code that perviously caused state() to become State::S_CLOSED, or success code if state is ...
Peer_socket_options const *const m_child_sock_opts
Either null or the pointer to a copy of the template Peer_socket_options intended for resulting Peer_...
State m_state
See state(). Should be set before user gets access to *this. Must not be modified by non-W threads.
Peer_socket_ptr sync_accept_impl(const Fine_time_pt &wait_until, bool reactor_pattern, Error_code *err_code)
Same as sync_accept() but uses a Fine_clock-based Fine_duration non-template type for implementation ...
State
State of a Server_socket.
@ S_CLOSED
No accept()s are or will be possible, AND Node has disowned the Server_socket.
@ S_CLOSING
No accept()s are or will be possible, but Node is still finishing up the closing operation.
@ S_LISTENING
Future or current accept()s may be possible. A socket in this state may be Acceptable.
State state() const
Current State of the socket.
flow_port_t m_local_port
See local_port().
~Server_socket() override
Boring virtual destructor. Note that deletion is to be handled exclusively via shared_ptr,...
flow_port_t local_port() const
The local Flow-protocol port on which this server is or was listening.
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex.
Node * node() const
Node that produced this Server_socket.
Error_code m_disconnect_cause
The Error_code causing this server's move from LISTENING state (if this has occurred); otherwise a cl...
Peer_socket_ptr accept(Error_code *err_code=0)
Non-blocking accept: obtain socket for the least recently established not-yet-obtained peer connectio...
static Const_ptr const_ptr_cast(const From_ptr &ptr_to_cast)
Identical to ptr_cast() but adds const-ness (immutability) to the pointed-to type.
static Ptr ptr_cast(const From_ptr &ptr_to_cast)
Provides syntactic-sugary way to perform a static_pointer_cast<> from a compatible smart pointer type...
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(ARG_ret_type, ARG_method_name,...)
Narrow-use macro that implements the error code/exception semantics expected of most public-facing Fl...
#define FLOW_ERROR_LOG_ERROR(ARG_val)
Logs a warning about the given error code using FLOW_LOG_WARNING().
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
#define FLOW_ERROR_EMIT_ERROR_LOG_INFO(ARG_val)
Identical to FLOW_ERROR_EMIT_ERROR(), but the message logged has flow::log::Sev::S_INFO severity inst...
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_WITH_CHECKING(ARG_sev, ARG_stream_fragment)
Logs a message of the specified severity into flow::log::Logger *get_logger() with flow::log::Compone...
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
void asio_exec_ctx_post(log::Logger *logger_ptr, Execution_context *exec_ctx, Synchronicity synchronicity, Task &&task)
An extension of boost.asio's post() and dispatch() free function templates, this free function templa...
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
@ S_INFO
Message indicates a not-"bad" condition that is not frequent enough to be of severity Sev::S_TRACE.
@ S_CONN_RESET_BAD_PEER_BEHAVIOR
Connection reset because of unexpected/illegal behavior by the other side.
@ S_INTERNAL_ERROR_PORT_COLLISION
Internal error: Ephemeral port double reservation allowed.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
uint16_t flow_port_t
Logical Flow port type (analogous to a UDP/TCP port in spirit but in no way relevant to UDP/TCP).
const flow_port_t S_PORT_ANY
Special Flow port value used to indicate "invalid port" or "please pick a random available ephemeral ...
std::ostream & operator<<(std::ostream &os, const Congestion_control_selector::Strategy_choice &strategy_choice)
Serializes a Peer_socket_options::Congestion_control_strategy_choice enum to a standard ostream – the...
bool key_exists(const Container &container, const typename Container::key_type &key)
Returns true if and only if the given key is present at least once in the given associative container...
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to a port in th...
Peer_socket_options m_dyn_sock_opts
The set of per-Peer_socket options in this per-Node set of options.
A set of low-level options affecting a single Peer_socket.
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...