Flow 1.0.1
Flow project: Full implementation reference.
socket_buffer.cpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
20
21namespace flow::net_flow
22{
23
24// Implementations.
25
26Socket_buffer::Socket_buffer(log::Logger* logger_ptr, size_t block_size_hint) :
27 log::Log_context(logger_ptr, Flow_log_component::S_NET_FLOW),
28 m_block_size_hint(block_size_hint),
29 m_data_size(0)
30{
31 FLOW_LOG_TRACE("Socket_buffer [" << this << "] created; block_size_hint = [" << block_size_hint << "].");
32}
33
35{
36 return m_data_size;
37}
38
39size_t Socket_buffer::feed_buf_move(util::Blob* data, size_t max_data_size)
40{
41 using util::Blob;
42 using boost::asio::const_buffer;
43 using boost::asio::buffer;
44
45 const size_t orig_data_size = m_data_size;
46
47 if (m_block_size_hint == 0)
48 {
49 /* In this mode, we postpone all re-packetizing until -- possibly -- the data are consume*()d.
50 * In other words, since we have advertised we will clear any data that we append to *this from
51 * *data, we can simply create a new buffer in m_q, and move (constant-time operation) *data
52 * into that buffer. Only case where we cannot do that is if that would exceed max_data_size
53 * data_size(); in that case we have to move only part of *data and therefore will have to copy. */
54
55 if (data->empty() || (m_data_size >= max_data_size))
56 {
57 return 0; // Get rid of these pathological cases right away.
58 }
59 const size_t target_space_left = max_data_size - m_data_size; // At least 1 (byte).
60
61 const size_t src_data_size = data->size();
62 assert(src_data_size > 0);
63 if (target_space_left < src_data_size)
64 {
65 // Unfortunately we'll have to move only part of *data; so we'll have to do a linear-time thing.
66 Blob* bytes = new Blob(get_logger());
67 // (All operations are max-performance:) Allocate N bytes; copy N bytes: (*data)[0, 1, ...].
68 bytes->assign_copy(const_buffer(data->const_data(), target_space_left));
69 data->erase(data->begin(), data->begin() + target_space_left);
70
71 m_q.push_back(Blob_ptr(bytes));
72 m_data_size = max_data_size;
73 }
74 else
75 {
76 // Enough space for all of *data -- so just use a constant-time swap.
77 Blob_ptr bytes_ptr(new Blob(std::move(*data))); // Move inner representation of *data into *bytes_ptr.
78 // *data empty now.
79
80 m_q.push_back(bytes_ptr);
81 m_data_size += src_data_size;
82 }
83
84 FLOW_LOG_TRACE("Socket_buffer/rcv [" << this << "]: data_size [" << m_data_size << "]; "
85 "buf_count [" << m_q.size() << "]; fast-fed (if not truncated) buffer "
86 "original/truncated size = [" << src_data_size << '/' << m_q.back()->size() << "].");
87 // Very verbose and CPU-intensive!
88 FLOW_LOG_DATA("Buffer data [" << util::buffers_dump_string(m_q.back()->const_buffer(), "", size_t(-1)) << "].");
89 }
90 else // if (m_block_size_hint != 0)
91 {
92 /* This is basically a special case of feed_bufs_copy() but with only one buffer in the buffer
93 * sequence and with the copied data then removed from that source buffer. This is the unlikely method to be
94 * used anyway, but we provide it for completeness. */
95
96 const size_t num_copied = feed_bufs_copy(buffer(data->const_data(), data->size()), max_data_size);
97 data->erase(data->begin(), data->begin() + num_copied);
98 }
99
100 return m_data_size - orig_data_size;
101} // Socket_buffer::feed_buf_move()
102
103void Socket_buffer::consume_buf_move(util::Blob* target_buf, size_t max_data_size)
104{
105 using util::Blob;
106
107 if (m_data_size == 0)
108 {
109 target_buf->clear(); // Pathological case.
110 return;
111 }
112 // else
113
114 /* This is the use case for which m_block_size_hint != 0 attempts to optimize. Since when feed*() is
115 * called in that mode, we always maintain the invariant that the m_q contains blocks of
116 * exactly m_block_size_hint bytes, except for the last block. And if one only ever calls
117 * consume_buf_move() with max_data_size == m_block_size_hint, then that invariant is always maintained.
118 * Therefore, the following if condition will always hold, which will enable us to perform
119 * an O(1) operation. */
120
121 Blob& front_buf = *m_q.front();
122 const size_t front_buf_size = front_buf.size();
123 if ((front_buf_size == max_data_size)
124 || ((front_buf_size < max_data_size) && (m_q.size() == 1)))
125 {
126 // Either the first block is perfectly sized, or it fits AND is the only one in the sequence.
127
128 *target_buf = std::move(front_buf); // Constant-time operation (swap internal buffer representations).
129 m_q.pop_front();
130 // Whatever they had in *target_buf is junk. Deleted!
131 m_data_size -= target_buf->size();
132
133 FLOW_LOG_TRACE("Socket_buffer [" << this << "]: data_size [" << m_data_size << "]; "
134 "buf_count [" << m_q.size() << "]; fast-consumed buffer of size [" << front_buf_size << "].");
135 // Very verbose and CPU-intensive!
136 FLOW_LOG_DATA("Buffer data [" << util::buffers_dump_string(target_buf->const_buffer(), "", size_t(-1)) << "].");
137 }
138 else
139 {
140 using boost::asio::buffer;
141
142 /* Either the front buffer is too big for max_data_size and thus has to be split up (left part
143 * moves, right part stays); or the front buffer is smaller than max_data_size, and there are more
144 * data in the following buffers, so we can only optimize the copying of the front buffer but not
145 * later. Thus the use case above must not be in effect, and we can just use the general
146 * consume_bufs_copy to copy/erase stuff in O(n) time. */
147
148 // consume_bufs_copy() uses Boost buffer semantics: size() of vector indicates max # bytes.
149
150 if ((!target_buf->zero()) && (target_buf->capacity() < max_data_size))
151 {
152 // *target_buf must be enlarged; but to enlarge it we must explicitly deallocate it first.
153 target_buf->make_zero();
154 }
155 target_buf->resize(max_data_size);
156
157 // Move up to max_data_size bytes into *target_buf.
158 const size_t num_copied = consume_bufs_copy(buffer(target_buf->data(), target_buf->size()));
159
160 // As advertised, fit the vector to the bytes placed (keep size() same or decrease it).
161 target_buf->resize(num_copied);
162 }
163} // Socket_buffer::consume_buf_move()
164
166{
167 return m_data_size == 0;
168}
169
171{
172 m_q.clear();
173 m_data_size = 0;
174}
175
176std::ostream& operator<<(std::ostream& os, const Socket_buffer& sock_buf)
177{
178 for (const auto& buf_ptr : sock_buf.m_q)
179 {
180 // Serialization of block will contain no newlines or unprintable characters.
181 os << '[' << util::buffers_dump_string(buf_ptr->const_buffer(), "", size_t(-1)) << "]\n";
182 }
183 return os;
184}
185
186} // namespace flow::net_flow
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:224
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1291
Internal net_flow class that implements a socket buffer, as used by Peer_socket for Send and Receive ...
size_t m_data_size
The total amount of data, in bytes, stored in this object.
void clear()
Destroys all stored data.
Socket_buffer(log::Logger *logger_ptr, size_t block_size_hint)
Initialize empty buffer.
size_t feed_buf_move(util::Blob *data, size_t max_data_size)
Feeds (adds to the back of the byte buffer) the byte sequence equal to the given byte sequence *data,...
size_t feed_bufs_copy(const Const_buffer_sequence &data, size_t max_data_size)
Feeds (adds to the back of the byte buffer) the contents of the byte stream composed of the bytes in ...
void consume_buf_move(util::Blob *target_buf, size_t max_data_size)
Consumes (removes from the front of the internal byte buffer and returns them to the caller) a byte s...
Queue m_q
The data in the buffer.
bool empty() const
Returns true if and only if data_size() == 0.
const size_t m_block_size_hint
The max_data_size argument value that the user is predicting to use when calling consume_buf_move(); ...
boost::shared_ptr< util::Blob > Blob_ptr
Short-hand for byte sequence on the heap. (Using ref-counted pointer purely for convenience....
size_t data_size() const
The total number of bytes of application-layer data stored in this object.
size_t consume_bufs_copy(const Mutable_buffer_sequence &target_bufs)
Consumes (removes from the front of the internal byte buffer and returns them to the caller) a byte s...
#define FLOW_LOG_DATA(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
Definition: log.hpp:242
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Definition: node.cpp:25
std::ostream & operator<<(std::ostream &os, const Congestion_control_selector::Strategy_choice &strategy_choice)
Serializes a Peer_socket_options::Congestion_control_strategy_choice enum to a standard ostream – the...
Definition: cong_ctl.cpp:146
std::string buffers_dump_string(const Const_buffer_sequence &data, const std::string &indentation, size_t bytes_per_line)
Identical to buffers_to_ostream() but returns an std::string instead of writing to a given ostream.
Definition: util.hpp:481
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
Definition: blob_fwd.hpp:60
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Definition: common.hpp:632