Flow 2.0.0
Flow project: Full implementation reference.
socket_buffer.hpp
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
22#include "flow/log/log.hpp"
23#include "flow/util/blob.hpp"
24#include "flow/util/util.hpp"
25#include <boost/asio.hpp>
26#include <boost/shared_ptr.hpp>
27#include <vector>
28#include <deque>
29#include <ostream>
30
31namespace flow::net_flow
32{
33
34/**
35 * Internal `net_flow` class that implements a socket buffer, as used by Peer_socket for Send and
36 * Receive buffers. Users do not get direct access to such objects, but instead they access them
37 * indirectly through things like Peer_socket::send() and Peer_socket::receive(). Meanwhile
38 * Node modifies them directly when data are received or sent over the network.
39 *
40 * In terms of the API semantics, the object represents a sequence of bytes. User can enqueue bytes
41 * at the back -- from either a `Const_buffer_sequence` (boost.asio style gather operation) or a
42 * vector of bytes. User can dequeue bytes from the front -- to either a `Mutable_buffer_sequence`
43 * (boost.asio style scatter operation) or a vector of bytes.
44 *
45 * That's all quite simple, but the complexity comes from performance aspects. The goal is to avoid
46 * copying arrays (linear-time) and encourage moving them (constant-time) whenever the user enqueues
47 * or dequeues some bytes. To that end, the class can operate in one of two modes (permanently
48 * specified at construction), tailored to high performance in the two practical use cases of the
49 * class.
50 *
51 * #### Mode 1: `block_size_hint == 0` ####
52 * Intended for use as a Receive buffer. In this case, packetized
53 * blocks of data come from the network and can be of arbitrary sizes (though typically they'll be
54 * max-block-size in length if possible). They are then converted to a logical byte stream, and the
55 * user (via receive()) will dequeue them using a scatter operation. Since the size(s) of the
56 * buffer(s) the user will choose in the scatter receive() are entirely unpredictable, copying of
57 * the bytes during dequeuing is unavoidable. However, copying the packet blocks while enqueuing a
58 * packet upon receipt IS avoidable; since the Node no longer needs the packet data having given it
59 * to the Receive buffer, we can take ownership of the block's data without copying.
60 *
61 * In mode 1, it is expected the class user will call feed_buf_move() for a constant-time
62 * enqueueing; and the class user will call consume_bufs_copy() for a linear-time dequeuing into the
63 * end user's (`receive()` caller's) data structure.
64 *
65 * #### Mode 2: `block_size_hint > 0` ####
66 * Intended for use as a Send buffer. In this case, a stream of bytes
67 * comes from the end user (via `send()`), in the form of a sequence of buffers of arbitrary
68 * length/structure, which is enqueued into this object. Node, when it is ready to send a packet,
69 * dequeues max-block-size bytes (or fewer, if fewer are available in Socket_buffer), packetizes
70 * them, and sends them off. Node will never ask for fewer than max-block-size bytes (in
71 * particular, it will not send anything until congestion window [CWND] has at least that much free
72 * space). Since the size(s) of the enqueued buffers are entirely unpredictable, copying of the
73 * bytes during enqueueing is unavoidable. However, copying the packets while dequeueing a packet
74 * to send IS avoidable. How? During enqueuing, we internally pack bytes into groups of
75 * `block_size_hint` == max-block-size bytes. Then when a packet is dequeued, we can provide a
76 * ready-made front group of bytes with a constant-time operation.
77 *
78 * In mode 2, it is expected the class user will call feed_bufs_copy() for a linear-time enqueueing
79 * from the end user's (`send()` caller's) data structure; and the class user will call
80 * consume_buf_move() for a constant-time dequeuing into a low-level packet.
81 *
82 * This narrow design for performance is the reason for the kind-of-odd asymmetrical set of methods
83 * for feeding and consuming bytes. The two modes differ only in performance, and technically you
84 * may mix and match the recommended calls for the 2 modes after construction. However that defeats
85 * the point of the class, so I wouldn't recommend that.
86 *
87 * ### Thread safety ###
88 * Separate objects: All functionality safe for simultaneous execution from multiple
89 * threads. Same object: Not safe to execute a non-const operation simultaneously with any other
90 * operation; otherwise safe. Rationale: While locking of Socket_buffer objects is essential, typically
91 * they must be locked together with various other external (to Socket_buffer) pieces of data (such
92 * as a given Peer_socket::m_disconnect_cause), and therefore the locking is left to the user of
93 * the class.
94 */
95class Socket_buffer :
96 public log::Log_context,
97 private boost::noncopyable // Definitely discourage copying. If we make copyable, it'll be conscious design choice.
98{
99public:
100 // Constructors/destructor.
101
102 /**
103 * Initialize empty buffer. The block_size_hint argument is an important parameter that
104 * determines the algorithm used for internally storing the bytes supplied via `feed*()`. While
105 * this has NO effect on the semantics of the output of `consume*()`, it has an effect on internal
106 * performance.
107 *
108 * See Socket_buffer class doc header for explanation of `block_size_hint`.
109 *
110 * @param logger_ptr
111 * Logger to use for subsequent logging.
112 * @param block_size_hint
113 * See above.
114 */
115 explicit Socket_buffer(log::Logger* logger_ptr, size_t block_size_hint);
116
117 // Methods.
118
119 /**
120 * The total number of bytes of application-layer data stored in this object. Intended use case:
121 * estimate of memory usage of this object.
122 *
123 * Note: this only counts the actual user bytes fed into the buffer -- not other internal
124 * book-keeping. For example, if the internal storage is a doubly-linked list of byte arrays,
125 * this will not count the memory used by the previous and next pointers for each node.
126 * Rationale: if we give control over the maximum size to the `net_flow` user, with these semantics
127 * it is easier to express and implement what that limit means (even if some side data are not
128 * counted as a result).
129 *
130 * @return See above.
131 */
132 size_t data_size() const;
133
134 /**
135 * Returns true if and only if data_size() == 0.
136 *
137 * @return Ditto.
138 */
139 bool empty() const;
140
141 /**
142 * Feeds (adds to the back of the byte buffer) the contents of the byte stream composed of the
143 * bytes in the given `Const_buffer_sequence`, in the order in which they are given -- up to an
144 * internal buffer size. The data are copied (O(n)) and not modified.
145 *
146 * As many bytes as possible are copied, subject to the constraint `data_size() <= max_data_size`.
147 * In particular if this is already untrue, no bytes are copied.
148 *
149 * @tparam Const_buffer_sequence
150 * Type that models the boost.asio `ConstBufferSequence` concept (see Boost docs).
151 * Basically, it's any container with elements convertible to `boost::asio::const_buffer`;
152 * and bidirectional iterator support. Examples: `vector<const_buffer>`, `list<const_buffer>`.
153 * Why allow `const_buffer` instead of, say, `Sequence` of bytes? Same reason as boost.asio's
154 * send functions: it allows a great amount of flexibility without sacrificing performance,
155 * since `boost::asio::buffer()` function can adapt lots of different objects (arrays,
156 * vectors, strings, and more of bytes, integers, and more).
157 * @param data
158 * Bytes will be copied from this buffer sequence until exhausted or data_size() equals
159 * `max_data_size`.
160 * @param max_data_size
161 * See above.
162 * @return Number of bytes (possibly zero) by which data_size() increased.
163 */
164 template<typename Const_buffer_sequence>
165 size_t feed_bufs_copy(const Const_buffer_sequence& data, size_t max_data_size);
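The clamping behavior described above (copy buffer-by-buffer until the sequence is exhausted or `max_data_size` is reached) can be sketched with standard containers only. All names here (`Bytes`, `ConstBufSeq`, `feed_bufs_copy_sketch`) are invented stand-ins for illustration, not Flow or Boost API:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

using Bytes = std::vector<std::uint8_t>;
// Invented stand-in for a boost.asio ConstBufferSequence: (pointer, size) pairs.
using ConstBufSeq = std::vector<std::pair<const std::uint8_t*, std::size_t>>;

// Gather-style enqueue: copy bytes buffer-by-buffer until the source sequence
// is exhausted or the total stored size reaches max_data_size.
inline std::size_t feed_bufs_copy_sketch(std::deque<Bytes>& q, std::size_t& data_size,
                                         const ConstBufSeq& data, std::size_t max_data_size)
{
  const std::size_t orig = data_size;
  for (const auto& buf : data)
  {
    if (data_size >= max_data_size) { break; } // No space left at all.
    // Copy the whole source buffer if possible, else only what fits.
    const std::size_t to_copy = std::min(buf.second, max_data_size - data_size);
    if (to_copy == 0) { continue; } // Empty source buffer; nothing to do.
    q.emplace_back(buf.first, buf.first + to_copy);
    data_size += to_copy;
  }
  return data_size - orig; // Bytes actually enqueued.
}
```

Note how a partially-copied trailing buffer simply ends the loop: the return value tells the caller how far into the sequence the copy got.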
166
167 /**
168 * Feeds (adds to the back of the byte buffer) the byte sequence equal to the given byte sequence
169 * `*data`, up to an internal buffer size. Any bytes thus fed are cleared from the given buffer.
170 * Internally, as much as possible while following the described `block_size_hint` constraints and
171 * the `max_data_size` constraint, move semantics are used, attempting to keep time complexity at
172 * @em O(1).
173 *
174 * As many bytes as possible are taken, subject to the constraint `data_size() <= max_data_size`.
175 * In particular if this is already untrue, no bytes are taken. If `block_size_hint == 0`, and
176 * `data_size() + data->size() <= max_data_size`, then time complexity is constant. Otherwise it is
177 * linear in `data->size()`.
178 *
179 * @param data
180 * Bytes will be moved from this byte buffer until exhausted or data_size() equals
181 * `max_data_size`; the bytes thus moved are `erase()`d from `*data`.
182 * Why make this a `vector` of bytes, and not a `const_buffer`? Because this allows us to use
183 * move semantics to avoid a copy.
184 * @param max_data_size
185 * See above.
186 * @return Number of bytes (possibly zero) by which data_size() increased, which equals # of bytes
187 * by which `data->size()` decreased.
188 */
189 size_t feed_buf_move(util::Blob* data, size_t max_data_size);
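The reason a whole `vector`/`Blob` (rather than a `const_buffer`) enables O(1) feeding can be shown in a few lines of standard C++. This is a hypothetical sketch using `std::vector` in place of `util::Blob`; `feed_buf_move_sketch` is an invented name:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

using Bytes = std::vector<std::uint8_t>; // Stand-in for util::Blob.

// Mode-1-style enqueue: take ownership of the caller's whole block in O(1)
// when it fits under max_data_size; otherwise fall back to a partial copy.
inline std::size_t feed_buf_move_sketch(std::deque<Bytes>& q, std::size_t& data_size,
                                        Bytes* data, std::size_t max_data_size)
{
  if (data_size >= max_data_size) { return 0; } // Already full.
  const std::size_t space = max_data_size - data_size;
  if (data->size() <= space)
  {
    // Whole block fits: move it in -- constant time, no byte copying.
    const std::size_t n = data->size();
    q.push_back(std::move(*data));
    data->clear(); // Leave *data empty, as advertised.
    data_size += n;
    return n;
  }
  // Only a prefix fits: copy that prefix (linear time) and erase it from *data.
  q.emplace_back(data->begin(), data->begin() + space);
  data->erase(data->begin(), data->begin() + space);
  data_size += space;
  return space;
}
```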
190
191 /**
192 * Consumes (removes from the front of the internal byte buffer and returns them to the caller) a
193 * byte sequence of length data_size() or the available space in `target_bufs` -- whichever is
194 * smaller. The bytes are removed from the front of the internal byte buffer and are written in
195 * the same order, starting from the first byte in `target_bufs`. The operation involves a copy and
196 * has @em O(n) time complexity.
197 *
198 * @tparam Mutable_buffer_sequence
199 * Type that models the boost.asio `MutableBufferSequence` concept (see Boost docs).
200 * Basically, it's any container with elements convertible to `boost::asio::mutable_buffer`;
201 * and bidirectional iterator support. Examples: `vector<mutable_buffer>`,
202 * `list<mutable_buffer>.` Why allow `mutable_buffer` instead of, say, `Sequence` of bytes?
203 * Same reason as boost.asio's receive functions: it allows a great amount of flexibility
204 * without sacrificing performance, since `boost::asio::buffer()` function can adapt lots of
205 * different objects (arrays, `vector`s, `string`s, and more of bytes, integers, and more).
206 * @param target_bufs
207 * Buffer sequence into which to copy bytes.
208 * @return Number of bytes (possibly zero) by which data_size() decreased, which equals the # of
209 * bytes copied into `target_bufs`. This is quite important -- without it there is
210 * absolutely no way to know how much of `target_bufs` we've used up.
211 */
212 template<typename Mutable_buffer_sequence>
213 size_t consume_bufs_copy(const Mutable_buffer_sequence& target_bufs);
214
215 /**
216 * Consumes (removes from the front of the internal byte buffer and returns them to the caller) a
217 * byte sequence of length data_size() or `max_data_size` -- whichever is smaller. The bytes are
218 * removed from the front of the internal byte buffer and are written in the same order, starting
219 * from the first byte in `target_buf`. `*target_buf` is resized to the number of bytes thus
220 * consumed. If possible based on certain constraints, the operation avoids copying and has @em O(1)
221 * time complexity.
222 *
223 * If `block_size_hint != 0`, and consume_buf_move() is always called with `max_data_size ==
224 * block_size_hint`, then the time complexity is @em O(1) whenever this is called. In other use cases
225 * this is not guaranteed, and the time complexity is @em O(n).
226 *
227 * @param target_buf
228 * Byte buffer which will be cleared and replaced with the bytes consumed from internal
229 * buffer. Therefore `target_buf->size()` can be used to know how many bytes were consumed.
230 * Performance note: `*target_buf` internal buffer may be reallocated, though this is avoided if reasonably
231 * possible (if it already has enough `capacity()`, it is not reallocated).
232 * @param max_data_size
233 * `target_buf->size()` will not exceed this; i.e., at most this many bytes are consumed.
234 */
235 void consume_buf_move(util::Blob* target_buf, size_t max_data_size);
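The O(1) fast path hinges on the front block being exactly `max_data_size` bytes: then it can be handed over wholesale via a swap. A std-only sketch (invented names; the real method also repacks across blocks in the slow path):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

using Bytes = std::vector<std::uint8_t>; // Stand-in for util::Blob.

// Mode-2-style dequeue: if the front block's size is exactly max_data_size,
// steal it via swap -- constant time. Otherwise fall back to copying a prefix.
inline void consume_buf_move_sketch(std::deque<Bytes>& q, std::size_t& data_size,
                                    Bytes* target, std::size_t max_data_size)
{
  target->clear();
  if (q.empty()) { return; } // Nothing stored.
  if (q.front().size() == max_data_size)
  {
    target->swap(q.front()); // O(1): hand over the ready-made block.
    q.pop_front();
  }
  else
  {
    // Slow path: copy up to max_data_size bytes from the front block.
    const std::size_t n = std::min(q.front().size(), max_data_size);
    target->assign(q.front().begin(), q.front().begin() + n);
    q.front().erase(q.front().begin(), q.front().begin() + n);
    if (q.front().empty()) { q.pop_front(); }
  }
  data_size -= target->size();
}
```

This is why the class doc header recommends always consuming with `max_data_size == block_size_hint` in mode 2: every front block (except possibly the last) is packed to exactly that size, so the swap branch is always taken.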
236
237 /// Destroys all stored data.
238 void clear();
239
240private:
241 // Types.
242
243 /// Short-hand for byte sequence on the heap. (Using ref-counted pointer purely for convenience.)
244 using Blob_ptr = boost::shared_ptr<util::Blob>;
245
246 /// Short-hand for byte sequence on the heap. (Using ref-counted pointer purely for convenience.)
247 using Blob_const_ptr = boost::shared_ptr<const util::Blob>;
248
249 /**
250 * FIFO of byte sequences, together comprising an overall byte sequence. Why not just use a
251 * `deque<uint8_t>`? We easily could, and it would make the code far simpler. Answer: this enables
252 * us to reduce or eliminate copying of data in certain likely use cases, chief among them when
253 * using Socket_buffer as the Send buffer. See #m_q.
254 *
255 * Why `deque` instead of `list`? Both offer ~O(1) `push_back()` and `pop_front()`. `deque` is
256 * implemented, in practice, as a set of arrays and a master array with pointers to those arrays.
257 * `list` is a doubly-linked list. `deque` is therefore probably more compact and local in memory.
258 * `list` may perform less reallocating/copying, depending on how the `deque` implementation works.
259 * Using `deque` on a hunch, however. Note, also, that at least gcc 4+ defaults `queue<T>` to
260 * `queue<T, deque>`, so I guess they also find it better on average.
261 *
262 * Using container instead of adapter `queue` due to need to iterate over the whole thing in some
263 * situations.
264 *
265 * @todo Investigate pros and cons of deque vs. list empirically.
266 */
267 using Queue = std::deque<Blob_ptr>;
268
269 // Friends.
270
271 // Friend of Socket_buffer: For access to our internals.
272 friend std::ostream& operator<<(std::ostream& os, const Socket_buffer& sock_buf);
273
274 // Methods.
275
276 /**
277 * Helper that copies, to a given raw memory buffer, a given number of bytes from a given
278 * `Const_buffer_sequence`, starting at a given offset in a given member of that buffer sequence.
279 * It uses `memcpy()` or similar for copying and makes as few calls to it as possible. It is similar to the
280 * `boost::asio::buffer_copy(ConstBufferSequence -> buffer)` function but also returns the logical
281 * pointers just past the last byte copied, so that this can be resumed into some other target
282 * buffer. Of course there's a `buffer_copy(ConstBufferSequence -> MutableBufferSequence)` that
283 * would do this, but it would be inefficient for us to have to create a `MutableBufferSequence`.
284 *
285 * @todo It would not be crazy to move copy_bytes_from_buf_seq() and copy_bytes_to_buf_seq() into util::Blob to make
286 * it more widely available, though as of this writing there is no demand for this: perhaps a buffer-sequence
287 * version of `util::Blob::emplace_copy()` and in the other direction. In that case `util::Blob::assign_copy()`
288 * and all single-buffer util::Blob methods should probably be similarly generalized (overloaded).
289 *
290 * @tparam Const_buffer_sequence
291 * See user-facing APIs.
292 * @tparam Const_it
293 * Iterator type suitable for traversing `Const_buffer_sequence`.
294 * @param cur_buf_it
295 * Pointer to iterator to the buffer in the buffer sequence from where to begin copying.
296 * When function returns, `*cur_buf_it` will point to the buffer containing the last byte
297 * that was copied. This can be passed to this method again to resume copying where we
298 * left off.
299 * @param pos_in_buf
300 * Pointer to offset, in bytes, from the start of `**cur_buf_it` from where to resume
301 * copying. This must be `<= (*cur_buf_it)->size()`. If they are equal (i.e., it points
302 * just past the last byte of `**cur_buf_it`), we will properly copy from the start of the
303 * next buffer. (This can happen if the last call to the same method copied exactly
304 * through the end of a buffer in the buffer sequence, and then you call this method again
305 * with the resulting `*cur_buf_it` and `*pos_in_buf`.)
306 * @param to_copy
307 * The number of bytes to copy. The remainder of the buffer sequence (from
308 * `cur_buf_it`/`pos_in_buf` position) MUST contain at least that many bytes. This means you
309 * MUST determine the total byte size of the overall buffer sequence before calling this
310 * function.
311 * @param dest_buf
312 * (Pointer to) buffer containing `dest` location; data will be written into it. Must be of
313 * sufficient size, or behavior is undefined.
314 * @param dest
315 * Pointer to the location where bytes will be copied.
316 */
317 template<typename Const_buffer_sequence, typename Const_it>
318 static void copy_bytes_from_buf_seq(Const_it* cur_buf_it, size_t* pos_in_buf,
319 size_t to_copy,
320 util::Blob* dest_buf, util::Blob::Iterator dest);
321
322 /**
323 * Helper that copies, from a given raw memory buffer, a given number of bytes to a given
324 * `Mutable_buffer_sequence`, starting at a given offset in a given member of that buffer sequence.
325 * It uses `memcpy()` or similar for copying and makes as few calls to it as possible. It is the opposite of
326 * copy_bytes_from_buf_seq().
327 *
328 * @tparam Mutable_buffer_sequence
329 * See user-facing APIs.
330 * @tparam Const_it
331 * Iterator type suitable for traversing `Mutable_buffer_sequence`.
332 * @param cur_buf_it
333 * See copy_bytes_from_buf_seq().
334 * @param pos_in_buf
335 * See copy_bytes_from_buf_seq().
336 * @param to_copy
337 * See copy_bytes_from_buf_seq().
338 * @param src_buf
339 * Buffer containing `src` location. Must be of sufficient size, or behavior is undefined.
340 * @param src
341 * Pointer to the location from where bytes will be copied.
342 */
343 template<typename Mutable_buffer_sequence, typename Const_it>
344 static void copy_bytes_to_buf_seq(Const_it* cur_buf_it, size_t* pos_in_buf,
345 size_t to_copy,
346 const util::Blob& src_buf, util::Blob::Const_iterator src);
347 // Data.
348
349 /**
350 * The max_data_size argument value that the user is predicting to use when calling
351 * consume_buf_move(); or 0 if the user intends to instead use consume_bufs_copy().
352 * This is explored in detail in the class doc header. Basically this should be 0, when this
353 * Socket_buffer is used as a Receive buffer; and it should be set to max-block-size, when this
354 * Socket_buffer is used as a Send buffer.
355 */
356 const size_t m_block_size_hint;
357
358 /**
359 * The data in the buffer. The logical byte sequence, as visible to the outside user of the
360 * class, is obtained as follows: iterate through range [`m_q[0]->begin()`, `m_q[0]->end()`);
361 * iterate through range [`m_q[1]->begin()`, `m_q[1]->end()`); ...; etc. `shared_ptr`s, instead
362 * of raw pointers, are used as elements just to avoid having to delete. Pointers are stored
363 * instead of direct vector objects to avoid any wholesale copying by the `deque` machinery.
364 *
365 * Why not just use a queue of bytes then? The code would be far simpler. Answer: the chief use
366 * case is if Socket_buffer is Send buffer, so they'll set `block_size_hint == N` and use
367 * feed_bufs_copy() whenever Peer_socket::send() is called. Then we will always `reserve()` `N` bytes
368 * in each `Blob` in #m_q; as `feed*()` is called, we will fill up the last `Blob` sequence until
369 * all `N` bytes are exhausted, in which case we'll `push_back()` another `N`-capacity `Blob` sequence
370 * and fill that one as needed, etc. Since `send()` always copies bytes from user's buffer(s)
371 * anyway, we're not adding any extra copying (only the allocation behavior is different than if
372 * we'd used a simple byte buffer instead). Now, when the Node needs to packetize a packet to
373 * send, with `max-block-size == N`, it will use consume_buf_move() directly into the packet data
374 * structure. Since Node will avoid sending unless congestion window allows at least `N` bytes to
375 * be sent, in consume_buf_move() it will use `max_data_size == N`. Therefore, consume_buf_move()
376 * will be constant-time due to performing a constant-time vector swap. Thus exactly one data
377 * copy is performed including the original `send()` call through sending bytes on the wire.
378 */
379 Queue m_q;
380
381 /**
382 * The total amount of data, in bytes, stored in this object. @see data_size() for more
383 * discussion.
384 *
385 * Invariant: `m_data_size` = sum(`m->size()`: for all `m` in `m_q`).
386 */
387 size_t m_data_size;
388}; // class Socket_buffer
389
390// Free functions: in *_fwd.hpp.
391
392// Template implementations.
393
394template<typename Const_buffer_sequence>
395size_t Socket_buffer::feed_bufs_copy(const Const_buffer_sequence& data, size_t max_data_size)
396{
397 using util::Blob;
398 using boost::asio::buffer_sequence_begin;
399 using boost::asio::buffer_sequence_end;
400 using boost::asio::const_buffer;
401 using boost::asio::buffer_size;
402 using std::min;
403
404 const size_t orig_data_size = m_data_size;
405
406 if (m_block_size_hint == 0)
407 {
408 /* In this mode, we postpone all re-packetizing until -- possibly -- the data are consume*()d.
409 * In other words we just allocate a new buffer for each buffer that comes in and make a
410 * direct copy. They're not even likely to use this method in this mode (probably they'll use
411 * feed_buf_move() with a Receive buffer instead), but we provide this for completeness. For
412 * more info on the use of this mode, see class doc header or feed_buf_move(). */
413 for (auto buf_data_ptr = buffer_sequence_begin(data),
414 buf_data_end_ptr = buffer_sequence_end(data);
415 buf_data_ptr != buf_data_end_ptr; ++buf_data_ptr)
416 {
417 const auto& buf_data = *buf_data_ptr;
418 if (m_data_size >= max_data_size)
419 {
420 return m_data_size - orig_data_size;
421 }
422 // else there is space in our buffer.
423
424 // Copy entire buffer if possible, but don't exceed max_data_size bytes in total.
425 const size_t to_copy = min(buf_data.size(), // Could be zero. This is in BYTES.
426 max_data_size - m_data_size); // Definitely >= 1 (checked above).
427 if (to_copy == 0)
428 {
429 continue; // I guess this buffer was empty... nothing to do for it.
430 }
431 // else source buffer has data to copy.
432
433 // Get the raw data pointer.
434 const auto buf_start = static_cast<Blob::value_type const *>(buf_data.data());
435
436 const Blob_ptr buf_copy(new Blob(get_logger()));
437 // Make a byte blob copy from that raw memory. Performance is highest possible (allocate, copy).
438 buf_copy->assign_copy(const_buffer(buf_start, to_copy));
439 m_q.push_back(buf_copy);
440
441 // Accounting.
442 m_data_size += to_copy;
443
444 FLOW_LOG_TRACE("Socket_buffer/rcv [" << this << "]: data_size [" << m_data_size << "]; "
445 "buf_count [" << m_q.size() << "]; fed buffer "
446 "original/truncated size = [" << buf_data.size() << "/" << to_copy << "].");
447 // Very verbose and CPU-intensive!
448 FLOW_LOG_DATA("Buffer data [" << util::buffers_dump_string(buf_copy->const_buffer(), "", size_t(-1)) << "].");
449 } // for (buf_data in data)
450 }
451 else // if (m_block_size_hint != 0)
452 {
453 /* In this, the more likely, use of this method we are attempting to maintain the following
454 * invariant:
455 * - Each buffer in m_q is allocated to m_block_size_hint bytes and never reallocated
456 * (so one reserve() call).
457 * - Each buffer in m_q is at capacity, except m_q.back().
458 *
459 * If the user cooperates by only consuming via consume_buf_move(..., m_block_size_hint), then
460 * we will be able to perform no copies in those consume_buf_move() calls, as we can always just
461 * swap our byte vector m_q.front() with their empty byte vector (a constant time
462 * operation). This is the likely behavior if *this is used as a Send buffer, as Node will not
463 * consume from *this unless it has at least m_block_size_hint (max-block-size in its terms) of
464 * congestion window space.
465 *
466 * Of course we still have to copy here, but in the Send buffer use case that is an advertised
467 * given: we must copy the buffer from user data structure as opposed to messing with it. */
468
469 if (m_data_size >= max_data_size)
470 {
471 return 0; // Get rid of this pathological case now.
472 }
473 // else some buffer space left.
474 size_t target_space_left = max_data_size - m_data_size;
475 // This will hold how many bytes *this can still add before exceeding max_data_size.
476
477 size_t src_size_left = buffer_size(data);
478 /* @todo That was a bit inefficient (has to traverse buffer sequence to add up individual sizes). To avoid
479 * it, copy_bytes_from_buf_seq() can be improved at the cost of complexity of its interface.
480 * To put it in perspective, the # of buffers is probably ~0.1% of the number of bytes. */
481 if (src_size_left == 0)
482 {
483 return 0; // Get rid of this pathological case now.
484 }
485 // else: src_size_left will hold how many bytes we still haven't copied from data.
486
487 // Starting source location in memory is first buffer in `data`, byte 0.
488 auto cur_buf_it = buffer_sequence_begin(data);
489 size_t pos_in_buf = 0;
490 do
491 {
492 /* The following invariants must hold at the start of this loop iteration:
493 *
494 * -1- src_size_left != 0 (guaranteed initially above; and by while condition below).
495 * -2- target_space_left != 0 (ditto).
496 * -3- m_q.back().size() < m_block_size_hint (i.e., still space to write into it).
497 *
498 * Let us now guarantee -3-. */
499
500 // Ensure there is some space in the trailing buffer (space up to m_block_size_hint bytes).
501 if (m_q.empty() || (m_q.back()->size() == m_block_size_hint))
502 {
503 // Either the trailing buffer in queue is filled to capacity; or no trailing buffer exists. Make an all-new one.
504 m_q.push_back(Blob_ptr(new Blob(get_logger())));
505
506 // Reserve exactly N bytes of capacity (should be the only allocation for this member).
507 m_q.back()->reserve(m_block_size_hint);
508 }
509 Blob& target_buf = *m_q.back();
510
511 /* Now, decide how many bytes to copy into target_buf. This must be:
512 * - at most target_buf.capacity() - target_buf.size() (maintain exactly m_block_size_hint
513 * bytes allocated in target_buf);
514 * - at most target_space_left (do not exceed m_data_size bytes in *this total);
515 * - at most src_size_left (cannot copy more bytes than available in the source buffer sequence). */
516 const size_t to_copy = min(m_block_size_hint - target_buf.size(), min(target_space_left, src_size_left));
517
518 // Due to invariant mentioned above:
519 assert(to_copy != 0);
520
521 /* Do it. Expand the buffer to receive to_copy bytes. This is maximally performant.
522 *
523 * Then, walk along the buffer sequence, memcpy()-ing (etc.) maximally large chunks of data until we
524 * got to_copy bytes total. cur_buf_it and pos_in_buf keep track of where the next copy (if
525 * any) will resume. */
526 target_buf.resize(target_buf.size() + to_copy);
527 copy_bytes_from_buf_seq<Const_buffer_sequence>(&cur_buf_it, &pos_in_buf, to_copy,
528 &target_buf, target_buf.end() - to_copy);
529
530 target_space_left -= to_copy;
531 src_size_left -= to_copy;
532 m_data_size += to_copy;
533
534 FLOW_LOG_TRACE("Socket_buffer/" << m_block_size_hint << " [" << this << "]: data_size [" << m_data_size << "]; "
535 "buf_count [" << m_q.size() << "]; "
536 "fed/total buffer size = [" << to_copy << '/' << target_buf.size() << "].");
537 // Very verbose and CPU-intensive!
538 FLOW_LOG_DATA("Buffer data post-append: "
539 "[" << util::buffers_dump_string(target_buf.const_buffer(), "", size_t(-1)) << "].");
540 }
541 while ((src_size_left != 0) && (target_space_left != 0));
542 } // else if (m_block_size_hint != 0)
543
544 return m_data_size - orig_data_size;
545} // Socket_buffer::feed_bufs_copy()
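The packing invariant that the `m_block_size_hint != 0` branch maintains (every block allocated to exactly N bytes; only the trailing block may be partial) can be reproduced in miniature with standard containers. Names here are invented for illustration:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

using Bytes = std::vector<std::uint8_t>;

// Mode-2 packing in miniature: append arbitrary-size input into blocks of
// exactly n bytes capacity; only the trailing block is ever partially full.
inline void pack_into_blocks(std::deque<Bytes>& q, const std::uint8_t* src,
                             std::size_t len, std::size_t n)
{
  while (len != 0)
  {
    if (q.empty() || q.back().size() == n)
    {
      // Trailing block full (or none exists): start a fresh one.
      q.emplace_back();
      q.back().reserve(n); // The single allocation for this block.
    }
    Bytes& back = q.back();
    // Fill the trailing block up to capacity, but no further than len.
    const std::size_t to_copy = std::min(n - back.size(), len);
    back.insert(back.end(), src, src + to_copy);
    src += to_copy;
    len -= to_copy;
  }
}
```

Feeding 5 bytes and then 4 bytes with `n == 4` yields blocks of sizes 4, 4, 1: the second feed first tops off the partial trailing block, then opens a new one, exactly as the invariant above requires.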
546
547template<typename Mutable_buffer_sequence>
548size_t Socket_buffer::consume_bufs_copy(const Mutable_buffer_sequence& target_bufs)
549{
550 using util::Blob;
551 using boost::asio::buffer_sequence_begin;
552 using boost::asio::buffer_size;
553 using boost::asio::const_buffer;
554 using std::min;
555
556 if (m_data_size == 0)
557 {
558 return 0; // Get rid of this pathological case right away.
559 }
560 // else
561
562 const size_t orig_data_size = m_data_size;
563
564 // @todo This is a bit inefficient: O(n) time, where n is the number of buffers in target_bufs.
565 size_t target_size_left = buffer_size(target_bufs);
566
567 if (target_size_left == 0)
568 {
569 return 0; // Another pathological case.
570 }
571 // else
572
573 /* Now go through each buffer in m_q, until either we run out of bytes or fill up the target
574 * buffer. For each buffer in m_q, copy it over into target_bufs. */
575 Queue::iterator cur_src_it = m_q.begin();
576 // Starting destination location in memory is first buffer in "target_bufs," byte 0.
577 auto cur_buf_it = buffer_sequence_begin(target_bufs);
578 size_t pos_in_buf = 0;
579 do
580 {
581 Blob& src_bytes = **cur_src_it;
582
583 // Normally copy all of current buffer -- unless that would overflow the target Mutable_buffer_sequence.
584 const size_t to_copy = min(src_bytes.size(), target_size_left);
585
586 /* Do it. Walk along the buffer sequence, memcpy-ing (etc.) maximally large chunks of data into it
587 * until we copy to_copy bytes total. cur_buf_it and pos_in_buf keep track of where the next
588 * copy (if any) will resume. */
589 copy_bytes_to_buf_seq<Mutable_buffer_sequence>
590 (&cur_buf_it, &pos_in_buf, to_copy, src_bytes, src_bytes.const_begin());
591
592 m_data_size -= to_copy;
593 target_size_left -= to_copy;
594
595 FLOW_LOG_TRACE("Socket_buffer [" << this << "]: data_size [" << m_data_size << "]; "
596 "slow-consumed buffer of size/total [" << to_copy << '/' << src_bytes.size() << "].");
597 // Very verbose and CPU-intensive!
598 FLOW_LOG_DATA("Buffer data "
599 "[" << util::buffers_dump_string(const_buffer(src_bytes.const_begin(), to_copy),
600 "", size_t(-1))
601 << "].");
602
603 if (to_copy == src_bytes.size())
604 {
605 // Copied entire source buffer -- move to the next one; this one will be erase()ed outside the loop.
606 ++cur_src_it;
607 }
608 else
609 {
610 /* Was limited by the target buffer sequence size; copied only part of source buffer.
611 * Therefore the loop will not execute again. Erase the bytes that were just consumed, but
612 * leave the others alone. erase() outside the loop will not erase the buffer, as we will not
613 * ++cur_src_it. */
614 src_bytes.erase(src_bytes.begin(), src_bytes.begin() + to_copy);
615 }
616 }
617 while ((target_size_left != 0) && (m_data_size != 0));
618
619 // All buffers in this range were entirely copied, so now they should be deallocated/removed.
620 m_q.erase(m_q.begin(), cur_src_it);
621
622 FLOW_LOG_TRACE("Socket_buffer [" << this << "] consume finished: data_size [" << m_data_size << "]; "
623 "buf_count [" << m_q.size() << "].");
624
625 return orig_data_size - m_data_size;
626} // Socket_buffer::consume_bufs_copy()
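The resumable scatter-copy that consume_bufs_copy() delegates to copy_bytes_to_buf_seq() can be mimicked memcpy()-state-machine style in plain C++. This is a hypothetical stand-in (the `Scatter` type and its members are invented), representing a mutable buffer sequence as (pointer, size) pairs and carrying `(buf_idx, pos_in_buf)` across calls so copying resumes where the previous call left off:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <utility>
#include <vector>

// Mutable "buffer sequence" plus resumable copy state. Pre-condition of
// copy_in(): the remaining buffers hold at least to_copy bytes of space.
struct Scatter
{
  std::vector<std::pair<std::uint8_t*, std::size_t>> bufs;
  std::size_t buf_idx = 0;    // Buffer where the next copy resumes...
  std::size_t pos_in_buf = 0; // ...and the byte offset within it.

  void copy_in(const std::uint8_t* src, std::size_t to_copy)
  {
    while (to_copy != 0)
    {
      // Skip exactly-filled (and zero-size) buffers; pre-condition
      // guarantees a buffer with space exists.
      while (pos_in_buf == bufs[buf_idx].second) { ++buf_idx; pos_in_buf = 0; }
      // One memcpy() per maximal contiguous run.
      const std::size_t chunk = std::min(to_copy, bufs[buf_idx].second - pos_in_buf);
      std::memcpy(bufs[buf_idx].first + pos_in_buf, src, chunk);
      src += chunk;
      pos_in_buf += chunk;
      to_copy -= chunk;
    }
  }
};
```

As in the real helper, `pos_in_buf` is deliberately left equal to the buffer's size when a copy ends exactly at a buffer boundary; the next call advances to the next non-empty buffer before copying.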
627
628template<typename Const_buffer_sequence, typename Const_it>
629void Socket_buffer::copy_bytes_from_buf_seq(Const_it* cur_buf_it,
630 size_t* pos_in_buf, size_t to_copy,
631 util::Blob* dest_buf,
632 util::Blob::Iterator dest) // Static.
633{
634 using util::Blob;
635 using boost::asio::const_buffer;
636 using std::min;
637
638 /* A pre-condition is that *pos_in_buf <= (**cur_buf_it).size(). If it's strictly less, then
639 * we will copy into some position in **cur_buf_it. If it is equal, then we need to go to the
640 * next buffer -- actually the next non-empty buffer. Why do we allow the == case? Because if we
641 * end the copying in an earlier invocation by copying the very last byte in a buffer, then the
642 * final *pos_in_buf will indeed equal .size() of that buffer. So basically, we leave
643 * *pos_in_buf at the value it is after a copy, regardless of whether that's in the middle or end
644 * of the buffer. Then if we *need* to, we will go to the next non-empty buffer. */
645
646 // Keep decreasing to_copy until all copied. A pre-condition is that the buffer sequence has enough bytes total.
647 while (to_copy != 0)
648 {
649 // At this point *pos_in_buf <= .size(). We must copy, so if they're equal, find next non-empty buffer.
650 size_t cur_buf_size;
651 while (*pos_in_buf == (cur_buf_size = (*cur_buf_it)->size()))
652 {
653 ++*cur_buf_it;
654 *pos_in_buf = 0; // If the next buffer is empty, this will equal .size() (0).
655 }
656 // Since we are guaranteed buffer sequence has enough bytes to satisfy to_copy, **cur_buf_it must be a buffer.
657
658  // Current source buffer may contain more bytes than we still have to copy.
659 const size_t to_copy_in_buf = min(to_copy, cur_buf_size - *pos_in_buf);
660 /* This is the reason for using this function instead of buffer_iterator (which would've been much easier -- but
661 * this is way faster and probably uses memcpy() or similar). */
662 dest = dest_buf->emplace_copy(dest,
663 const_buffer(static_cast<Blob::Const_iterator>((*cur_buf_it)->data()) + *pos_in_buf,
664 to_copy_in_buf));
665
666 to_copy -= to_copy_in_buf;
667 *pos_in_buf += to_copy_in_buf;
668 // Note that if to_copy != 0 right now, then *pos_in_buf points just past end of current buffer.
669 }
670} // Socket_buffer::copy_bytes_from_buf_seq()
671
672template<typename Mutable_buffer_sequence, typename Const_it>
673void Socket_buffer::copy_bytes_to_buf_seq(Const_it* cur_buf_it,
674 size_t* pos_in_buf, size_t to_copy,
675 const util::Blob& src_buf,
676 util::Blob::Const_iterator src) // Static.
677{
678 /* This is almost exactly the same as copy_bytes_from_buf_seq() -- just in the other direction.
679 * Basically memcpy() (or similar) arguments are switched. So I'll keep comments minimal -- see the other
680 * method.
681 * @todo Code reuse somehow. It's possible but may not be worth it given the small volume of code
682 * involved. */
683
684 using util::Blob;
685 using boost::asio::mutable_buffer;
686 using std::min;
687
688 while (to_copy != 0)
689 {
690 size_t cur_buf_size;
691 while (*pos_in_buf == (cur_buf_size = (*cur_buf_it)->size()))
692 {
693 ++*cur_buf_it;
694 *pos_in_buf = 0;
695 }
696
697 const size_t to_copy_in_buf = min(to_copy, cur_buf_size - *pos_in_buf);
698 src = src_buf.sub_copy(src,
699 mutable_buffer(static_cast<Blob::Iterator>((*cur_buf_it)->data()) + *pos_in_buf,
700 to_copy_in_buf));
701
702 to_copy -= to_copy_in_buf;
703 *pos_in_buf += to_copy_in_buf;
704 }
705} // Socket_buffer::copy_bytes_to_buf_seq()
706
707} // namespace flow::net_flow