Flow 1.0.0
Flow project: Full implementation reference.
socket_buffer.hpp
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
22#include "flow/log/log.hpp"
23#include "flow/util/blob.hpp"
24#include "flow/util/util.hpp"
25#include <boost/asio.hpp>
26#include <boost/shared_ptr.hpp>
27#include <vector>
28#include <deque>
29#include <ostream>
30
31namespace flow::net_flow
32{
33
34/**
35 * Internal `net_flow` class that implements a socket buffer, as used by Peer_socket for Send and
36 * Receive buffers. Users do not get direct access to such objects, but instead they access them
37 * indirectly through things like Peer_socket::send() and Peer_socket::receive(). Meanwhile
38 * Node modifies them directly when data are received or sent over the network.
39 *
40 * In terms of the API semantics, the object represents a sequence of bytes. User can enqueue bytes
41 * at the back -- from either a `Const_buffer_sequence` (boost.asio style gather operation) or a
42 * vector of bytes. User can dequeue bytes from the front -- to either a `Mutable_buffer_sequence`
43 * (boost.asio style scatter operation) or a vector of bytes.
44 *
45 * That's all quite simple, but the complexity comes from performance aspects. The goal is to avoid
46 * copying arrays (linear-time) and encourage moving them (constant-time) whenever the user enqueues
47 * or dequeues some bytes. To that end, the class can operate in one of two modes (permanently
48 * specified at construction), tailored to high performance in the two practical use cases of the
49 * class.
50 *
51 * #### Mode 1: `block_size_hint == 0` ####
52 * Intended for use as a Receive buffer. In this case, packetized
53 * blocks of data come from the network and can be of arbitrary sizes (though typically they'll be
54 * max-block-size in length if possible). They are then converted to a logical byte stream, and the
55 * user (via receive()) will dequeue them using a scatter operation. Since the size(s) of the
56 * buffer(s) the user will choose in the scatter receive() are entirely unpredictable, copying of
57 * the bytes during dequeuing is unavoidable. However, copying the packet blocks while enqueuing a
58 * packet upon receipt IS avoidable; since the Node no longer needs the packet data having given it
59 * to the Receive buffer, we can take ownership of the block's data without copying.
60 *
61 * In mode 1, it is expected the class user will call feed_buf_move() for a constant-time
62 * enqueueing; and the class user will call consume_bufs_copy() for a linear-time dequeuing into the
63 * end user's (`receive()` caller's) data structure.
64 *
65 * #### Mode 2: `block_size_hint > 0` ####
66 * Intended for use as a Send buffer. In this case, a stream of bytes
67 * comes from the end user (via `send()`), in the form of a sequence of buffers of arbitrary
68 * length/structure, which is enqueued into this object. Node, when it is ready to send a packet,
69 * dequeues max-block-size bytes (or fewer, if fewer are available in Socket_buffer), packetizes
70 * them, and sends them off. Node will never ask for fewer than max-block-size bytes (in
71 * particular, it will not send anything until congestion window [CWND] has at least that much free
72 * space). Since the size(s) of the enqueued buffers are entirely unpredictable, copying of the
73 * bytes during enqueueing is unavoidable. However, copying the packets while dequeueing a packet
74 * to send IS avoidable. How? During enqueuing, we internally pack bytes into groups of
75 * `block_size_hint` == max-block-size bytes. Then when a packet is dequeued, we can provide a
76 * ready-made front group of bytes with a constant-time operation.
77 *
78 * In mode 2, it is expected the class user will call feed_bufs_copy() for a linear-time enqueueing
79 * from the end user's (`send()` caller's) data structure; and the class user will call
80 * consume_buf_move() for a constant-time dequeuing into a low-level packet.
81 *
82 * This narrow design for performance is the reason for the kind-of-odd asymmetrical set of methods
83 * for feeding and consuming bytes. The two modes differ only in performance, and technically you
84 * may mix and match the recommended calls for the 2 modes after construction. However that defeats
85 * the point of the class, so I wouldn't recommend that.
86 *
87 * ### Thread safety ###
88 * Separate objects: All functionality safe for simultaneous execution from multiple
89 * threads. Same object: Not safe to execute a non-const operation simultaneously with any other
90 * operation; otherwise safe. Rationale: While locking of Socket_buffer objects is essential, typically
91 * they must be locked together with various other external (to Socket_buffer) pieces of data (such
92 * as a given Peer_socket::m_disconnect_cause), and therefore the locking is left to the user of
93 * the class.
94 */
95class Socket_buffer :
96 public log::Log_context,
97 private boost::noncopyable // Definitely discourage copying. If we make it copyable, it'll be a conscious design choice.
98{
99public:
100 // Constructors/destructor.
101
102 /**
103 * Initialize empty buffer. The block_size_hint argument is an important parameter that
104 * determines the algorithm used for internally storing the bytes supplied via `feed*()`. While
105 * this has NO effect on the semantics of the output of `consume*()`, it has an effect on internal
106 * performance.
107 *
108 * See Socket_buffer class doc header for explanation of `block_size_hint`.
109 *
110 * @param logger_ptr
111 * Logger to use for subsequent logging.
112 * @param block_size_hint
113 * See above.
114 */
115 explicit Socket_buffer(log::Logger* logger_ptr, size_t block_size_hint);
116
117 // Methods.
118
119 /**
120 * The total number of bytes of application-layer data stored in this object. Intended use case:
121 * estimate of memory usage of this object.
122 *
123 * Note: this only counts the actual user bytes fed into the buffer -- not other internal
124 * book-keeping. For example, if the internal storage is a doubly-linked list of byte arrays,
125 * this will not count the memory used by the previous and next pointers for each node.
126 * Rationale: if we give control over the maximum size to the `net_flow` user, with these semantics
127 * it is easier to express and implement what that limit means (even if some side data are not
128 * counted as a result).
129 *
130 * @return See above.
131 */
132 size_t data_size() const;
133
134 /**
135 * Returns true if and only if data_size() == 0.
136 *
137 * @return Ditto.
138 */
139 bool empty() const;
140
141 /**
142 * Feeds (adds to the back of the byte buffer) the contents of the byte stream composed of the
143 * bytes in the given `Const_buffer_sequence`, in the order in which they are given -- up to an
144 * internal buffer size. The data are copied (O(n)) and not modified.
145 *
146 * As many bytes as possible are copied, subject to the constraint `data_size() <= max_data_size`.
147 * In particular if this is already untrue, no bytes are copied.
148 *
149 * @tparam Const_buffer_sequence
150 * Type that models the boost.asio `ConstBufferSequence` concept (see Boost docs).
151 * Basically, it's any container with elements convertible to `boost::asio::const_buffer`;
152 * and bidirectional iterator support. Examples: `vector<const_buffer>`, `list<const_buffer>`.
153 * Why allow `const_buffer` instead of, say, Sequence of bytes? Same reason as boost.asio's
154 * send functions: it allows a great amount of flexibility without sacrificing performance,
155 * since `boost::asio::buffer()` function can adapt lots of different objects (arrays,
156 * vectors, strings, and more of bytes, integers, and more).
157 * @param data
158 * Bytes will be copied from this buffer sequence until exhausted or data_size() equals
159 * `max_data_size`.
160 * @param max_data_size
161 * See above.
162 * @return Number of bytes (possibly zero) by which data_size() increased.
163 */
164 template<typename Const_buffer_sequence>
165 size_t feed_bufs_copy(const Const_buffer_sequence& data, size_t max_data_size);
166
167 /**
168 * Feeds (adds to the back of the byte buffer) the byte sequence equal to the given byte sequence
169 * `*data`, up to an internal buffer size. Any bytes thus fed are cleared from the given buffer.
170 * Internally, as much as possible while following the described `block_size_hint` constraints and
171 * the `max_data_size` constraint, move semantics are used, attempting to keep time complexity at
172 * @em O(1).
173 *
174 * As many bytes as possible are taken, subject to the constraint `data_size() <= max_data_size`.
175 * In particular if this is already untrue, no bytes are taken. If `block_size_hint == 0`, and
176 * `data_size() + data->size() <= max_data_size`, then time complexity is constant. Otherwise it is
177 * linear in `data->size()`.
178 *
179 * @param data
180 * Bytes will be moved from this byte buffer until exhausted or data_size() equals
181 * `max_data_size`; the bytes thus moved are `erase()`d from `*data`.
182 * Why make this a `vector` of bytes, and not a `const_buffer`? Because this allows us to use
183 * move semantics to avoid a copy.
184 * @param max_data_size
185 * See above.
186 * @return Number of bytes (possibly zero) by which data_size() increased, which equals # of bytes
187 * by which `data->size()` decreased.
188 */
189 size_t feed_buf_move(util::Blob* data, size_t max_data_size);
190
191 /**
192 * Consumes (removes from the front of the internal byte buffer and returns them to the caller) a
193 * byte sequence of length data_size() or the available space in `target_bufs` -- whichever is
194 * smaller. The bytes are removed from the front of the internal byte buffer and are written in
195 * the same order, starting from the first byte in `target_bufs`. The operation involves a copy and
196 * has @em O(n) time complexity.
197 *
198 * @tparam Mutable_buffer_sequence
199 * Type that models the boost.asio `MutableBufferSequence` concept (see Boost docs).
200 * Basically, it's any container with elements convertible to `boost::asio::mutable_buffer`;
201 * and bidirectional iterator support. Examples: `vector<mutable_buffer>`,
202 * `list<mutable_buffer>`. Why allow `mutable_buffer` instead of, say, Sequence of bytes?
203 * Same reason as boost.asio's receive functions: it allows a great amount of flexibility
204 * without sacrificing performance, since `boost::asio::buffer()` function can adapt lots of
205 * different objects (arrays, `vector`s, `string`s, and more of bytes, integers, and more).
206 * @param target_bufs
207 * Buffer sequence into which to copy bytes.
208 * @return Number of bytes (possibly zero) by which data_size() decreased, which equals the # of
209 * bytes copied into `target_bufs`. This is quite important -- without it there is
210 * absolutely no way to know how much of `target_bufs` we've used up.
211 */
212 template<typename Mutable_buffer_sequence>
213 size_t consume_bufs_copy(const Mutable_buffer_sequence& target_bufs);
214
215 /**
216 * Consumes (removes from the front of the internal byte buffer and returns them to the caller) a
217 * byte sequence of length data_size() or `max_data_size` -- whichever is smaller. The bytes are
218 * removed from the front of the internal byte buffer and are written in the same order, starting
219 * from the first byte in `target_buf`. `*target_buf` is resized to the number of bytes thus
220 * consumed. If possible based on certain constraints, the operation avoids copying and has @em O(1)
221 * time complexity.
222 *
223 * If `block_size_hint != 0`, and consume_buf_move() is always called with `max_data_size ==
224 * block_size_hint`, then the time complexity is @em O(1) whenever this is called. In other use cases
225 * this is not guaranteed, and the time complexity is @em O(n).
226 *
227 * @param target_buf
228 * Byte buffer which will be cleared and replaced with the bytes consumed from internal
229 * buffer. Therefore `target_buf->size()` can be used to know how many bytes were consumed.
230 * Performance note: `*target_buf` internal buffer may be reallocated, though this is avoided if reasonably
231 * possible (if it already has enough `capacity()`, it is not reallocated).
232 * @param max_data_size
233 * At most this many bytes will be consumed; `target_buf->size()` will not exceed this.
234 */
235 void consume_buf_move(util::Blob* target_buf, size_t max_data_size);
236
237 /// Destroys all stored data.
238 void clear();
239
240private:
241 // Types.
242
243 /// Short-hand for byte sequence on the heap. (Using ref-counted pointer purely for convenience.)
244 using Blob_ptr = boost::shared_ptr<util::Blob>;
245
246 /// Short-hand for immutable byte sequence on the heap. (Using ref-counted pointer purely for convenience.)
247 using Blob_const_ptr = boost::shared_ptr<const util::Blob>;
248
249 /**
250 * FIFO of byte sequences, together comprising an overall byte sequence. Why not just use a
251 * `deque<uint8_t>`? We easily could, and it would make the code far simpler. Answer: this enables
252 * us to reduce or eliminate copying of data in certain likely use cases, chief among them when
253 * using Socket_buffer as the Send buffer. See #m_q.
254 *
255 * Why `deque` instead of `list`? Both offer ~O(1) `push_back()` and `pop_front()`. `deque` is
256 * implemented, in practice, as a set of arrays and a master array with pointers to those arrays.
257 * `list` is a doubly-linked list. `deque` is therefore probably more compact and local in memory.
258 * `list` may perform less reallocating/copying, depending on how the `deque` implementation works.
259 * Using `deque` on a hunch, however. Note, also, that the C++ standard library defaults `queue<T>` to
260 * `queue<T, deque<T>>`, so I guess they also find it better on average.
261 *
262 * Using container instead of adapter `queue` due to need to iterate over the whole thing in some
263 * situations.
264 *
265 * @todo Investigate pros and cons of deque vs. list empirically.
266 */
267 using Queue = std::deque<Blob_ptr>;
268
269 // Friends.
270
271 // Friend of Socket_buffer: For access to our internals.
272 friend std::ostream& operator<<(std::ostream& os, const Socket_buffer& sock_buf);
273
274 // Methods.
275
276 /**
277 * Helper that copies, to a given raw memory buffer, a given number of bytes from a given
278 * `Const_buffer_sequence`, starting at a given offset in a given member of that buffer sequence.
279 * It uses `memcpy()` or similar for copying and makes as few calls to it as possible. It is similar to the
280 * `boost::asio::buffer_copy(ConstBufferSequence -> buffer)` function but also returns the logical
281 * pointers just past the last byte copied, so that this can be resumed into some other target
282 * buffer. Of course there's a `buffer_copy(ConstBufferSequence -> MutableBufferSequence)` that
283 * would do this, but it would be inefficient for us to have to create a `MutableBufferSequence`.
284 *
285 * @todo It would not be crazy to move copy_bytes_from_buf_seq() and copy_bytes_to_buf_seq() into util::Blob to make
286 * it more widely available, though as of this writing there is no demand for this: perhaps a buffer-sequence
287 * version of `util::Blob::emplace_copy()` and in the other direction. In that case `util::Blob::assign_copy()`
288 * and all single-buffer util::Blob methods should probably be similarly generalized (overloaded).
289 *
290 * @tparam Const_buffer_sequence
291 * See user-facing APIs.
292 * @param cur_buf_it
293 * Pointer to iterator to the buffer in the buffer sequence from where to begin copying.
294 * When function returns, `*cur_buf_it` will point to the buffer containing the last byte
295 * that was copied. This can be passed to this method again to resume copying where we
296 * left off.
297 * @param pos_in_buf
298 * Pointer to offset, in bytes, from the start of `**cur_buf_it` from where to resume
299 * copying. This must be `<= (*cur_buf_it)->size()`. If they are equal (i.e., it points
300 * just past the last byte of `**cur_buf_it`), we will properly copy from the start of the
301 * next buffer. (This can happen if the last call to the same method copied exactly
302 * through the end of a buffer in the buffer sequence, and then you call this method again
303 * with the resulting `*cur_buf_it` and `*pos_in_buf`.)
304 * @param to_copy
305 * The number of bytes to copy. The remainder of the buffer sequence (from
306 * `cur_buf_it`/`pos_in_buf` position) MUST contain at least that many bytes. This means you
307 * MUST determine the total byte size of the overall buffer sequence before calling this
308 * function.
309 * @param dest_buf
310 * (Pointer to) buffer containing `dest` location; data will be written into it. Must be of
311 * sufficient size, or behavior is undefined.
312 * @param dest
313 * Pointer to the location where bytes will be copied.
314 */
315 template<typename Const_buffer_sequence>
316 static void copy_bytes_from_buf_seq(typename Const_buffer_sequence::const_iterator* cur_buf_it, size_t* pos_in_buf,
317 size_t to_copy,
318 util::Blob* dest_buf, util::Blob::Iterator dest);
319
320 /**
321 * Helper that copies, from a given raw memory buffer, a given number of bytes to a given
322 * `Mutable_buffer_sequence`, starting at a given offset in a given member of that buffer sequence.
323 * It uses `memcpy()` or similar for copying and makes as few calls to it as possible. It is the opposite of
324 * copy_bytes_from_buf_seq().
325 *
326 * @tparam Mutable_buffer_sequence
327 * See user-facing APIs.
328 * @param cur_buf_it
329 * See copy_bytes_from_buf_seq().
330 * @param pos_in_buf
331 * See copy_bytes_from_buf_seq().
332 * @param to_copy
333 * See copy_bytes_from_buf_seq().
334 * @param src_buf
335 * Buffer containing `src` location. Must be of sufficient size, or behavior is undefined.
336 * @param src
337 * Pointer to the location from where bytes will be copied.
338 */
339 template<typename Mutable_buffer_sequence>
340 static void copy_bytes_to_buf_seq(typename Mutable_buffer_sequence::const_iterator* cur_buf_it, size_t* pos_in_buf,
341 size_t to_copy,
342 const util::Blob& src_buf, util::Blob::Const_iterator src);
343 // Data.
344
345 /**
346 * The max_data_size argument value that the user is predicting to use when calling
347 * consume_buf_move(); or 0 if the user intends to instead use consume_bufs_copy().
348 * This is explored in detail in the class doc header. Basically this should be 0, when this
349 * Socket_buffer is used as a Receive buffer; and it should be set to max-block-size, when this
350 * Socket_buffer is used as a Send buffer.
351 */
352 const size_t m_block_size_hint;
353
354 /**
355 * The data in the buffer. The logical byte sequence, as visible to the outside user of the
356 * class, is obtained as follows: iterate through range [`m_q[0]->begin()`, `m_q[0]->end()`);
357 * iterate through range [`m_q[1]->begin()`, `m_q[1]->end()`); ...; etc. `shared_ptr`s, instead
358 * of raw pointers, are used as elements just to avoid having to delete. Pointers are stored
359 * instead of direct vector objects to avoid any wholesale copying by the `deque` machinery.
360 *
361 * Why not just use a queue of bytes then? The code would be far simpler. Answer: the chief use
362 * case is if Socket_buffer is Send buffer, so they'll set `block_size_hint == N` and use
363 * feed_bufs_copy() whenever Peer_socket::send() is called. Then we will always `reserve()` `N` bytes
364 * in each `Blob` in #m_q; as `feed*()` is called, we will fill up the last `Blob` sequence until
365 * all `N` bytes are exhausted, in which case we'll `push_back()` another `N`-capacity `Blob` sequence
366 * and fill that one as needed, etc. Since `send()` always copies bytes from user's buffer(s)
367 * anyway, we're not adding any extra copying (only the allocation behavior is different than if
368 * we'd used a simple byte buffer instead). Now, when the Node needs to packetize a packet to
369 * send, with `max-block-size == N`, it will use consume_buf_move() directly into the packet data
370 * structure. Since Node will avoid sending unless congestion window allows at least `N` bytes to
371 * be sent, in consume_buf_move() it will use `max_data_size == N`. Therefore, consume_buf_move()
372 * will be constant-time due to performing a constant-time vector swap. Thus exactly one data
373 * copy is performed including the original `send()` call through sending bytes on the wire.
374 */
375 Queue m_q;
376
377 /**
378 * The total amount of data, in bytes, stored in this object. @see data_size() for more
379 * discussion.
380 *
381 * Invariant: `m_data_size` = sum(`m->size()`: for all `m` in `m_q`).
382 */
383 size_t m_data_size;
384}; // class Socket_buffer
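
/* Illustration only (not part of Flow): the Mode 2 (Send buffer) packing described in the class doc
 * header can be sketched with standard containers alone. Every name below (Toy_blob, Toy_send_buffer,
 * feed_copy(), consume_front_block()) is a hypothetical stand-in, not a Flow API. Unlike the real
 * queue, this toy stores vectors directly rather than through shared pointers, and it takes a raw
 * pointer plus length instead of a buffer sequence. Assumes C++11 or later. */

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical toy types; NOT flow::net_flow::Socket_buffer or flow::util::Blob.
using Toy_blob = std::vector<std::uint8_t>;

class Toy_send_buffer
{
public:
  explicit Toy_send_buffer(std::size_t block_size) : m_block_size(block_size)
  {
    assert(block_size != 0); // This sketch covers only the "block_size_hint > 0" mode.
  }

  // Linear-time enqueue: copies bytes into blocks of exactly m_block_size bytes (last block may be partial).
  std::size_t feed_copy(const std::uint8_t* src, std::size_t n, std::size_t max_data_size)
  {
    const std::size_t orig_data_size = m_data_size;
    while ((n != 0) && (m_data_size < max_data_size))
    {
      if (m_q.empty() || (m_q.back().size() == m_block_size))
      {
        m_q.emplace_back();
        m_q.back().reserve(m_block_size); // Intended to be the only allocation for this block.
      }
      Toy_blob& back = m_q.back();
      // Limited by: room in the trailing block; room under max_data_size; bytes left in the source.
      const std::size_t to_copy = std::min({ m_block_size - back.size(), max_data_size - m_data_size, n });
      back.insert(back.end(), src, src + to_copy);
      src += to_copy;
      n -= to_copy;
      m_data_size += to_copy;
    }
    return m_data_size - orig_data_size;
  }

  // Constant-time dequeue of the front block: swap it into *target instead of copying its bytes.
  void consume_front_block(Toy_blob* target)
  {
    target->clear();
    if (m_q.empty())
    {
      return;
    }
    target->swap(m_q.front());
    m_q.pop_front();
    m_data_size -= target->size();
  }

  std::size_t data_size() const { return m_data_size; }
  std::size_t block_count() const { return m_q.size(); }

private:
  const std::size_t m_block_size;
  std::deque<Toy_blob> m_q;
  std::size_t m_data_size = 0;
};
```

// Usage note: with a block size of 4, feeding 10 bytes would leave blocks of 4, 4, and 2 bytes in the
// toy queue; each consume_front_block() call then hands over one whole block via a vector swap.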
385
386// Free functions: in *_fwd.hpp.
387
388// Template implementations.
389
390template<typename Const_buffer_sequence>
391size_t Socket_buffer::feed_bufs_copy(const Const_buffer_sequence& data, size_t max_data_size)
392{
393 using util::Blob;
394 using boost::asio::const_buffer;
395 using boost::asio::buffers_iterator;
396 using boost::asio::buffer_size;
397 using std::min;
398
399 const size_t orig_data_size = m_data_size;
400
401 if (m_block_size_hint == 0)
402 {
403 /* In this mode, we postpone all re-packetizing until -- possibly -- the data are consume*()d.
404 * In other words we just allocate a new buffer for each buffer that comes in and make a
405 * direct copy. They're not even likely to use this method in this mode (probably they'll use
406 * feed_buf_move() with a Receive buffer instead), but we provide this for completeness. For
407 * more info on the use of this mode, see class doc header or feed_buf_move(). */
408 for (const auto& buf_data : data)
409 {
410 if (m_data_size >= max_data_size)
411 {
412 return m_data_size - orig_data_size;
413 }
414 // else there is space in our buffer.
415
416 // Does NOT copy actual buffer data -- just the memory location/size. (Convertible_to_const_buffer requirement.)
417 const const_buffer buf(buf_data);
418
419 // Copy entire buffer if possible, but don't exceed max_data_size bytes in total.
420 const size_t to_copy = min(buf.size(), // Could be zero. This is in BYTES.
421 max_data_size - m_data_size); // Definitely >= 1 (checked above).
422 if (to_copy == 0)
423 {
424 continue; // I guess this buffer was empty... nothing to do for it.
425 }
426 // else source buffer has data to copy.
427
428 // Get the raw data pointer.
429 const auto buf_start = static_cast<Blob::value_type const *>(buf.data());
430
431 const Blob_ptr buf_copy(new Blob(get_logger()));
432 // Make a byte blob copy from that raw memory. Performance is highest possible (allocate, copy).
433 buf_copy->assign_copy(const_buffer(buf_start, to_copy));
434 m_q.push_back(buf_copy);
435
436 // Accounting.
437 m_data_size += to_copy;
438
439 FLOW_LOG_TRACE("Socket_buffer/rcv [" << this << "]: data_size [" << m_data_size << "]; "
440 "buf_count [" << m_q.size() << "]; fed buffer "
441 "original/truncated size = [" << buf.size() << "/" << to_copy << "].");
442 // Very verbose and CPU-intensive!
443 FLOW_LOG_DATA("Buffer data [" << util::buffers_dump_string(buf_copy->const_buffer(), "", size_t(-1)) << "].");
444 } // for (buf_data in data)
445 }
446 else // if (m_block_size_hint != 0)
447 {
448 /* In this, the more likely, use of this method we are attempting to maintain the following
449 * invariant:
450 * - Each buffer in m_q is allocated to m_block_size_hint bytes and never reallocated
451 * (so one reserve() call).
452 * - Each buffer in m_q is at capacity, except m_q.back().
453 *
454 * If the user cooperates by only consuming via consume_buf_move(..., m_block_size_hint), then
455 * we will be able to perform no copies in those consume_buf_move() calls, as we can always just
456 * swap our byte vector m_q.front() with their empty byte vector (a constant time
457 * operation). This is the likely behavior if *this is used as a Send buffer, as Node will not
458 * consume from *this unless it has at least m_block_size_hint (max-block-size in its terms) of
459 * congestion window space.
460 *
461 * Of course we still have to copy here, but in the Send buffer use case that is an advertised
462 * given: we must copy the buffer from user data structure as opposed to messing with it. */
463
464 if (m_data_size >= max_data_size)
465 {
466 return 0; // Get rid of this pathological case now.
467 }
468 // else some buffer space left.
469 size_t target_space_left = max_data_size - m_data_size;
470 // This will hold how many bytes *this can still add before exceeding max_data_size.
471
472 size_t src_size_left = buffer_size(data);
473 /* @todo That was a bit inefficient (has to traverse buffer sequence to add up individual sizes). To avoid
474 * it, copy_bytes_from_buf_seq() can be improved at the cost of complexity of its interface.
475 * To put it in perspective, the # of buffers is probably ~0.1% of the number of bytes. */
476 if (src_size_left == 0)
477 {
478 return 0; // Get rid of this pathological case now.
479 }
480 // else: src_size_left will hold how many bytes we still haven't copied from data.
481
482 // Starting source location in memory is first buffer in `data`, byte 0.
483 typename Const_buffer_sequence::const_iterator cur_buf_it = data.begin();
484 size_t pos_in_buf = 0;
485 do
486 {
487 /* The following invariants must hold at the start of this loop iteration:
488 *
489 * -1- src_size_left != 0 (guaranteed initially above; and by while condition below).
490 * -2- target_space_left != 0 (ditto).
491 * -3- m_q.back()->size() < m_block_size_hint (i.e., still space to write into it).
492 *
493 * Let us now guarantee -3-. */
494
495 // Ensure there is some space in the trailing buffer (space up to m_block_size_hint bytes).
496 if (m_q.empty() || (m_q.back()->size() == m_block_size_hint))
497 {
498 // Either the trailing buffer in queue is filled to capacity; or no trailing buffer exists. Make an all-new one.
499 m_q.push_back(Blob_ptr(new Blob(get_logger())));
500
501 // Reserve exactly N bytes of capacity (should be the only allocation for this member).
502 m_q.back()->reserve(m_block_size_hint);
503 }
504 Blob& target_buf = *m_q.back();
505
506 /* Now, decide how many bytes to copy into target_buf. This must be:
507 * - at most target_buf.capacity() - target_buf.size() (maintain exactly m_block_size_hint
508 * bytes allocated in target_buf);
509 * - at most target_space_left (do not exceed max_data_size bytes in *this total);
510 * - at most src_size_left (cannot copy more bytes than available in the source buffer sequence). */
511 const size_t to_copy = min(m_block_size_hint - target_buf.size(), min(target_space_left, src_size_left));
512
513 // Due to invariant mentioned above:
514 assert(to_copy != 0);
515
516 /* Do it. Expand the buffer to receive to_copy bytes. This is maximally performant.
517 *
518 * Then, walk along the buffer sequence, memcpy()-ing (etc.) maximally large chunks of data until we
519 * got to_copy bytes total. cur_buf_it and pos_in_buf keep track of where the next copy (if
520 * any) will resume. */
521 target_buf.resize(target_buf.size() + to_copy);
522 copy_bytes_from_buf_seq<Const_buffer_sequence>(&cur_buf_it, &pos_in_buf, to_copy,
523 &target_buf, target_buf.end() - to_copy);
524
525 target_space_left -= to_copy;
526 src_size_left -= to_copy;
527 m_data_size += to_copy;
528
529 FLOW_LOG_TRACE("Socket_buffer/" << m_block_size_hint << " [" << this << "]: data_size [" << m_data_size << "]; "
530 "buf_count [" << m_q.size() << "]; "
531 "fed/total buffer size = [" << to_copy << '/' << target_buf.size() << "].");
532 // Very verbose and CPU-intensive!
533 FLOW_LOG_DATA("Buffer data post-append: "
534 "[" << util::buffers_dump_string(target_buf.const_buffer(), "", size_t(-1)) << "].");
535 }
536 while ((src_size_left != 0) && (target_space_left != 0));
537 } // else if (m_block_size_hint != 0)
538
539 return m_data_size - orig_data_size;
540} // Socket_buffer::feed_bufs_copy()
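
/* Illustration only (not part of Flow): the loop above depends on a copy helper that can stop partway
 * through a buffer sequence and later resume from the same spot. A minimal sketch of that resumable
 * gather copy follows, using only the standard library. Toy_const_buf and toy_copy_from_seq() are
 * hypothetical names; the cursor here is an index into a vector rather than a buffer-sequence
 * iterator, and the destination is a raw pointer rather than a util::Blob. */

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for a (pointer, size) buffer descriptor such as boost::asio::const_buffer.
struct Toy_const_buf
{
  const std::uint8_t* m_data;
  std::size_t m_size;
};

/* Copies exactly to_copy bytes to dest, starting at byte *pos_in_buf of seq[*cur_buf_idx].
 * Pre-conditions: *cur_buf_idx < seq.size(); *pos_in_buf <= seq[*cur_buf_idx].m_size; and at least
 * to_copy bytes remain in seq from that position. On return the cursor identifies where a later call
 * should resume. Returns the pointer just past the last byte written. */
inline std::uint8_t* toy_copy_from_seq(const std::vector<Toy_const_buf>& seq,
                                       std::size_t* cur_buf_idx, std::size_t* pos_in_buf,
                                       std::size_t to_copy, std::uint8_t* dest)
{
  while (to_copy != 0)
  {
    // If the cursor sits at the end of a buffer (or on an empty one), advance to the next non-empty buffer.
    while (*pos_in_buf == seq[*cur_buf_idx].m_size)
    {
      ++*cur_buf_idx;
      *pos_in_buf = 0;
      assert(*cur_buf_idx < seq.size()); // Guaranteed by the "enough bytes remain" pre-condition.
    }
    const Toy_const_buf& cur = seq[*cur_buf_idx];

    // One memcpy() per source buffer touched: take as much of the current buffer as is still needed.
    const std::size_t n = std::min(to_copy, cur.m_size - *pos_in_buf);
    std::memcpy(dest, cur.m_data + *pos_in_buf, n);

    dest += n;
    to_copy -= n;
    *pos_in_buf += n;
  }
  return dest;
}
```

// Usage note: a caller would keep one (index, offset) cursor per source sequence and call the helper once
// per destination block, passing the same cursor each time so that each call picks up where the last ended.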
541
542template<typename Mutable_buffer_sequence>
543size_t Socket_buffer::consume_bufs_copy(const Mutable_buffer_sequence& target_bufs)
544{
545 using util::Blob;
546 using boost::asio::buffer_size;
547 using boost::asio::buffers_iterator;
548 using boost::asio::const_buffer;
549 using std::min;
550
551 if (m_data_size == 0)
552 {
553 return 0; // Get rid of this pathological case right away.
554 }
555 // else
556
557 const size_t orig_data_size = m_data_size;
558
559 // @todo This is a bit inefficient: O(n) time, where n is the number of buffers in target_bufs.
560 size_t target_size_left = buffer_size(target_bufs);
561
562 if (target_size_left == 0)
563 {
564 return 0; // Another pathological case.
565 }
566 // else
567
568 /* Now go through each buffer in m_q, until either we run out of bytes or fill up the target
569 * buffer. For each buffer in m_q, copy it over into target_bufs. */
570 Queue::iterator cur_src_it = m_q.begin();
571 // Starting destination location in memory is first buffer in "target_bufs," byte 0.
572 typename Mutable_buffer_sequence::const_iterator cur_buf_it = target_bufs.begin();
573 size_t pos_in_buf = 0;
574 do
575 {
576 Blob& src_bytes = **cur_src_it;
577
578 // Normally copy all of current buffer -- unless that would overflow the target Mutable_buffer_sequence.
579 const size_t to_copy = min(src_bytes.size(), target_size_left);
580
581 /* Do it. Walk along the buffer sequence, memcpy-ing (etc.) maximally large chunks of data into it
582 * until we copy to_copy bytes total. cur_buf_it and pos_in_buf keep track of where the next
583 * copy (if any) will resume. */
584 copy_bytes_to_buf_seq<Mutable_buffer_sequence>
585 (&cur_buf_it, &pos_in_buf, to_copy, src_bytes, src_bytes.const_begin());
586
587 m_data_size -= to_copy;
588 target_size_left -= to_copy;
589
590 FLOW_LOG_TRACE("Socket_buffer [" << this << "]: data_size [" << m_data_size << "]; "
591 "slow-consumed buffer of size/total [" << to_copy << '/' << src_bytes.size() << "].");
592 // Very verbose and CPU-intensive!
593 FLOW_LOG_DATA("Buffer data "
594 "[" << util::buffers_dump_string(const_buffer(src_bytes.const_begin(), to_copy),
595 "", size_t(-1))
596 << "].");
597
598 if (to_copy == src_bytes.size())
599 {
600 // Copied entire source buffer -- move to the next one; this one will be erase()ed outside the loop.
601 ++cur_src_it;
602 }
603 else
604 {
605 /* Was limited by the target buffer sequence size; copied only part of source buffer.
606 * Therefore the loop will not execute again. Erase the bytes that were just consumed, but
607 * leave the others alone. erase() outside the loop will not erase the buffer, as we will not
608 * ++cur_src_it. */
609 src_bytes.erase(src_bytes.begin(), src_bytes.begin() + to_copy);
610 }
611 }
612 while ((target_size_left != 0) && (m_data_size != 0));
613
614 // All buffers in this range were entirely copied, so now they should be deallocated/removed.
615 m_q.erase(m_q.begin(), cur_src_it);
616
617 FLOW_LOG_TRACE("Socket_buffer [" << this << "] consume finished: data_size [" << m_data_size << "]; "
618 "buf_count [" << m_q.size() << "].");
619
620 return orig_data_size - m_data_size;
621} // Socket_buffer::consume_bufs_copy()
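
/* Illustration only (not part of Flow): the scatter-style dequeue above can be sketched with standard
 * containers. This is a simplified variant under stated assumptions, not the method itself: it iterates
 * over the target buffers rather than over the queued blocks, and Toy_blob, Toy_mutable_buf, and
 * toy_consume_copy() are hypothetical names. As in the code above, fully consumed blocks are erased from
 * the queue and a partially consumed front block keeps only its unread tail. */

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <deque>
#include <vector>

using Toy_blob = std::vector<std::uint8_t>; // Hypothetical stand-in for util::Blob.

// Hypothetical stand-in for a (pointer, size) descriptor such as boost::asio::mutable_buffer.
struct Toy_mutable_buf
{
  std::uint8_t* m_data;
  std::size_t m_size;
};

// Copies bytes from the front of *q into targets, in order; returns the number of bytes consumed.
inline std::size_t toy_consume_copy(std::deque<Toy_blob>* q, const std::vector<Toy_mutable_buf>& targets)
{
  assert(q != nullptr);
  std::size_t consumed = 0;
  auto src_it = q->begin();
  std::size_t pos_in_src = 0; // Offset of the next unread byte within *src_it.

  for (const Toy_mutable_buf& target : targets)
  {
    std::size_t pos_in_target = 0;
    while ((pos_in_target != target.m_size) && (src_it != q->end()))
    {
      if (pos_in_src == src_it->size())
      {
        // Current block is exhausted (or empty): move on to the next one.
        ++src_it;
        pos_in_src = 0;
        continue;
      }
      // Copy the largest chunk that fits both the rest of the target and the rest of the block.
      const std::size_t n = std::min(target.m_size - pos_in_target, src_it->size() - pos_in_src);
      std::memcpy(target.m_data + pos_in_target, src_it->data() + pos_in_src, n);
      pos_in_target += n;
      pos_in_src += n;
      consumed += n;
    }
  }

  // A block read exactly to its end counts as fully consumed.
  if ((src_it != q->end()) && (pos_in_src == src_it->size()))
  {
    ++src_it;
    pos_in_src = 0;
  }
  // Drop fully consumed blocks; then trim the consumed prefix of a partially consumed block, if any.
  src_it = q->erase(q->begin(), src_it);
  if (pos_in_src != 0)
  {
    src_it->erase(src_it->begin(), src_it->begin() + static_cast<std::ptrdiff_t>(pos_in_src));
  }
  return consumed;
}
```

// Usage note: the return value plays the same role as that of consume_bufs_copy(): it is the only way for
// the caller to learn how much of the target buffers was actually filled.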
622
template<typename Const_buffer_sequence>
void Socket_buffer::copy_bytes_from_buf_seq(typename Const_buffer_sequence::const_iterator* cur_buf_it,
                                            size_t* pos_in_buf, size_t to_copy,
                                            util::Blob* dest_buf,
                                            util::Blob::Iterator dest) // Static.
{
  using util::Blob;
  using boost::asio::const_buffer;
  using std::min;

  /* A pre-condition is that *pos_in_buf <= (**cur_buf_it).size(). If it's strictly less, then
   * we will copy from some position in **cur_buf_it. If it is equal, then we need to go to the
   * next buffer -- actually the next non-empty buffer. Why do we allow the == case? Because if we
   * end the copying in an earlier invocation by copying the very last byte in a buffer, then the
   * final *pos_in_buf will indeed equal .size() of that buffer. So basically, we leave
   * *pos_in_buf at the value it is after a copy, regardless of whether that's in the middle or end
   * of the buffer. Then if we *need* to, we will go to the next non-empty buffer. */

  // Keep decreasing to_copy until all copied. A pre-condition is that the buffer sequence has enough bytes total.
  while (to_copy != 0)
  {
    // At this point *pos_in_buf <= .size(). We must copy, so if they're equal, find next non-empty buffer.
    size_t cur_buf_size;
    while (*pos_in_buf == (cur_buf_size = (*cur_buf_it)->size()))
    {
      ++*cur_buf_it;
      *pos_in_buf = 0; // If the next buffer is empty, this will equal .size() (0).
    }
    // Since we are guaranteed buffer sequence has enough bytes to satisfy to_copy, **cur_buf_it must be a buffer.

    // The current source buffer may hold more bytes than what we still have to copy.
    const size_t to_copy_in_buf = min(to_copy, cur_buf_size - *pos_in_buf);
    /* This is the reason for using this function instead of buffer_iterator (which would've been much easier -- but
     * this is way faster and probably uses memcpy() or similar). */
    dest = dest_buf->emplace_copy(dest,
                                  const_buffer(static_cast<Blob::Const_iterator>((*cur_buf_it)->data()) + *pos_in_buf,
                                               to_copy_in_buf));

    to_copy -= to_copy_in_buf;
    *pos_in_buf += to_copy_in_buf;
    // Note that if to_copy != 0 right now, then *pos_in_buf points just past end of current buffer.
  }
} // Socket_buffer::copy_bytes_from_buf_seq()

template<typename Mutable_buffer_sequence>
void Socket_buffer::copy_bytes_to_buf_seq(typename Mutable_buffer_sequence::const_iterator* cur_buf_it,
                                          size_t* pos_in_buf, size_t to_copy,
                                          const util::Blob& src_buf,
                                          util::Blob::Const_iterator src) // Static.
{
  /* This is almost exactly the same as copy_bytes_from_buf_seq() -- just in the other direction.
   * Basically memcpy() (or similar) arguments are switched. So I'll keep comments minimal -- see the other
   * method.
   * @todo Code reuse somehow. It's possible but may not be worth it given the small volume of code
   * involved. */

  using util::Blob;
  using boost::asio::mutable_buffer;
  using std::min;

  while (to_copy != 0)
  {
    size_t cur_buf_size;
    while (*pos_in_buf == (cur_buf_size = (*cur_buf_it)->size()))
    {
      ++*cur_buf_it;
      *pos_in_buf = 0;
    }

    const size_t to_copy_in_buf = min(to_copy, cur_buf_size - *pos_in_buf);
    src = src_buf.sub_copy(src,
                           mutable_buffer(static_cast<Blob::Iterator>((*cur_buf_it)->data()) + *pos_in_buf,
                                          to_copy_in_buf));

    to_copy -= to_copy_in_buf;
    *pos_in_buf += to_copy_in_buf;
  }
} // Socket_buffer::copy_bytes_to_buf_seq()

} // namespace flow::net_flow
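
The resumable-cursor technique used by copy_bytes_to_buf_seq() can be sketched in isolation. The following is a minimal illustration only, not Flow's implementation: `Mutable_buf`, `Buf_seq`, `scatter_copy()`, and `demo_scatter()` are hypothetical names, a plain pointer-plus-size struct stands in for `boost::asio::mutable_buffer`, and `std::memcpy()` stands in for the Blob copy call. It shows how a (buffer iterator, offset) pair lets a later call pick up exactly where an earlier one stopped, including when the cursor rests at the end of a buffer or an empty buffer lies in between.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for boost::asio::mutable_buffer: a raw pointer plus a length.
struct Mutable_buf
{
  char* m_data;
  std::size_t m_size;
};
using Buf_seq = std::vector<Mutable_buf>;

/* Copies exactly to_copy bytes from src into the buffer sequence, resuming at (*cur_buf_it, *pos_in_buf).
 * Pre-conditions: *pos_in_buf <= (*cur_buf_it)->m_size; and the sequence has at least to_copy bytes of
 * room left past the cursor. */
void scatter_copy(Buf_seq::const_iterator* cur_buf_it, std::size_t* pos_in_buf,
                  std::size_t to_copy, const char* src)
{
  while (to_copy != 0)
  {
    // The cursor may sit at the end of a buffer (or on an empty one): advance to the next buffer with room.
    while (*pos_in_buf == (*cur_buf_it)->m_size)
    {
      ++*cur_buf_it;
      *pos_in_buf = 0;
    }

    // Copy the largest chunk that fits in the current buffer and is still needed.
    const std::size_t n = std::min(to_copy, (*cur_buf_it)->m_size - *pos_in_buf);
    std::memcpy((*cur_buf_it)->m_data + *pos_in_buf, src, n);

    src += n;
    to_copy -= n;
    *pos_in_buf += n; // May now equal the buffer's size; the next iteration or call handles that.
  }
}

// Usage: two source chunks scattered across three target buffers, the middle one empty.
std::string demo_scatter()
{
  char a[3], c[5];
  char b_dummy; // Never written; merely backs the zero-sized buffer.
  const Buf_seq bufs = { { a, sizeof a }, { &b_dummy, 0 }, { c, sizeof c } };

  auto cur_buf_it = bufs.begin();
  std::size_t pos_in_buf = 0;
  scatter_copy(&cur_buf_it, &pos_in_buf, 3, "abc");  // Fills `a` exactly; the cursor rests at its end.
  scatter_copy(&cur_buf_it, &pos_in_buf, 4, "defg"); // Skips the empty buffer; lands in `c`.

  return std::string(a, sizeof a) + '|' + std::string(c, 4);
}
```

Leaving the offset equal to the buffer size after a copy, rather than eagerly advancing the iterator, means the function never dereferences a past-the-end iterator when the last copy happens to fill the final buffer.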