Flow 1.0.1
Flow project: Full implementation reference.
|
Internal net_flow
class that implements a socket buffer, as used by Peer_socket for Send and Receive buffers.
More...
#include <socket_buffer.hpp>
Public Member Functions | |
Socket_buffer (log::Logger *logger_ptr, size_t block_size_hint) | |
Initialize empty buffer. More... | |
size_t | data_size () const |
The total number of bytes of application-layer data stored in this object. More... | |
bool | empty () const |
Returns true if and only if data_size() == 0. More... | |
template<typename Const_buffer_sequence > | |
size_t | feed_bufs_copy (const Const_buffer_sequence &data, size_t max_data_size) |
Feeds (adds to the back of the byte buffer) the contents of the byte stream composed of the bytes in the given Const_buffer_sequence , in the order in which they are given – up to an internal buffer size. More... | |
size_t | feed_buf_move (util::Blob *data, size_t max_data_size) |
Feeds (adds to the back of the byte buffer) the byte sequence equal to the given byte sequence *data , up to an internal buffer size. More... | |
template<typename Mutable_buffer_sequence > | |
size_t | consume_bufs_copy (const Mutable_buffer_sequence &target_bufs) |
Consumes (removes from the front of the internal byte buffer and returns them to the caller) a byte sequence of length data_size() or the available space in target_bufs – whichever is smaller. More... | |
void | consume_buf_move (util::Blob *target_buf, size_t max_data_size) |
Consumes (removes from the front of the internal byte buffer and returns them to the caller) a byte sequence of length data_size() or max_data_size – whichever is smaller. More... | |
void | clear () |
Destroys all stored data. More... | |
Public Member Functions inherited from flow::log::Log_context | |
Log_context (Logger *logger=0) | |
Constructs Log_context by storing the given pointer to a Logger and a null Component. More... | |
template<typename Component_payload > | |
Log_context (Logger *logger, Component_payload component_payload) | |
Constructs Log_context by storing the given pointer to a Logger and a new Component storing the specified generically typed payload (an enum value). More... | |
Log_context (const Log_context &src) | |
Copy constructor that stores equal Logger* and Component values as the source. More... | |
Log_context (Log_context &&src) | |
Move constructor that makes this equal to src , while the latter becomes as-if default-constructed. More... | |
Log_context & | operator= (const Log_context &src) |
Assignment operator that behaves similarly to the copy constructor. More... | |
Log_context & | operator= (Log_context &&src) |
Move assignment operator that behaves similarly to the move constructor. More... | |
void | swap (Log_context &other) |
Swaps Logger pointers and Component objects held by *this and other . More... | |
Logger * | get_logger () const |
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect. More... | |
const Component & | get_log_component () const |
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect. More... | |
Private Types | |
using | Blob_ptr = boost::shared_ptr< util::Blob > |
Short-hand for byte sequence on the heap. (Using ref-counted pointer purely for convenience.) More... | |
using | Blob_const_ptr = boost::shared_ptr< const util::Blob > |
Short-hand for byte sequence on the heap. (Using ref-counted pointer purely for convenience.) More... | |
using | Queue = std::deque< Blob_ptr > |
FIFO of byte sequences, together comprising an overall byte sequence. More... | |
Static Private Member Functions | |
template<typename Const_buffer_sequence > | |
static void | copy_bytes_from_buf_seq (typename Const_buffer_sequence::const_iterator *cur_buf_it, size_t *pos_in_buf, size_t to_copy, util::Blob *dest_buf, util::Blob::Iterator dest) |
Helper that copies, to a given raw memory buffer, a given number of bytes from a given Const_buffer_sequence , starting at a given offset in a given member of that buffer sequence. More... | |
template<typename Mutable_buffer_sequence > | |
static void | copy_bytes_to_buf_seq (typename Mutable_buffer_sequence::const_iterator *cur_buf_it, size_t *pos_in_buf, size_t to_copy, const util::Blob &src_buf, util::Blob::Const_iterator src) |
Helper that copies, to a given raw memory buffer, a given number of bytes to a given Mutable_buffer_sequence , starting at a given offset in a given member of that buffer sequence. More... | |
Private Attributes | |
const size_t | m_block_size_hint |
The max_data_size argument value that the user is predicting to use when calling consume_buf_move(); or 0 if the user intends to instead use consume_bufs_copy(). More... | |
Queue | m_q |
The data in the buffer. More... | |
size_t | m_data_size |
The total amount of data stored, in bytes, stored in this object. More... | |
Friends | |
std::ostream & | operator<< (std::ostream &os, const Socket_buffer &sock_buf) |
Prints a printable representation of the data in sock_buf to the given standard ostream . More... | |
Internal net_flow
class that implements a socket buffer, as used by Peer_socket for Send and Receive buffers.
Users do not get direct access to such objects, but instead they access them indirectly through things like Peer_socket::send() and Peer_socket::receive(). Meanwhile Node modifies them directly when data are received or sent over the network.
In terms of the API semantics, the object represents a sequence of bytes. User can enqueue bytes at the back – from either a Const_buffer_sequence
(boost.asio style gather operation) or a vector of bytes. User can dequeue bytes from the front – to either a Mutable_buffer_sequence
(boost.asio style scatter operation) or a vector of bytes.
That's all quite simple, but the complexity comes from performance aspects. The goal is to avoid copying arrays (linear-time) and encourage moving them (constant-time) whenever the user enqueues or dequeues some bytes. To that end, the class can operate in one of two modes (permanently specified at construction), tailored to high performance in the two practical use cases of the class.
block_size_hint == 0
Intended for use as a Receive buffer. In this case, packetized blocks of data come from the network and can be of arbitrary sizes (though typically they'll be max-block-size in length if possible). They are then converted to a logical byte stream, and the user (via receive()) will dequeue them using a scatter operation. Since the size(s) of the buffer(s) the user will choose in the scatter receive() are entirely unpredictable, copying of the bytes during dequeuing is unavoidable. However, copying the packet blocks while enqueuing a packet upon receipt IS avoidable; since the Node no longer needs the packet data having given it to the Receive buffer, we can take ownership of the block's data without copying.
In mode 1, it is expected the class user will call feed_buf_move() for a constant-time enqueueing; and the class user will call consume_bufs_copy() for a linear-time dequeuing into the end user's (receive()
caller's) data structure.
block_size_hint > 0
Intended for use as a Send buffer. In this case, a stream of bytes comes from the end user (via send()
), in the form of a sequence of buffers of arbitrary length/structure, which is enqueued into this object. Node, when it is ready to send a packet, dequeues max-block-size bytes (or fewer, if fewer are available in Socket_buffer), packetizes them, and sends them off. Node will never ask for fewer than max-block-size bytes (in particular, it will not send anything until congestion window [CWND] has at least that much free space). Since the size(s) of the enqueued buffers are entirely unpredictable, copying of the bytes during enqueueing is unavoidable. However, copying the packets while dequeueing a packet to send IS avoidable. How? During enqueuing, we internally pack bytes into groups of block_size_hint
== max-block-size bytes. Then when a packet is dequeued, we can provide a ready-made front group of bytes with a constant-time operation.
In mode 2, it is expected the class user will call feed_bufs_copy() for a linear-time enqueueing from the end user's (send()
caller's) data structure; and the class user will call consume_buf_move() for a constant-time dequeuing into a low-level packet.
This narrow design for performance is the reason for the kind-of-odd asymmetrical set of methods for feeding and consuming bytes. The two modes differ only in performance, and technically you may mix and match the recommended calls for the 2 modes after construction. However that defeats the point of the class, so I wouldn't recommend that.
Separate objects: All functionality safe for simultaneous execution from multiple threads. Same object: Not safe to execute a non-const operation simultaneously with any other operation; otherwise safe. Rationale: While locking of Socket_buffer objects is essential, typically they must be locked together with various other external (to Socket_buffer) pieces of data (such as a given Peer_socket::m_disconnect_cause), and therefore the locking is left to the user of the class.
Definition at line 95 of file socket_buffer.hpp.
|
private |
Short-hand for byte sequence on the heap. (Using ref-counted pointer purely for convenience.)
Definition at line 247 of file socket_buffer.hpp.
|
private |
Short-hand for byte sequence on the heap. (Using ref-counted pointer purely for convenience.)
Definition at line 244 of file socket_buffer.hpp.
|
private |
FIFO of byte sequences, together comprising an overall byte sequence.
Why not just use a deque<uint8_t>
? We easily could, and it would make the code far simpler. Answer: this enables us to reduce or eliminate copying of data in certain likely use cases, chief among them when using Socket_buffer as the Send buffer. See m_q.
Why deque
instead of list
? Both offer ~O(1) push_back()
and pop_front()
. deque
is implemented, in practice, as a set of arrays and a master array with pointers to those arrays. list
is a doubly-linked list. deque
is therefore probably more compact and local in memory. list
may perform less reallocating/copying, depending on how the deque
implementation works. Using deque
on a hunch, however. Note, also, that at least gcc 4+ defaults queue<T>
to queue<T, deque>
, so I guess they also find it better on average.
Using container instead of adapter queue
due to need to iterate over the whole thing in some situations.
Definition at line 267 of file socket_buffer.hpp.
|
explicit |
Initialize empty buffer.
The block_size_hint argument is an important parameter that determines the algorithm used for internally storing the bytes supplied via feed*()
. While this has NO effect on the semantics of the output of consume*()
, it has an effect on internal performance.
See Socket_buffer class doc header for explanation of block_size_hint
.
logger_ptr | Logger to use for subsequent logging. |
block_size_hint | See above. |
Definition at line 26 of file socket_buffer.cpp.
References FLOW_LOG_TRACE.
void flow::net_flow::Socket_buffer::clear | ( | ) |
Destroys all stored data.
Definition at line 170 of file socket_buffer.cpp.
References m_data_size, and m_q.
void flow::net_flow::Socket_buffer::consume_buf_move | ( | util::Blob * | target_buf, |
size_t | max_data_size | ||
) |
Consumes (removes from the front of the internal byte buffer and returns them to the caller) a byte sequence of length data_size() or max_data_size
– whichever is smaller.
The bytes are removed from the front of the internal byte buffer and are written in the same order, starting from the first byte in target_buf
. *target_buf
is resized to the number of bytes thus consumed. If possible based on certain constraints, the operation avoids copying and has O(1) time complexity.
If block_size_hint != 0
, and consume_buf_move() is always called with max_data_size == block_size_hint
, then the time complexity is O(1) whenever this is called. In other uses cases this is not guaranteed, and the time complexity is O(n).
target_buf | Byte buffer which will be cleared and replaced with the bytes consumed from internal buffer. Therefore target_buf->size() can be used to know how many bytes were consumed. Performance note: *target_buf internal buffer may be reallocated, though this is avoided if reasonably possible (if it already has enough capacity() , it is not reallocated). |
max_data_size | data_size() will not grow beyond this. |
Definition at line 103 of file socket_buffer.cpp.
References flow::util::buffers_dump_string(), consume_bufs_copy(), FLOW_LOG_DATA, FLOW_LOG_TRACE, m_data_size, and m_q.
Referenced by flow::net_flow::Node::send_worker().
size_t flow::net_flow::Socket_buffer::consume_bufs_copy | ( | const Mutable_buffer_sequence & | target_bufs | ) |
Consumes (removes from the front of the internal byte buffer and returns them to the caller) a byte sequence of length data_size() or the available space in target_bufs
– whichever is smaller.
The bytes are removed from the front of the internal byte buffer and are written in the same order, starting from the first byte in target_bufs
. The operation involves a copy and has O(n) time complexity.
Mutable_buffer_sequence | Type that models the boost.asio MutableBufferSequence concept (see Boost docs). Basically, it's any container with elements convertible to boost::asio::mutable_buffer ; and bidirectional iterator support. Examples: vector<mutable_buffer> , list<mutable_buffer>. Why allow mutable_buffer instead of, say, Sequence of bytes? Same reason as boost.asio's receive functions: it allows a great amount of flexibility without sacrificing performance, since boost::asio::buffer() function can adapt lots of different objects (arrays, vector s, string s, and more of bytes, integers, and more). |
target_bufs | Buffer sequence into which to copy bytes. |
target_bufs
. This is quite important – without it there is absolutely no way to know how much of target_bufs
we've used up. Definition at line 543 of file socket_buffer.hpp.
References flow::util::buffers_dump_string(), FLOW_LOG_DATA, FLOW_LOG_TRACE, m_data_size, and m_q.
Referenced by consume_buf_move(), flow::net_flow::Peer_socket::receive(), and flow::net_flow::Peer_socket::sync_receive_impl().
|
staticprivate |
Helper that copies, to a given raw memory buffer, a given number of bytes from a given Const_buffer_sequence
, starting at a given offset in a given member of that buffer sequence.
It uses memcpy()
or similar for copying and makes as few calls to it as possible. It is similar to the boost::asio::buffer_copy(ConstBufferSequence -> buffer)
function but also returns the logical pointers just past the last byte copied, so that this can be resumed into some other target buffer. Of course there's a buffer_copy(ConstBufferSequence -> MutableBufferSequence)
that would do this, but it would be inefficient for us to have to create a MutableBufferSequence
.
util::Blob::emplace_copy()
and in the other direction. In that case util::Blob::assign_copy()
and all single-buffer util::Blob methods should probably be similarly generalized (overloaded).Const_buffer_sequence | See user-facing APIs. |
cur_buf_it | Pointer to iterator to the buffer in the buffer sequence from where to begin copying. When function returns, *cur_buf_it will point to the buffer containing the last byte that was copied. This can be passed to this method again to resume copying where we left off. |
pos_in_buf | Pointer to offset, in bytes, from the start of **cur_buf_it from where to resume copying. This must be <= (*cur_buf_it)->size() . If they are equal (i.e., it points just past the last byte of **cur_buf_it ), we will properly copy from the start of the next buffer. (This can happen if the last call to the same method copied exactly through the end of a buffer in the buffer sequence, and then you call this method again with the resulting *cur_buf_it and *pos_in_buf .) |
to_copy | The number of bytes to copy. The remainder of the buffer sequence (from cur_buf_it /pos_in_buf position) MUST contain at least that many bytes. This means you MUST determine the total byte size of the overall buffer sequence before calling this function. |
dest_buf | (Pointer to) buffer containing dest location; data will be written into it. Must be of sufficient size, or behavior is undefined. |
dest | Pointer to the location where bytes will be copied. |
Definition at line 624 of file socket_buffer.hpp.
|
staticprivate |
Helper that copies, to a given raw memory buffer, a given number of bytes to a given Mutable_buffer_sequence
, starting at a given offset in a given member of that buffer sequence.
It uses memcpy()
or similar for copying and makes as few calls to it as possible. It is the opposite of copy_bytes_from_buf_seq().
Mutable_buffer_sequence | See user-facing APIs. |
cur_buf_it | See copy_bytes_from_buf_seq(). |
pos_in_buf | See copy_bytes_from_buf_seq(). |
to_copy | See copy_bytes_from_buf_seq(). |
src_buf | Buffer containing src location Must be of sufficient size, or behavior is undefined. |
src | Pointer to the location from where bytes will be copied. |
Definition at line 668 of file socket_buffer.hpp.
size_t flow::net_flow::Socket_buffer::data_size | ( | ) | const |
The total number of bytes of application-layer data stored in this object.
Intended use case: estimate of memory usage of this object.
Note: this only counts the actual user bytes fed into the buffer – not other internal book-keeping. For example, if the internal storage is a doubly-linked list of byte arrays, this will not count the memory used by the previous and next pointers for each node. Rationale: if we give control over the maximum size to the net_flow
user, with these semantics it is easier to express and implement what that limit means (even if some side data are not counted as a result).
Definition at line 34 of file socket_buffer.cpp.
References m_data_size.
Referenced by flow::net_flow::Node::send_worker().
bool flow::net_flow::Socket_buffer::empty | ( | ) | const |
Returns true if and only if data_size() == 0.
Definition at line 165 of file socket_buffer.cpp.
References m_data_size.
size_t flow::net_flow::Socket_buffer::feed_buf_move | ( | util::Blob * | data, |
size_t | max_data_size | ||
) |
Feeds (adds to the back of the byte buffer) the byte sequence equal to the given byte sequence *data
, up to an internal buffer size.
Any bytes thus fed are cleared from the given buffer. Internally, as much as possible while following the described block_size_hint
constraints and the max_data_size
constraint, move semantics are used, attempting to keep time complexity at O(1).
As many bytes as possible are taken, subject to the constraint data_size() <= max_data_size
. In particular if this is already untrue, no bytes are taken. If block_size_hint == 0
, and data_size() + data->size() <= max_data_size
, then time complexity is constant. Otherwise it is linear in data->size()
.
data | Bytes will be moved from this byte byffer until exhausted or data_size() equals max_data_size ; the bytes thus moved are erase() d from *data . Why make this a vector of bytes, and not a const_buffer ? Because this allows us to use move semantics to avoid a copy. |
max_data_size | See above. |
data->size()
decreased. Definition at line 39 of file socket_buffer.cpp.
References flow::util::buffers_dump_string(), feed_bufs_copy(), FLOW_LOG_DATA, FLOW_LOG_TRACE, flow::log::Log_context::get_logger(), m_block_size_hint, m_data_size, and m_q.
size_t flow::net_flow::Socket_buffer::feed_bufs_copy | ( | const Const_buffer_sequence & | data, |
size_t | max_data_size | ||
) |
Feeds (adds to the back of the byte buffer) the contents of the byte stream composed of the bytes in the given Const_buffer_sequence
, in the order in which they are given – up to an internal buffer size.
The data are copied (O(n)) and not modified.
As many bytes as possible are copied, subject to the constraint data_size() <= max_data_size
. In particular if this is already untrue, no bytes are copied.
Const_buffer_sequence | Type that models the boost.asio ConstBufferSequence concept (see Boost docs). Basically, it's any container with elements convertible to boost::asio::const_buffer ; and bidirectional iterator support. Examples: vector<const_buffer> , list<const_buffer> . Why allow const_buffer instead of, say, Sequence of bytes? Same reason as boost.asio's send functions: it allows a great amount of flexibility without sacrificing performance, since boost::asio::buffer() function can adapt lots of different objects (arrays, vectors, strings, and more of bytes, integers, and more). |
data | Bytes will be copied from this buffer sequence until exhausted or data_size() equals max_data_size . |
max_data_size | See above. |
Definition at line 391 of file socket_buffer.hpp.
References flow::util::buffers_dump_string(), FLOW_LOG_DATA, FLOW_LOG_TRACE, flow::log::Log_context::get_logger(), m_block_size_hint, m_data_size, and m_q.
Referenced by feed_buf_move(), flow::net_flow::Peer_socket::send(), and flow::net_flow::Peer_socket::sync_send_impl().
|
friend |
Prints a printable representation of the data in sock_buf
to the given standard ostream
.
Exactly one line per block is used. This implementation is slow; use only in DATA logging, or where performance does not matter.
os | Stream to which to write. |
sock_buf | Object to serialize. |
os
. Definition at line 176 of file socket_buffer.cpp.
|
private |
The max_data_size argument value that the user is predicting to use when calling consume_buf_move(); or 0 if the user intends to instead use consume_bufs_copy().
This is explored in detail in the class doc header. Basically this should be 0, when this Socket_buffer is used as a Receive buffer; and it should be set to max-block-size, when this Socket_buffer is used as a Send buffer.
Definition at line 352 of file socket_buffer.hpp.
Referenced by feed_buf_move(), and feed_bufs_copy().
|
private |
The total amount of data stored, in bytes, stored in this object.
Invariant: m_data_size
= sum(m->size()
: for all m
in m_q
).
Definition at line 383 of file socket_buffer.hpp.
Referenced by clear(), consume_buf_move(), consume_bufs_copy(), data_size(), empty(), feed_buf_move(), and feed_bufs_copy().
|
private |
The data in the buffer.
The logical byte sequence, as visible to the outside user of the class, is obtained as follows: iterate through range [m_q[0]->begin()
, m_q[0]->end()
); iterate through range [m_q[1]->begin()
, m_q[1]->end()
); ...; etc. shared_ptr
s, instead of raw pointers, are used as elements just to avoid having to delete. Pointers are stored instead of direct vector objects to avoid any wholesale copying by the deque
machinery.
Why not just use a queue of bytes then? The code would be far simpler. Answer: the chief use case is if Socket_buffer is Send buffer, so they'll set block_size_hint == N
and use feed_bufs_copy() whenever Peer_socket::send() is called. Then we will always reserve()
N
bytes in each Blob
in m_q; as feed*()
is called, we will fill up the last Blob
sequence until all N
bytes are exhausted, in which case we'll push_back()
another N
-capacity Blob
sequence and fill that one as needed, etc. Since send()
always copies bytes from user's buffer(s) anyway, we're not adding any extra copying (only the allocation behavior is different than if we'd used a simple byte buffer instead). Now, when the Node needs to packetize a packet to send, with max-block-size == N
, it will use consume_buf_move() directly into the packet data structure. Since Node will avoid sending unless congestion window allows at least N
bytes to be sent, in consume_buf_move() it will use max_data_size == N
. Therefore, consume_buf_move() will be constant-time due to performing a constant-time vector swap. Thus exactly one data copy is performed including the original send()
call through sending bytes on the wire.
Definition at line 375 of file socket_buffer.hpp.
Referenced by clear(), consume_buf_move(), consume_bufs_copy(), feed_buf_move(), and feed_bufs_copy().