32#include <boost/unordered_map.hpp>
936 private boost::noncopyable
1137 const boost::asio::const_buffer& serialized_metadata,
1193 template<
typename Rep,
typename Period>
1214 template<
typename Rep,
typename Period>
1216 const boost::chrono::duration<Rep, Period>& max_wait,
1217 const boost::asio::const_buffer& serialized_metadata,
1251 const boost::asio::const_buffer& serialized_metadata,
1381 template<
typename Peer_socket_impl_type>
1393 template<
typename Server_socket_impl_type>
1530 template<
typename Opt_type>
1564 template<
typename Opt_type>
1565 Opt_type
opt(
const Opt_type& opt_val_ref)
const;
1631 bool is_sim_duplicate_packet =
false);
1662 bool delayed_by_pacing);
1728 const Error_code& sys_err_code,
size_t bytes_transferred);
1914 bool defer_delta_check);
1975 boost::shared_ptr<const Syn_ack_packet> syn_ack);
1989 boost::shared_ptr<const Syn_ack_packet> syn_ack);
2012 boost::shared_ptr<Data_packet> packet,
2013 bool syn_rcvd_qd_packet);
2054 boost::shared_ptr<const Data_packet> packet,
2055 bool* dupe,
bool* slide,
size_t* slide_size);
2073 boost::shared_ptr<Data_packet> packet);
2110 boost::shared_ptr<const Data_packet> packet,
2112 bool* slide,
size_t* slide_size);
2134 boost::shared_ptr<Data_packet> packet);
2249 boost::shared_ptr<const Ack_packet> ack);
2362 const boost::unordered_set<Peer_socket::order_num_t>& flying_now_acked_pkts);
2407 size_t* cong_ctl_dropped_pkts,
size_t* cong_ctl_dropped_bytes,
2408 size_t* dropped_pkts,
size_t* dropped_bytes,
2409 std::vector<Peer_socket::order_num_t>* pkts_marked_to_drop);
2460 const boost::asio::const_buffer& serialized_metadata,
2485 const boost::asio::const_buffer& serialized_metadata,
2605 const Error_code& err_code,
bool defer_delta_check);
2639 boost::shared_ptr<const Syn_ack_packet>& syn_ack);
2655 const Error_code& err_code,
bool defer_delta_check);
2686 const Function<
size_t (
size_t max_data_size)>& snd_buf_feed_func,
2779 const Function<
size_t ()>& rcv_buf_consume_func,
2974 const Error_code& disconnect_cause,
bool close);
3170 bool defer_delta_check);
3191 boost::shared_ptr<const Data_packet> data);
3219 template<
typename Packet_map_iter>
3309 boost::shared_ptr<const Syn_packet> syn,
3326 boost::shared_ptr<const Syn_ack_ack_packet> syn_ack_ack);
3340 boost::shared_ptr<Data_packet> packet);
3369 const Error_code& err_code,
bool defer_delta_check);
3501 template<
typename Socket,
typename Non_blocking_func_ret_type>
3502 Non_blocking_func_ret_type
sync_op(
typename Socket::Ptr sock,
3503 const Function<Non_blocking_func_ret_type ()>& non_blocking_func,
3504 Non_blocking_func_ret_type would_block_ret_val,
3525 template<
typename Socket_ptr>
3918 size_t hash()
const;
3955template<
typename Rep,
typename Period>
3957 const boost::chrono::duration<Rep, Period>& max_wait,
3958 const boost::asio::const_buffer& serialized_metadata,
3962 assert(max_wait.count() > 0);
3966template<
typename Rep,
typename Period>
3968 const boost::chrono::duration<Rep, Period>& max_wait,
3976template<
typename Socket,
typename Non_blocking_func_ret_type>
3978 const Function<Non_blocking_func_ret_type ()>& non_blocking_func,
3979 Non_blocking_func_ret_type would_block_ret_val,
3984 using boost::adopt_lock;
3985 using boost::chrono::milliseconds;
3986 using boost::chrono::round;
3999 return would_block_ret_val;
4019 return would_block_ret_val;
4028 event_set->close(&dummy_prevents_throw);
4032 if (!(event_set->add_wanted_socket<Socket>(sock, ev_type, err_code)))
4034 return would_block_ret_val;
4038 Non_blocking_func_ret_type op_result;
4039 const bool timeout_given = wait_until !=
Fine_time_pt();
4047 const Fine_duration time_remaining = std::max(Fine_duration::zero(), wait_until - Fine_clock::now());
4056 wait_result = event_set->sync_wait(time_remaining, err_code);
4061 wait_result = event_set->sync_wait(err_code);
4068 return would_block_ret_val;
4075 const bool active = event_set->events_detected(err_code);
4090 if (sock->m_state == Socket::State::S_CLOSED)
4092 assert(sock->m_disconnect_cause);
4093 *err_code = sock->m_disconnect_cause;
4096 return would_block_ret_val;
4100 if (non_blocking_func.empty())
4102 FLOW_LOG_TRACE(
"Sync op of type [" << ev_type <<
"] with Event_set [" << event_set <<
"] in reactor pattern "
4103 "mode on object [" << sock <<
"] successful; returning without non-blocking op.");
4105 return would_block_ret_val;
4108 op_result = non_blocking_func();
4115 return would_block_ret_val;
4134 while (op_result == would_block_ret_val);
4140template<
typename Socket_ptr>
4145 if (sock->m_state == Socket_ptr::element_type::State::S_CLOSED)
4148 assert(!sock->m_node);
4149 assert(sock->m_disconnect_cause);
4160template<
typename Opt_type>
4166 if (new_val != old_val)
4169 FLOW_LOG_WARNING(
"Option [" << opt_id_nice <<
"] is static, but attempted to change "
4170 "from [" << old_val <<
"] to [" << new_val <<
"]. Ignoring entire option set.");
4179template<
typename Opt_type>
4192template<
typename Peer_socket_impl_type>
4198template<
typename Server_socket_impl_type>
4201 return new Server_socket_impl_type(
get_logger(), child_sock_opts);
Convenience class that simply stores a Logger and/or Component passed into a constructor; and returns...
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
A user-set collection of sockets and desired conditions on those sockets (such as: "socket has data t...
Event_type
Type of event or condition of interest supported by class Event_set.
Objects of this class can be fed to Node to make it internally simulate network conditions like loss,...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
void snd_flying_pkts_updated(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_const_iter pkt_begin, const Peer_socket::Sent_pkt_ordered_by_when_const_iter &pkt_end, bool added)
Updates Peer_socket::m_snd_flying_bytes according to an operation (add packets, remove packets) calle...
void low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet, size_t bytes_expected_transferred, const Error_code &sys_err_code, size_t bytes_transferred)
Completion handler for async_low_lvl_packet_send_impl(); called when the packet is either successfull...
void serv_peer_socket_init(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records a new (just received SYN) peer socket from the given server socket.
bool categorize_individual_ack(const Socket_id &socket_id, Peer_socket::Ptr sock, Ack_packet::Individual_ack::Const_ptr ack, bool *dupe_or_late, Peer_socket::Sent_pkt_ordered_by_when_iter *acked_pkt_it)
Helper of perform_accumulated_on_recv_tasks() that categorizes the given accumulated individual ackno...
void handle_data_to_established(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet, bool syn_rcvd_qd_packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
bool sock_is_writable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->send() with at least some arguments would return either non...
Peer_socket_info sock_info(Peer_socket::Const_ptr sock)
Implementation of sock->info() for socket sock in all cases except when sock->state() == Peer_socket:...
void async_no_sock_low_lvl_packet_send(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
void receive_wnd_updated(Peer_socket::Ptr sock)
Placed by receive() onto W if it has dequeued data from Receive buffer and given it to the user,...
virtual Server_socket * serv_create(const Peer_socket_options *child_sock_opts)
Internal factory used for ALL Server_socket objects created by this Node (including subclasses).
void serv_set_state(Server_socket::Ptr serv, Server_socket::State state)
Sets Server_socket::m_state.
void async_sock_low_lvl_packet_send(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr &&packet, bool delayed_by_pacing)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
void sock_track_new_data_after_gap_rexmit_off(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, size_t data_size, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
bool sock_data_to_reassembly_q_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to register the given DATA packet as an out-of-orde...
void perform_accumulated_on_recv_tasks()
Performs all tasks to be performed at the end of low_lvl_recv_and_handle() or async part of async_wai...
static bool ensure_sock_open(Socket_ptr sock, Error_code *err_code)
Helper method that checks whether the given Peer_socket or Server_socket is CLOSED; if so,...
void send_worker(Peer_socket::Ptr sock, bool defer_delta_check)
Thread W implemention of send(): synchronously or asynchronously send the contents of sock->m_snd_buf...
void interrupt_all_waits(Error_code *err_code=0)
Interrupts any blocking operation, a/k/a wait, and informs the invoker of that operation that the blo...
void handle_accumulated_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any incoming acknowledgments and rcv_wnd u...
Node_options options() const
Copies this Node's option set and returns that copy.
void handle_incoming(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-received, not-yet-deserialized low-level packet.
~Node() override
Destroys Node.
void async_rcv_wnd_recovery(Peer_socket::Ptr sock, size_t rcv_wnd)
receive_wnd_updated() helper that continues rcv_wnd recovery: that is, sends unsolicited ACK with a r...
void log_accumulated_acks(Peer_socket::Const_ptr sock) const
Helper of handle_accumulated_acks() that logs the about-to-be-handled accumulated individual acknowle...
boost::asio::signal_set Signal_set
Short-hand for a signal set.
void sock_free_memory(Peer_socket::Ptr sock)
Helper that clears all non-O(1)-space data structures stored inside sock.
static const size_t & S_NUM_PORTS
Total number of Flow ports in the port space, including S_PORT_ANY.
bool event_set_check_baseline(Event_set::Ptr event_set)
Checks each desired (Event_set::m_want) event in event_set; any that holds true is saved into event_s...
void sock_load_info_struct(Peer_socket::Const_ptr sock, Peer_socket_info *stats) const
Given a Peer_socket, copies all stats info (as available via Peer_socket::info()) from various struct...
void log_snd_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages thats show the detailed state of the sending sequence number space.
void send_worker_check_state(Peer_socket::Ptr sock)
Helper placed by send() onto W to invoke send_worker() but ensures that the socket has not entered so...
size_t m_low_lvl_max_buf_size
OS-reported m_low_lvl_sock UDP receive buffer maximum size, obtained right after we OS-set that setti...
bool set_options(const Node_options &opts, Error_code *err_code=0)
Dynamically replaces the current options set (options()) with the given options set.
void sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket, synchronously.
void mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number &seq_num)
Performs important book-keeping based on the event "DATA packet was sent to destination....
void handle_syn_ack_ack_to_syn_rcvd(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_ack_packet > syn_ack_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK_ACK packet delivered to the given p...
Non_blocking_func_ret_type sync_op(typename Socket::Ptr sock, const Function< Non_blocking_func_ret_type()> &non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Error_code *err_code)
Implementation of core blocking transfer methods, namely Peer_socket::sync_send(),...
void serv_close_detected(Server_socket::Ptr serv, const Error_code &disconnect_cause, bool close)
Records that thread W shows this socket is not to listen to incoming connections and is to abort any ...
static const Fine_duration S_REGULAR_INFREQUENT_TASKS_PERIOD
Time interval between performing "infrequent periodic tasks," such as stat logging.
size_t sock_max_packets_after_unrecvd_packet(Peer_socket::Const_ptr sock) const
Computes and returns the max size for Peer_socket::m_rcv_packets_with_gaps for sock.
bool async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet, Error_code *err_code)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
Peer_socket::Sent_pkt_ordered_by_when_iter categorize_pkts_as_dropped_on_acks(Peer_socket::Ptr sock, const boost::unordered_set< Peer_socket::order_num_t > &flying_now_acked_pkts)
Helper of perform_accumulated_on_recv_tasks() that determines the range of In-flight packets that sho...
void rcv_get_first_gap_info(Peer_socket::Const_ptr sock, bool *first_gap_exists, Sequence_number *seq_num_after_first_gap)
Helper for handle_data_to_established() that gets simple info about Peer_socket::m_rcv_packets_with_g...
bool snd_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data either in Peer_socket::m_snd_rexmit_q of sock (if re...
Fine_time_pt m_last_loss_sock_log_when
For debugging, when we detect loss of data we'd sent, we log the corresponding socket's state; this i...
Server_socket::Ptr listen(flow_port_t local_port, Error_code *err_code=0, const Peer_socket_options *child_sock_opts=0)
Sets up a server on the given local Flow port and returns Server_socket which can be used to accept s...
void cancel_timers(Peer_socket::Ptr sock)
Cancel any timers and scheduled tasks active in the given socket.
void sock_pacing_time_slice_end(Peer_socket::Ptr sock, const Error_code &sys_err_code)
async_sock_low_lvl_packet_send_paced() pacing helper: If sock_pacing_process_q() ran out of the last ...
void sock_rcv_buf_now_readable(Peer_socket::Ptr sock, bool syn_rcvd_qd_packet)
Helper for handle_data_to_established() that assumes the given's socket Receive buffer is currently r...
boost::shared_ptr< Net_env_simulator > m_net_env_sim
The object used to simulate stuff like packet loss and latency via local means directly in the code.
void async_wait_latency_then_handle_incoming(const Fine_duration &latency, util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sets up handle_incoming(packet_data, low_lvl_remote_endpoint) to be called asynchronously after a spe...
void snd_flying_pkts_erase_one(Peer_socket::Ptr sock, Peer_socket::Sent_pkt_ordered_by_when_iter pkt_it)
Erases (for example if considered Acknowledged or Dropped) a packet struct from the "scoreboard" (Pee...
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
bool sock_validate_options(const Peer_socket_options &opts, const Peer_socket_options *prev_opts, Error_code *err_code) const
Analogous to validate_options() but checks per-socket options instead of per-Node options.
Options_mutex m_opts_mutex
The mutex protecting m_opts.
boost::shared_ptr< util::Timer > Timer_ptr
boost.asio timer wrapped in a ref-counted pointer.
void event_set_close(Event_set::Ptr event_set, Error_code *err_code)
Implementation of Event_set::close() when Event_set::state() != Event_set::State::S_CLOSED for event_...
void handle_accumulated_pending_acks(const Socket_id &socket_id, Peer_socket::Ptr sock)
Helper of perform_accumulated_on_recv_tasks() that handles any additional individual outgoing acknowl...
void receive_wnd_recovery_data_received(Peer_socket::Ptr sock)
Pertaining to the async_rcv_wnd_recovery() mechanism, this handles the event that we have received an...
static Peer_socket::order_num_t sock_get_new_snd_order_num(Peer_socket::Ptr sock)
Returns the "order number" to use for Peer_socket::Sent_packet::Sent_when structure corresponding to ...
Peer_socket::Options_mutex Options_mutex
Short-hand for high-performance, non-reentrant, exclusive mutex used to lock m_opts.
Peer_socket::Ptr sync_connect_impl(const Remote_endpoint &to, const Fine_duration &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code, const Peer_socket_options *opts)
Implementation core of sync_connect*() that gets rid of templated or missing arguments thereof.
void event_set_check_baseline_assuming_state(Event_set::Ptr event_set)
Helper placed by event_set_async_wait() onto thread W to invoke event_set_check_baseline() but first ...
size_t max_block_size() const
The maximum number of bytes of user data per received or sent block on connections generated from thi...
const util::Udp_endpoint & local_low_lvl_endpoint() const
Return the UDP endpoint (IP address and UDP port) which will be used for receiving incoming and sendi...
boost::asio::ip::udp::socket Udp_socket
Short-hand for UDP socket.
bool serv_is_acceptable(const boost::any &serv_as_any) const
Returns true if and only if calling serv->accept() with at least some arguments would return either n...
void snd_flying_pkts_push_one(Peer_socket::Ptr sock, const Sequence_number &seq_num, Peer_socket::Sent_packet::Ptr sent_pkt)
Adds a new packet struct (presumably representing packet to be sent shortly) to the "scoreboard" (Pee...
void worker_run(const util::Udp_endpoint low_lvl_endpoint)
Worker thread W (main event loop) body.
Syn_packet::Ptr create_syn(Peer_socket::Const_ptr sock)
Helper that creates a new SYN packet object to the extent that is suitable for immediately passing to...
boost::unique_future< Error_code > m_event_loop_ready_result
The future object through which the non-W thread waits for m_event_loop_ready to be set to success/fa...
bool sock_pacing_new_packet_ready(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet, Error_code *err_code)
async_sock_low_lvl_packet_send_paced() pacing helper: Handles a DATA or ACK packet that was just pass...
void close_abruptly(Peer_socket::Ptr sock, Error_code *err_code)
Implementation of non-blocking sock->close_abruptly() for socket sock in all cases except when sock->...
static void get_seq_num_range(const Packet_map_iter &packet_it, Sequence_number *seq_num_start, Sequence_number *seq_num_end)
Given an iterator into a Peer_socket::Sent_pkt_by_sent_when_map or Peer_socket::Recv_pkt_map,...
Peer_socket::Ptr sync_connect_with_metadata(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
A combination of sync_connect() and connect_with_metadata() (blocking connect, with supplied metadata...
void event_set_close_worker(Event_set::Ptr event_set)
The guts of event_set_close_worker_check_state(): same thing, but assumes Event_set::state() == Event...
Syn_ack_packet::Ptr create_syn_ack(Peer_socket::Const_ptr sock)
Like create_syn() but for SYN_ACK.
virtual Peer_socket * sock_create(const Peer_socket_options &opts)
Internal factory used for ALL Peer_socket objects created by this Node (including subclasses).
bool snd_buf_enqable(Peer_socket::Const_ptr sock) const
Return true if and only if there is enough free space in Peer_socket::m_snd_buf of sock to enqueue an...
bool can_send(Peer_socket::Const_ptr sock) const
Answers the perennial question of congestion and flow control: assuming there is a DATA packet to sen...
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
void sock_slide_rcv_next_seq_num(Peer_socket::Ptr sock, size_t slide_size, bool reassembly_in_progress)
Helper for handle_data_to_established() that aims to register a set of received DATA packet data as i...
void sock_log_detail(Peer_socket::Const_ptr sock) const
Logs a verbose state report for the given socket.
boost::unordered_map< flow_port_t, Server_socket::Ptr > Port_to_server_map
A map from the local Flow port to the local Server_socket listening on that port.
static const size_t & S_NUM_EPHEMERAL_PORTS
Total number of Flow "ephemeral" ports (ones reserved locally at random with Node::listen(S_PORT_ANY)...
boost::unordered_set< Peer_socket::Ptr > m_socks_with_accumulated_pending_acks
Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() (...
boost::unordered_set< Peer_socket::Ptr > m_socks_with_accumulated_acks
Within a given low_lvl_recv_and_handle() or async part of async_wait_latency_then_handle_incoming() c...
friend size_t hash_value(const Socket_id &socket_id)
static void advance_seq_num(Sequence_number *seq_num, boost::shared_ptr< const Data_packet > data)
Assuming *seq_num points to the start of data.m_data, increments *seq_num to point to the datum just ...
void async_low_lvl_ack_send(Peer_socket::Ptr sock, bool defer_delta_check, const Error_code &sys_err_code=Error_code())
Sends a low-level ACK packet, with all accumulated in Peer_socket::m_rcv_pending_acks of sock individ...
static Sequence_number snd_past_last_flying_datum_seq_num(Peer_socket::Const_ptr sock)
Obtain the sequence number for the datum just past the last (latest) In-flight (i....
util::Thread m_worker
Worker thread (= thread W). Other members should be initialized before this to avoid race condition.
Sequence_number::Generator m_seq_num_generator
Sequence number generator (at least to generate ISNs). Only thread W can access this.
Peer_socket::Ptr connect(const Remote_endpoint &to, Error_code *err_code=0, const Peer_socket_options *opts=0)
Initiates an active connect to the specified remote Flow server.
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
void serv_peer_socket_closed(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that a Server_socket-contained (i.e., currently un-established, or established but not yet ac...
bool validate_static_option(const Opt_type &new_val, const Opt_type &old_val, const std::string &opt_id, Error_code *err_code) const
Helper that compares new_val to old_val and, if they are not equal, logs and returns an error; used t...
bool rcv_buf_deqable(Peer_socket::Const_ptr sock) const
Return true if and only if there are enough data in Peer_socket::m_rcv_buf of sock to give the user s...
void async_acknowledge_packet(Peer_socket::Ptr sock, const Sequence_number &seq_num, unsigned int rexmit_id, size_t data_size)
Causes an acknowledgment of the given received packet to be included in a future Ack_packet sent to t...
Socket_id_to_socket_map m_socks
The peer-to-peer connections this Node is currently tracking.
unsigned int handle_incoming_with_simulation(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint, bool is_sim_duplicate_packet=false)
Helper for low_lvl_recv_and_handle() that calls handle_incoming() on the not-yet-deserialized low-lev...
bool validate_option_check(bool check, const std::string &check_str, Error_code *err_code) const
Helper that, if the given condition is false, logs and returns an error; used to check for option val...
static const flow_port_t & S_FIRST_SERVICE_PORT
The port number of the lowest service port, making the range of service ports [S_FIRST_SERVICE_PORT,...
Peer_socket::Options_lock Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
void low_lvl_recv_and_handle(Error_code sys_err_code)
Handles the pre-condition that m_low_lvl_sock has a UDP packet available for reading,...
friend bool operator==(const Socket_id &lhs, const Socket_id &rhs)
static Socket_id socket_id(Peer_socket::Const_ptr sock)
Constructs the socket pair (connection ID) for the given socket.
Udp_socket m_low_lvl_sock
The UDP socket used to receive low-level packets (to assemble into application layer data) and send t...
void async_low_lvl_packet_send_impl(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet, bool delayed_by_pacing, Peer_socket::Ptr sock)
Takes given low-level packet structure, serializes it, and initiates asynchronous send of these data ...
void handle_syn_ack_to_syn_sent(const Socket_id &socket_id, Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed low-level SYN_ACK packet delivered to the given peer ...
bool event_set_async_wait(Event_set::Ptr event_set, const Event_set::Event_handler &on_event, Error_code *err_code)
Implementation of Event_set::async_wait() when Event_set::state() == Event_set::State::S_INACTIVE.
size_t send(Peer_socket::Ptr sock, const Function< size_t(size_t max_data_size)> &snd_buf_feed_func, Error_code *err_code)
Implementation of non-blocking sock->send() for socket sock in all cases except when sock->state() ==...
static const size_t & S_NUM_SERVICE_PORTS
Total number of Flow "service" ports (ones that can be reserved by number with Node::listen()).
static const flow_port_t & S_FIRST_EPHEMERAL_PORT
The port number of the lowest ephemeral Flow port, making the range of ephemeral ports [S_FIRST_EPHEM...
void sock_set_int_state(Peer_socket::Ptr sock, Peer_socket::Int_state new_state)
Sets internal state of given socket to the given state and logs a TRACE message about it.
Peer_socket * sock_create_forward_plus_ctor_args(const Peer_socket_options &opts)
Returns a raw pointer to newly created Peer_socket or sub-instance like asio::Peer_socket,...
boost::promise< Error_code > m_event_loop_ready
Promise that thread W sets to truthy Error_code if it fails to initialize or falsy once event loop is...
bool sock_is_readable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->receive() with at least some arguments would return either ...
bool async_sock_low_lvl_packet_send_or_close_immediately(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet, bool defer_delta_check)
Similar to async_sock_low_lvl_packet_send_paced() except it also calls close_connection_immediately(s...
log::Logger * this_thread_init_logger_setup(const std::string &thread_type, log::Logger *logger=0)
Helper to invoke for each thread in which this Node executes, whether or not it starts that thread,...
Peer_socket::Ptr handle_syn_to_listening_server(Server_socket::Ptr serv, boost::shared_ptr< const Syn_packet > syn, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-deserialized, just-demultiplexed low-level SYN packet delivered to the given server so...
bool sock_data_to_rcv_buf_unless_overflow(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Helper for handle_data_to_established() that aims to pass the payload of the given DATA packet to the...
bool sock_set_options(Peer_socket::Ptr sock, const Peer_socket_options &opts, Error_code *err_code)
Thread W implementation of sock->set_options().
bool running() const
Returns true if and only if the Node is operating.
Port_to_server_map m_servs
The server sockets this Node is currently tracking.
bool sock_pacing_process_q(Peer_socket::Ptr sock, Error_code *err_code, bool executing_after_delay)
async_sock_low_lvl_packet_send_paced() pacing helper: Given that we are currently in the pacing time ...
Event_set::Ev_type_to_socks_map m_sock_events
All sockets that have been detected to be "ready" (by the Event_set doc header definition) at any poi...
static const uint8_t S_DEFAULT_CONN_METADATA
Type and value to supply as user-supplied metadata in SYN, if user chooses to use [[a]sync_]connect()...
const Node_options & validate_options(const Node_options &opts, bool init, Error_code *err_code) const
Given a new set of Node_options intended to replace (or initialize) a Node's m_opts,...
Server_socket * serv_create_forward_plus_ctor_args(const Peer_socket_options *child_sock_opts)
Like sock_create_forward_plus_ctor_args() but for Server_sockets.
void setup_drop_timer(const Socket_id &socket_id, Peer_socket::Ptr sock)
Creates a new Drop Timer and saves it to sock->m_snd_drop_timer.
void handle_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Ack_packet > ack)
Handles a just-deserialized, just-demultiplexed, low-level ACK packet delivered to the given peer soc...
Peer_socket::Ptr sync_connect(const Remote_endpoint &to, const boost::chrono::duration< Rep, Period > &max_wait, Error_code *err_code=0, const Peer_socket_options *opts=0)
The blocking (synchronous) version of connect().
void listen_worker(flow_port_t local_port, const Peer_socket_options *child_sock_opts, Server_socket::Ptr *serv)
Thread W implementation of listen().
void handle_syn_ack_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Syn_ack_packet > syn_ack)
Handles a just-deserialized, just-demultiplexed, duplicate (equal to already-received SYN_ACK) low-le...
Signal_set m_signal_set
Signal set which we may or may not be using to trap SIGINT and SIGTERM in order to auto-fire interrup...
void serv_peer_socket_acceptable(Server_socket::Ptr serv, Peer_socket::Ptr sock)
Records that an unestablished socket sock (Peer_socket::Int_state::S_SYN_RCVD) has just become establ...
void handle_data_to_syn_rcvd(Peer_socket::Ptr sock, boost::shared_ptr< Data_packet > packet)
Handles a just-deserialized, just-demultiplexed, low-level DATA packet delivered to the given peer so...
boost::unordered_map< Socket_id, Peer_socket::Ptr > Socket_id_to_socket_map
A map from the connection ID (= remote-local socket pair) to the local Peer_socket that is the local ...
void setup_connection_timers(const Socket_id &socket_id, Peer_socket::Ptr sock, bool initial)
Assuming we've just sent SYN or SYN_ACK, sets up an asynchronous scheduled task to fire within some a...
void log_rcv_window(Peer_socket::Const_ptr sock, bool force_verbose_info_logging=false) const
Logs TRACE or DATA messages that show the detailed state of the receiving sequence number space.
size_t sock_rcv_wnd(Peer_socket::Const_ptr sock) const
Computes and returns the currently correct rcv_wnd value; that is the amount of space free in Receive...
Peer_socket::Ptr accept(Server_socket::Ptr serv, Error_code *err_code)
Implementation of non-blocking serv->accept() for server socket serv in all cases except when serv->s...
void connect_worker(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts, Peer_socket::Ptr *sock)
Thread W implementation of connect().
bool drop_pkts_on_acks(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &last_dropped_pkt_it, size_t *cong_ctl_dropped_pkts, size_t *cong_ctl_dropped_bytes, size_t *dropped_pkts, size_t *dropped_bytes, std::vector< Peer_socket::order_num_t > *pkts_marked_to_drop)
Helper of perform_accumulated_on_recv_tasks() that acts on the determination made by categorize_pkts_...
static const Peer_socket::Sent_packet::ack_count_t S_MAX_LATER_ACKS_BEFORE_CONSIDERING_DROPPED
For a given unacknowledged sent packet P, the maximum number of times any individual packet with high...
bool async_low_lvl_syn_ack_ack_send_or_close_immediately(const Peer_socket::Ptr &sock, boost::shared_ptr< const Syn_ack_packet > &syn_ack)
Helper to create, fully fill out, and asynchronously send via async_sock_low_lvl_packet_send_or_close...
Node(log::Logger *logger, const util::Udp_endpoint &low_lvl_endpoint, Net_env_simulator *net_env_sim=0, Error_code *err_code=0, const Node_options &opts=Node_options())
Constructs Node.
util::Blob m_packet_data
Stores incoming raw packet data; re-used repeatedly for possible performance gains.
Error_code sock_categorize_data_to_established(Peer_socket::Ptr sock, boost::shared_ptr< const Data_packet > packet, bool *dupe, bool *slide, size_t *slide_size)
Helper for handle_data_to_established() that categorizes the DATA packet received as either illegal; ...
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
Event_set::Ptr event_set_create(Error_code *err_code=0)
Creates a new Event_set in Event_set::State::S_INACTIVE state with no sockets/events stored; returns ...
void interrupt_all_waits_worker()
Thread W implementation of interrupt_all_waits().
void sock_set_state(Peer_socket::Ptr sock, Peer_socket::State state, Peer_socket::Open_sub_state open_sub_state=Peer_socket::Open_sub_state::S_CONNECTED)
Sets Peer_socket::m_state and Peer_socket::m_open_sub_state.
void receive_emptied_rcv_buf_while_disconnecting(Peer_socket::Ptr sock)
Placed by receive() onto W during a graceful close, after the Receive buffer had been emptied by the ...
void sock_disconnect_detected(Peer_socket::Ptr sock, const Error_code &disconnect_cause, bool close)
Records that thread W shows underlying connection is broken (graceful termination,...
size_t receive(Peer_socket::Ptr sock, const Function< size_t()> &rcv_buf_consume_func, Error_code *err_code)
Implementation of non-blocking sock->receive() for socket sock in all cases except when sock->state()...
void handle_connection_rexmit_timer_event(const Socket_id &socket_id, Peer_socket::Ptr sock)
Handles the triggering of the retransmit timer wait set up by setup_connection_timers(); it will re-s...
Node_options m_opts
This Node's global set of options.
void async_low_lvl_recv()
Registers so that during the current or next m_task_engine.run(), the latter will wait for a receivab...
void close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
A thread W method that handles the transition of the given socket from OPEN (any sub-state) to CLOSED...
void sock_disconnect_completed(Peer_socket::Ptr sock)
While in S_OPEN+S_DISCONNECTING state (i.e., after beginning a graceful close with sock_disconnect_de...
Fine_duration compute_rtt_on_ack(Peer_socket::Sent_packet::Const_ptr flying_pkt, const Fine_time_pt &time_now, Ack_packet::Individual_ack::Const_ptr ack, const Peer_socket::Sent_packet::Sent_when **sent_when) const
Helper of perform_accumulated_on_recv_tasks() that computes the RTT implied by a given individual ack...
void close_empty_server_immediately(const flow_port_t local_port, Server_socket::Ptr serv, const Error_code &err_code, bool defer_delta_check)
Handles the transition of the given server socket from S_LISTENING/S_CLOSING to S_CLOSED (including e...
Event_sets m_event_sets
Every Event_set to have been returned by event_set_create() and not subsequently reached Event_set::S...
util::Rnd_gen_uniform_range< Peer_socket::security_token_t > m_rnd_security_tokens
Random number generator for picking security tokens; seeded on time at Node construction and generate...
void interrupt_all_waits_internal_sig_handler(const Error_code &sys_err_code, int sig_number)
signal_set handler, executed on SIGINT and SIGTERM, if user has enabled this feature: causes interrup...
Peer_socket::Ptr connect_with_metadata(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
Same as connect() but sends, as part of the connection handshake, the user-supplied metadata,...
void new_round_trip_time_sample(Peer_socket::Ptr sock, Fine_duration round_trip_time)
Handles a just-computed new RTT (round trip time) measurement for an individual packet earlier sent: ...
void perform_regular_infrequent_tasks(bool reschedule)
Performs low-priority tasks that should be run on an infrequent, regular basis, such as stat logging ...
util::Udp_endpoint m_low_lvl_endpoint
After we bind m_low_lvl_sock to a UDP endpoint, this is a copy of that endpoint.
bool ok_to_rexmit_or_close(Peer_socket::Ptr sock, const Peer_socket::Sent_pkt_ordered_by_when_iter &pkt_it, bool defer_delta_check)
Checks whether the given sent packet has been retransmitted the maximum number of allowed times; if s...
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
void sock_pacing_new_time_slice(Peer_socket::Ptr sock, const Fine_time_pt &now)
async_sock_low_lvl_packet_send_paced() pacing helper: Resets the socket's Send_pacing_data structure ...
Port_space m_ports
Flow port space for both client and server sockets. All threads may access this.
void event_set_fire_if_got_events(Event_set::Ptr event_set)
Check whether given Event_set contains any active sockets (Event_set::m_can); if so,...
void rst_and_close_connection_immediately(const Socket_id &socket_id, Peer_socket::Ptr sock, const Error_code &err_code, bool defer_delta_check)
Asynchronously send RST to the other side of the given socket and close_connection_immediately().
boost::unordered_set< Event_set::Ptr > Event_sets
A set of Event_set objects.
void drop_timer_action(Peer_socket::Ptr sock, bool drop_all_packets)
Handles a Drop_timer (Peer_socket::m_snd_drop_timer) event in ESTABLISHED state by dropping the speci...
A peer (non-server) socket operating over the Flow network protocol, with optional stream-of-bytes an...
util::Mutex_non_recursive Options_mutex
Short-hand for high-performance, non-reentrant, exclusive mutex used to lock m_opts.
State
State of a Peer_socket.
Open_sub_state
The sub-state of a Peer_socket when state is State::S_OPEN.
@ S_CONNECTED
This Peer_socket was created through a passive connect (Node::accept() and the like) or an active con...
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
Int_state
The state of the socket (and the connection from this end's point of view) for the internal state mac...
util::Lock_guard< Options_mutex > Options_lock
Short-hand for lock that acquires exclusive access to an Options_mutex.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
Sent_pkt_by_sent_when_map::const_iterator Sent_pkt_ordered_by_when_const_iter
Short-hand for m_snd_flying_pkts_by_sent_when const iterator type.
Internal net_flow class that maintains the available Flow-protocol port space, somewhat similarly to ...
An object of this type generates a series of initial sequence numbers (ISN) that are meant to be suff...
An internal net_flow sequence number identifying a piece of data.
A server socket able to listen on a single Flow port for incoming connections and return peer sockets...
State
State of a Server_socket.
An empty interface, consisting of nothing but a default virtual destructor, intended as a boiler-plat...
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
#define FLOW_ERROR_EMIT_ERROR_LOG_INFO(ARG_val)
Identical to FLOW_ERROR_EMIT_ERROR(), but the message logged has flow::log::Sev::S_INFO severity inst...
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_SET_CONTEXT(ARG_logger_ptr, ARG_component_payload)
For the rest of the block within which this macro is instantiated, causes all FLOW_LOG_....
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
@ S_STATIC_OPTION_CHANGED
When setting options, tried to set an unchangeable (static) option.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
uint16_t flow_port_t
Logical Flow port type (analogous to a UDP/TCP port in spirit but in no way relevant to UDP/TCP).
size_t hash_value(const Sequence_number &seq_num)
Free function that returns seq_num.hash(); has to be a free function named hash_value for boost....
bool operator==(const Remote_endpoint &lhs, const Remote_endpoint &rhs)
Whether lhs is equal to rhs.
const flow_port_t S_PORT_ANY
Special Flow port value used to indicate "invalid port" or "please pick a random available ephemeral ...
Auto_cleanup setup_auto_cleanup(const Cleanup_func &func)
Provides a way to execute arbitrary (cleanup) code at the exit of the current block.
boost::unique_lock< Mutex > Lock_guard
Short-hand for advanced-capability RAII lock guard for any mutex, ensuring exclusive ownership of tha...
Fine_duration chrono_duration_to_fine_duration(const boost::chrono::duration< Rep, Period > &dur)
Helper that takes a non-negative duration of arbitrary precision/period and converts it to Fine_durat...
boost::shared_ptr< void > Auto_cleanup
Helper type for setup_auto_cleanup().
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
boost::thread Thread
Short-hand for standard thread class.
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
unsigned char uint8_t
Byte. Best way to represent a byte of binary data. This is 8 bits on all modern systems.
boost::shared_ptr< const Individual_ack > Const_ptr
Short-hand for ref-counted pointer to immutable objects of this class.
The data nugget uniquely identifying a peer-to-peer connection from a remote endpoint to a port in th...
const Remote_endpoint m_remote_endpoint
The other side of the connection.
size_t hash() const
Hash value of this Socket_id for unordered<>.
const flow_port_t m_local_port
This side of the connection (within this Node).
A set of low-level options affecting a single Flow Node, including Peer_socket objects and other obje...
static std::string opt_id_to_str(const std::string &opt_id)
Helper that, for a given option m_blah, takes something like "m_blah_blah" and returns the similar mo...
Data store to keep timing related info when a packet is sent out.
uint16_t ack_count_t
Type used for m_acks_after_me.
A data store that keeps stats about the a Peer_socket connection.
A set of low-level options affecting a single Peer_socket.
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...