Flow 2.0.0
Flow project: Full implementation reference.
low_lvl_io.cpp
/* Flow
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#include <utility>

namespace flow::net_flow
{

// Implementations.

void Node::async_low_lvl_recv()
{
  // We are in thread W.
  m_low_lvl_sock.async_wait(Udp_socket::wait_read, [this](const Error_code& sys_err_code)
  {
    low_lvl_recv_and_handle(sys_err_code);
  });
}
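The method above is the reactor-style half of the receive path: it registers a wait-for-readable callback, and the actual reading happens inside the handler, which then re-registers. The following is a minimal illustrative sketch of that pattern; `Toy_reactor` and `Toy_node` are invented stand-ins for boost.asio and `Node`, not Flow API.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Invented stand-in for the boost.asio reactor: queues "socket is readable" handlers.
struct Toy_reactor
{
  std::queue<std::function<void()>> m_ready;
  void async_wait_read(std::function<void()> handler) { m_ready.push(std::move(handler)); }
  void run_one() { auto h = std::move(m_ready.front()); m_ready.pop(); h(); }
};

// Invented stand-in for Node: drains the pretend UDP buffer in the handler, then re-arms.
struct Toy_node
{
  Toy_reactor* m_reactor;
  std::queue<int> m_datagrams; // Pretend UDP net-stack buffer.
  std::vector<int> m_received; // What the handler has read so far.

  void async_low_lvl_recv() // Register a wait; the actual read happens in the handler.
  {
    m_reactor->async_wait_read([this]() { low_lvl_recv_and_handle(); });
  }

  void low_lvl_recv_and_handle()
  {
    while (!m_datagrams.empty()) // Keep reading until "would block" (buffer empty).
    {
      m_received.push_back(m_datagrams.front());
      m_datagrams.pop();
    }
    async_low_lvl_recv(); // Re-register for the next receive, as the real handler does.
  }
};
```

Note that a single `run_one()` dispatch handles every queued datagram before returning, which mirrors the batching rationale: accumulate all available packets first, signal waiters once.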

void Node::low_lvl_recv_and_handle(Error_code sys_err_code)
{
  using util::Blob;
  using boost::asio::buffer;

  // We are in thread W.

  // Number of packets received and thus handled by handle_incoming(). Useful at least for a log message at the end.
  unsigned int handled_packet_count = 0;
  /* Number of packets received. (The number handled by handle_incoming() may be lower or higher
   * if the simulator is in use.) */
  unsigned int recvd_packet_count = 0;

  /* The limit on the # of received packets to handle in this handler. 0 means unlimited. For
   * reasoning as to why we'd possibly want to limit it, see the doc header for this option. */
  unsigned int recvd_packet_count_limit = opt(m_opts.m_dyn_max_packets_per_main_loop_iteration);

  if (!sys_err_code)
  {
    /* boost.asio has reason to believe there's at least one UDP datagram ready to read from the
     * UDP net-stack's buffer. We'll read that one. We'll also keep reading them until the UDP net-stack
     * says it "would block" (i.e., no more datagrams have been received). Why not simply call
     * async_low_lvl_recv() and let boost.asio deal with it and call us again? Consider a user
     * waiting for Readable on a given Peer_socket. If we did that, we would
     * read one DATA packet, load it onto the Receive buffer, and signal the waiting user. Thus they
     * may immediately read from the Receive buffer and move on to the rest of their event loop
     * iteration. However, there may be 50 more packets we're then going to immediately put on the
     * Receive buffer in thread W. It would be arguably more efficient to read all 51 and THEN
     * signal the user. Therefore we use Node::m_sock_events to accumulate active events
     * and then read all available datagrams; only after that do we signal Event_sets for the
     * accumulated events.
     *
     * @todo One can conceive of some pathological case where, due to extreme traffic, we'd keep
     * reading more and more datagrams and not get out of the loop for a long time. Perhaps add a
     * knob for the maximum number of iterations to go through before ending the loop and
     * signaling about the accumulated events. */

    // Note: m_packet_data is a member that is reused repeatedly for some performance savings.
    auto& packet_data = m_packet_data;
    /* Don't let a change in this dynamic option's value affect this main loop iteration.
     * This way packet_data.capacity() will essentially remain constant, which is easier to reason about than
     * the alternative. Soon enough this method will exit, and any new value will take effect next time. */
    const size_t max_packet_size = opt(m_opts.m_dyn_low_lvl_max_packet_size);

    util::Udp_endpoint low_lvl_remote_endpoint;

    // Read until error or "would block" (no data available).
    size_t packet_size = 0;
    do
    {
      /* Make the buffer big enough for any packet we'd accept as valid.
       * resize(MAX) will only work if packet_data.zero() (the case in the first iteration) or if packet_data.capacity()
       * >= MAX. Hence, if a loop iteration below has left packet_data with unsatisfactory capacity below MAX,
       * then ensure zero() is true before the resize(MAX) call. In other words, reallocate packet_data, but only
       * if necessary. */
      if ((!packet_data.zero()) && (packet_data.capacity() < max_packet_size))
      {
        packet_data.make_zero(); // This must be done explicitly: acknowledges we force reallocation in the next line.
      }
      packet_data.resize(max_packet_size);
      // ^-- Could use UDP available(), but its meaning is slightly ambiguous, + I don't care for the tiny race it adds.

      // Read a packet from the UDP net-stack's internal buffer into "packet_data."
      packet_size = m_low_lvl_sock.receive_from(buffer(packet_data.data(), packet_data.size()),
                                                low_lvl_remote_endpoint, 0, sys_err_code);
      if (!sys_err_code)
      {
        assert(packet_size <= packet_data.size());
        packet_data.resize(packet_size); // boost.asio NEVER resizes the vector to the # of bytes it read.

        FLOW_LOG_TRACE("Received low-level packet at [UDP " << m_low_lvl_sock.local_endpoint() << "] from "
                       "[UDP " << low_lvl_remote_endpoint << "]:");

        // Count it against the limit.
        ++recvd_packet_count;

        handled_packet_count += handle_incoming_with_simulation(&packet_data, low_lvl_remote_endpoint);
        /* packet_data is still valid and owned by us but may have any structure at all.
         * As of this writing, in practice -- assuming no simulated delay -- packet_data would be untouched
         * (and therefore could be reused sans reallocation for the next packet read above, if any) for all
         * packet types except DATA, which is moved elsewhere via std::move() semantics and would require
         * reallocation above. */
      }
      else if (sys_err_code != boost::asio::error::would_block) // "Would block" is normal (no data available).
      {
        /* Weird that it failed, since it's UDP. Oh well.
         * Note: One might be tempted to do something dire like closing some sockets. Note that in the current setup
         * we wouldn't know which socket(s) to close, since several live on the same low-level (UDP) port. Even if
         * we did, the failure to read could mean many things, so individual err_codes would need to be examined,
         * with various actions taken depending on what happened. (@todo Consider it.) Without that, just ignore it
         * as we would any lost packet. Obviously, we deal with UDP's unreliability already.
         *
         * What about the would_block and try_again (EAGAIN/EWOULDBLOCK in the POSIX world and similar for Windows)
         * error codes, which often demand special handling? Well, firstly, we are likely mostly not hitting that
         * situation, as async_receive() (the call for which this is the handler function) specifically attempts
         * to avoid those error codes by only executing the handler once the socket is Readable.
         * Still, it's probably not impossible: we used async_wait() as of this writing, which means the actual
         * receiving is done in this handler, not by boost.asio (and even if it were, we would still try more receives
         * until no more are available). Between the detection of Readable by boost.asio and the actual receive call,
         * the situation may have changed. Well, fine. What's there to do? would_block/try_again means
         * not Readable right now... try later. Great! We do just that in any case below by executing
         * async_receive() again (inside the call at the end of this method). So this read failed due to some
         * odd change in conditions; the best we can do is wait for Readable and try again later. And we do. */

        assert(packet_size == 0); // Should be the case on error.
      }
      else
      {
        assert(packet_size == 0);
      }
    }
    // while (still getting data && (haven't exceeded per-handler limit || there is no limit))
    while ((packet_size != 0)
           && ((recvd_packet_count_limit == 0) || (recvd_packet_count < recvd_packet_count_limit)));

    /* We have read all available low-level data. This may have accumulated certain tasks, like
     * combining pending individual acknowledgments into larger ACK packet(s), to be performed at
     * the end of the handler. Do so now. */

    // Helpful in understanding the relationship between handler invocations and "delta" Event_set checks.
    FLOW_LOG_TRACE("Handled a total of [" << handled_packet_count << "] incoming packets "
                   "out of [" << recvd_packet_count << "] received (limit [" << recvd_packet_count_limit << "]) "
                   "in this boost.asio handler.");
  } // if (!sys_err_code)
  else
  {
    // boost.asio called us with an error. Strange, since this is UDP, but treat it the same as a read error above.
  }

  /* If socket errors are detected above: since it's UDP and connectionless, there is hope we're
   * still OK, so still set up the wait at the end of this callback. Do not stop the event loop.
   *
   * @todo This isn't necessarily sound. Could investigate the possible errors and act
   * accordingly. */

  // Register ourselves for the next UDP receive.
  async_low_lvl_recv();
} // Node::low_lvl_recv_and_handle()
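The loop-exit rule above (keep draining until would-block, but stop early if a non-zero per-handler limit is reached, with 0 meaning unlimited) can be isolated as a tiny predicate. This helper is illustrative only and not part of Flow:

```cpp
#include <cassert>
#include <cstddef>

// Returns true if the drain loop should continue after having handled
// `handled_so_far` packets, given `limit` (0 = unlimited) and whether the
// last receive attempt actually yielded data.
inline bool keep_draining(std::size_t handled_so_far, std::size_t limit, bool last_recv_got_data)
{
  return last_recv_got_data && ((limit == 0) || (handled_so_far < limit));
}
```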

unsigned int Node::handle_incoming_with_simulation(util::Blob* packet_data,
                                                   const util::Udp_endpoint& low_lvl_remote_endpoint,
                                                   bool is_sim_duplicate_packet)
{
  using util::Blob;
  using boost::chrono::milliseconds; // Just for the output streaming.
  using boost::chrono::round;

  // We are in thread W.

  unsigned int handled = 0; // How many times we called handle_incoming() inside this invocation.

  // Basically we should call handle_incoming(), but we may first have to simulate various network conditions.

  if (is_sim_duplicate_packet || (!m_net_env_sim) || (!m_net_env_sim->should_drop_received_packet()))
  {
    // See below.
    const bool must_dupe
      = (!is_sim_duplicate_packet) && m_net_env_sim && m_net_env_sim->should_duplicate_received_packet();

    Blob packet_data_copy{get_logger()};
    if (must_dupe)
    {
      /* We will simulate duplication of the packet below. Since packet handling can be
       * destructive (for performance reasons -- see handle_data_to_established()), we must create
       * a copy that will not be hurt by this process. */
      packet_data_copy = *(static_cast<const Blob*>(packet_data)); // Add const to express we require a copy, not a move.
    }

    Fine_duration latency{m_net_env_sim ? m_net_env_sim->received_packet_latency() : Fine_duration::zero()};
    if (latency == Fine_duration::zero())
    {
      // No simulated latency; just handle the packet now (the mainstream case).
      handle_incoming(packet_data, low_lvl_remote_endpoint);
      // *packet_data may now be decimated! Do not use it.
      ++handled;
    }
    else
    {
      /* Pretend it was actually delivered later, as the simulator told us to do. More precisely,
       * call handle_incoming() but later. */
      FLOW_LOG_TRACE("SIMULATION: Delaying reception of packet by simulated latency "
                     "[" << round<milliseconds>(latency) << "].");
      async_wait_latency_then_handle_incoming(latency, packet_data, low_lvl_remote_endpoint);
      // *packet_data may now be decimated! Do not use it.
    }

    // Possibly simulate packet duplication.
    if (must_dupe)
    {
      /* The simulator told us to pretend this packet was received twice. Note that we only model a
       * single duplication (i.e., a duplicated packet will not be simulated to itself get
       * duplicated). Also, we don't drop a duplicated packet. */
      FLOW_LOG_TRACE("SIMULATION: Duplicating received packet.");
      handled += handle_incoming_with_simulation(&packet_data_copy, // Pass the COPY, not the possibly damaged original.
                                                 low_lvl_remote_endpoint, true);
      // packet_data_copy may now be decimated! Do not use it.
    }
  }
  else
  {
    // Got the packet, but pretend it was dropped on the way due to packet loss, as the simulator told us to do.
    FLOW_LOG_TRACE("SIMULATION: Dropped received packet.");
  }

  return handled;
} // Node::handle_incoming_with_simulation()

void Node::async_wait_latency_then_handle_incoming(const Fine_duration& latency,
                                                   util::Blob* packet_data,
                                                   const util::Udp_endpoint& low_lvl_remote_endpoint)
{
  using util::Blob;
  using boost::chrono::milliseconds;
  using boost::chrono::round;
  using boost::shared_ptr;

  // We are in thread W.

  // Schedule a call to handle_incoming() to occur asynchronously after the latency period passes.

  /* As advertised, *packet_data loses its buffer into this new container, so that the caller can immediately
   * use it for whatever they want. Meanwhile, we asynchronously own the actual data in it now.
   * Make a smart pointer to ensure it lives long enough for the handler to execute... but likely no longer than that. */
  shared_ptr<Blob> packet_data_moved_ptr{new Blob{std::move(*packet_data)}};

  // Unused if it doesn't get logged, which is a slight perf hit, but anyway this sim feature is a debug/test thing.
  const Fine_time_pt started_at = Fine_clock::now();

    [this, packet_data_moved_ptr, low_lvl_remote_endpoint, latency, started_at]
    (bool)
  {
    // We are in thread W.

    FLOW_LOG_TRACE
      ("SIMULATOR: Handling low-level packet after "
       "simulated latency [" << round<milliseconds>(latency) << "]; "
       "actual simulated latency was "
       "[" << round<milliseconds>(Fine_clock::now() - started_at) << "]; "
       "from [UDP " << low_lvl_remote_endpoint << "].");

    // Move (again) the actual buffer to handle_incoming()'s ownership.
    Blob packet_data_moved_again(std::move(*packet_data_moved_ptr));
    // *packet_data_moved_ptr is now empty and will be deleted once that smart pointer goes out of scope below.
    handle_incoming(&packet_data_moved_again, low_lvl_remote_endpoint);
    // packet_data_moved_again may now be decimated also! Do not use it.

    /* We must do this here for similar reasons as at the end of low_lvl_recv_and_handle(). Think
     * of the present closure as simply low_lvl_recv_and_handle() running and being able
     * to read off the one low-level UDP packet (the argument "packet"). */

    // Log a similar thing to that in low_lvl_recv_and_handle().
    FLOW_LOG_TRACE("Handled a total of [1] incoming packets "
                   "out of a simulated [1] received in this boost.asio handler.");
  }); // Async callback.
} // Node::async_wait_latency_then_handle_incoming()
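The lifetime trick above deserves a standalone sketch: move the buffer into a smart pointer captured by the closure, so the data outlives the caller's scope exactly until the deferred handler runs. Everything here uses std types (`std::shared_ptr`, `std::vector`, a plain list of deferred callbacks) as invented stand-ins for the boost types, `util::Blob`, and the asio timer; it is not Flow API.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

std::vector<std::function<void()>> g_deferred; // Stand-in for scheduled timer handlers.
std::size_t g_handled_bytes = 0;               // Stand-in for the eventual packet handling.

void async_handle_later(std::vector<char>* packet_data)
{
  // The caller's vector loses its buffer here; the closure co-owns the moved-out data.
  auto moved_ptr = std::make_shared<std::vector<char>>(std::move(*packet_data));
  g_deferred.push_back([moved_ptr]()
  {
    // Data is still valid when the handler finally runs; the shared_ptr kept it alive.
    g_handled_bytes += moved_ptr->size();
  }); // Once this closure is destroyed, the buffer is freed -- but no sooner.
}
```

The caller may immediately reuse (or destroy) its now-emptied buffer, which is exactly what lets the real receive loop recycle `m_packet_data` for the next datagram.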

void Node::async_sock_low_lvl_packet_send(Peer_socket::Ptr sock, Low_lvl_packet::Ptr&& packet,
                                          bool delayed_by_pacing)
{
  async_low_lvl_packet_send_impl(sock->remote_endpoint().m_udp_endpoint, std::move(packet), delayed_by_pacing, sock);
}

void Node::async_no_sock_low_lvl_packet_send(const util::Udp_endpoint& low_lvl_remote_endpoint,
                                             Low_lvl_packet::Ptr packet)
{
  /* As of this writing we don't pace things when no Peer_socket is involved (e.g., some RSTs);
   * hence the hard-coded `false` (not delayed-by-pacing) argument below. */
  async_low_lvl_packet_send_impl(low_lvl_remote_endpoint, packet, false, Peer_socket::Ptr{});
}

void Node::async_low_lvl_packet_send_impl(const util::Udp_endpoint& low_lvl_remote_endpoint,
                                          Low_lvl_packet::Ptr packet,
                                          bool delayed_by_pacing, Peer_socket::Ptr sock)
{
  using boost::asio::buffer;

  assert(packet);
  const auto& packet_ref = *packet;
  const auto& packet_type_id = typeid(packet_ref);

  // We are in thread W.

  Sequence_number seq_num; // See below.

  /* As explained in send_worker(), if it's a DATA packet, then we must overwrite
   * m_sent_when. That way we'll avoid the deadly RTT drift (increase) due to pacing causing a
   * higher RTT, then using that RTT to spread out packets more, which causes a higher RTT again,
   * etc. (If pacing is disabled, then the effect of doing this here rather than in send_worker()
   * will not be dramatic.)
   *
   * Note that a number of important structures are based on m_sent_when: its ordering determines
   * how things are ordered in m_snd_flying_pkts_by_sent_when and friends and thus the
   * operation of the Drop Timer. So whenever we do finalize m_sent_when, we should handle those
   * things too. */
  if (packet_type_id == typeid(Data_packet))
  {
    assert(sock);
    /* OK, so this is where I update m_sent_when and handle everything related to that
     * (updating the scoreboard, Drop Timer, etc.). async_send_to() will indeed invoke UDP sendto()
     * synchronously; that call occurs just below. */

    mark_data_packet_sent(sock, static_cast<const Data_packet&>(packet_ref).m_seq_num);
  } // if (packet type is DATA)
  // @todo See the @todo in Node::async_low_lvl_ack_send() regarding correcting the ACK delay value due to pacing.

  /* Serialize to raw data. Important subtleties:
   *
   * This does *not* create a copied buffer. It generates a Const_buffer_sequence,
   * which is a sequence container (like vector) containing a series of buffer
   * start pointers and associated lengths. So it is a "scattered" buffer already in memory, namely within
   * the Low_lvl_packet `packet`; and the boost.asio UDP async_send_to() we call below performs a "gather"
   * operation when it generates the UDP datagram to send out. This should be superior to performing a
   * copy that is at least proportional (in speed and memory use) to the size of the ultimate datagram.
   * (In fact, an earlier version of the code indeed serialized to a newly generated single buffer here.)
   *
   * The only thing one must be careful of here is that the generated Const_buffer_sequence is valid only as long
   * as the underlying areas in memory continue to exist; otherwise it's just a bunch of pointers into invalid
   * memory. The underlying memory, as documented in the Low_lvl_packet::serialize_to_raw_data() doc header,
   * is valid as long as the Low_lvl_packet on which the method is called exists. For this reason, we pass
   * `packet` (which is a shared_ptr<>) to the completion handler. Thus it will continue to exist until
   * after the async_send_to() attempt itself. (Had we not done this, there's a large danger that
   * `packet`'s ref-count would drop to zero at method exit just below, and hence async_send_to() asynchronously
   * would crash a little later.)
   *
   * Update: Through empirical evidence, I found that, at least with Boost 1.63 and macOS, the UDP async_send_to()
   * below will silently truncate to the first 64 elements of raw_bufs. I could find no evidence of this being
   * a common OS limitation (I've found other limits, like 1024 in Linux for writev()), though I didn't dig very hard.
   * The silent truncation was quite alarming and something to look into, if only for educational purposes.
   * In any case, we must keep raw_bufs.size() fairly low -- probably this is good for performance as well.
   *
   * Update: I have since looked into this in another project. This limit, 64, continues in Boost 1.75 and is actually
   * at least imposed by boost.asio code itself, though it might just be using that value as a lowest-common-denominator
   * technique. It is my impression that they do this silently on account of thinking only of the TCP context,
   * where a send/write op call is free to write as few bytes as it wants, and one must deal with it in any
   * case (until would-block). The silent dropping behavior in UDP, however, is a disaster and is really a Boost bug
   * (@todo file it against Boost if needed). In any case it is what it is, and until it goes away, we must not
   * exceed it. */
  Low_lvl_packet::Const_buffer_sequence raw_bufs;
  const size_t bytes_to_send = packet->serialize_to_raw_data_and_log(&raw_bufs);
  assert(bytes_to_send != 0);

  // Count an actual UDP stack send() call (if a socket was actually opened; else we can't charge stats to one).
  if (sock)
  {
    sock->m_snd_stats.low_lvl_packet_xfer_called(packet_type_id, delayed_by_pacing, bytes_to_send);
  }

  const size_t limit = opt(m_opts.m_dyn_low_lvl_max_packet_size);
  if (bytes_to_send > limit)
  {
    // Bad and rare enough for a warning.
    FLOW_LOG_WARNING("Tried to send low-level packet but before doing so detected "
                     "serialized size [" << bytes_to_send << "] exceeds limit [" << limit << "]; "
                     "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
                     "[\n" << packet->m_concise_ostream_manip << "].");
    // However, full packets should not be logged unless the DATA log level is allowed, period.
    FLOW_LOG_DATA("Detailed serialized packet details from preceding warning: "
                  "[\n" << packet->m_verbose_ostream_manip << "].");

    // Short-circuit this, since no send occurred.
    assert(sock && "Really? A giant low-level packet that is not even DATA? Bug.");
    sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_to_send, 0);
    return;
  }
  // else

  // Initiate the asynchronous send.
  m_low_lvl_sock.async_send_to(raw_bufs,
                               low_lvl_remote_endpoint,
                               [this, sock, packet, bytes_to_send](const Error_code& sys_err_code, size_t n_sent)
  {
    low_lvl_packet_sent(sock, packet, bytes_to_send, sys_err_code, n_sent);
  });
} // Node::async_low_lvl_packet_send_impl()
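The scatter/gather discussion above boils down to two points: a buffer sequence is only a view over memory owned elsewhere, and an element-count cap (64 was the empirically observed boost.asio limit) must be enforced by the caller rather than trusted to the stack. A minimal illustrative sketch, with plain (pointer, length) pairs standing in for boost.asio const buffers; unlike the UDP path described above, this sketch refuses to send rather than silently truncating:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// A "scattered" buffer: views into memory owned elsewhere, never copies.
using Buf_seq = std::vector<std::pair<const char*, std::size_t>>;

// Gather the sequence into one contiguous datagram image, but only if it fits
// under `max_bufs` elements; returns false (sending nothing) instead of truncating.
bool gather(const Buf_seq& bufs, std::size_t max_bufs, std::string* out)
{
  if (bufs.size() > max_bufs)
  {
    return false; // Caller-enforced cap: better to fail loudly than drop bytes.
  }
  for (const auto& b : bufs)
  {
    out->append(b.first, b.second); // The copy happens here, at gather time.
  }
  return true;
}
```

The underlying strings must outlive the `Buf_seq`, which mirrors why `packet` is captured by the completion handler above.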

void Node::mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number& seq_num)
{
  using boost::chrono::microseconds;
  using boost::chrono::round;

  // We are in thread W.

  Peer_socket::Sent_pkt_ordered_by_when_iter sent_pkt_it = sock->m_snd_flying_pkts_by_sent_when.find(seq_num);
  if (sent_pkt_it == sock->m_snd_flying_pkts_by_sent_when.past_oldest())
  {
    // We haven't even (or we have just) sent the packet, and its In-flight data is already gone? Very weird but not fatal.
    FLOW_LOG_WARNING("Sending [DATA] packet over [" << sock << "] with "
                     "sequence number [" << seq_num << "] but cannot find corresponding Sent_packet. "
                     "Cannot deal with some of the related data structures; still sending. Bug?");
    // @todo Reading this now, it seems more than "very weird." Possibly it should be an assert() or at least an RST?
    return;
  }
  // else

  Peer_socket::Sent_packet& pkt = *sent_pkt_it->second;

  /* Mark the given packet as being sent right now. The following structures depend on this
   * event:
   *
   * - pkt.m_sent_when: Obviously needs to be marked with the current time stamp (for RTT calculation
   *   later, among other things).
   * - sock->m_snd_drop_timer: The Drop_timer has an event for a packet becoming In-flight in the
   *   sense that it was actually sent over the wire, which is exactly what's happening.
   *   So we call that.
   * - Idle Timer: m_snd_last_data_sent_when is the m_sent_when value of the last DATA packet sent.
   *   Obviously we set that here. */

  // Get the current time; and record it for Idle Timeout calculation. See the top of send_worker() for how it is used.
  const Fine_time_pt& now = sock->m_snd_last_data_sent_when = Fine_clock::now();
  const size_t cwnd = sock->m_snd_cong_ctl->congestion_window_bytes();
  auto& last_send_attempt = pkt.m_sent_when.back();
  const Peer_socket::order_num_t order_num = last_send_attempt.m_order_num;
  auto const logger_ptr = get_logger();
  if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
  {
    const Fine_time_pt prev_sent_when = pkt.m_sent_when.back().m_sent_time;
    // REPLACE the values. Note m_order_num is just left alone.
    last_send_attempt.m_sent_time = now;
    last_send_attempt.m_sent_cwnd_bytes = cwnd;
    const microseconds diff = round<microseconds>(now - prev_sent_when);

    FLOW_LOG_TRACE_WITHOUT_CHECKING
      ("Sending/sent [DATA] packet over [" << sock << "] with "
       "sequence number [" << seq_num << "] order_num [" << order_num << "]. Send timestamp changed from "
       "[" << prev_sent_when << "] -> [" << now << "]; difference [" << diff << "].");
  }
  else
  {
    // Same but no logging.
    last_send_attempt.m_sent_time = now;
    last_send_attempt.m_sent_cwnd_bytes = cwnd;
  }

  /* Inform this guy as required, now that the packet has actually been sent off over the wire (not merely In-flight
   * by the Peer_socket definition, wherein it's been placed into sock->m_snd_flying_pkts*). */
  const Drop_timer::Ptr drop_timer = sock->m_snd_drop_timer;
  drop_timer->start_contemporaneous_events();
  drop_timer->on_packet_in_flight(order_num);
  drop_timer->end_contemporaneous_events();
  /* ^-- @todo Is it possible to smoosh a bunch of mark_data_packet_sent() calls into a single
   * start/end_contemporaneous_events() group? There are probably situations where 2 or more packets are sent
   * at a time, which means ultimately mark_data_packet_sent() is called essentially contemporaneously.
   * However, at least as of this writing, mark_data_packet_sent() arises from a variety of situations, at least
   * some of which are "singular" and not really contemporaneous with any other packet(s) being sent.
   * So to isolate the cases where that is in fact true (2+ calls to us are contemporaneous) is certainly non-trivial,
   * and the code to do it would be... highly technical and hard to maintain. So I wouldn't trip over myself to
   * attempt this; and actually whatever book-keeping might be necessary to pull it off might itself have
   * some performance cost. */
} // Node::mark_data_packet_sent()
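The anti-drift idea above — stamp `m_sent_when` at the moment of the actual wire send, after any pacing delay, so that a later RTT sample excludes pacing-queue time — can be shown numerically. This is a simplified illustrative model with integer "ticks" and minimal structs standing in for `Fine_clock` and the real `Sent_packet`; it is not Flow API:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct Send_attempt
{
  long m_sent_time = 0;              // Tick at which the packet actually hit the wire.
  std::size_t m_sent_cwnd_bytes = 0; // Congestion window recorded at that moment.
};

struct Sent_packet
{
  std::vector<Send_attempt> m_sent_when; // One entry per send attempt (rexmits append).
};

// REPLACE the provisional values from enqueue time with the actual wire-send moment.
void mark_sent(Sent_packet* pkt, long now, std::size_t cwnd)
{
  pkt->m_sent_when.back().m_sent_time = now;
  pkt->m_sent_when.back().m_sent_cwnd_bytes = cwnd;
}

// RTT measured against the finalized timestamp, so pacing delay is excluded.
long rtt_sample(const Sent_packet& pkt, long ack_time)
{
  return ack_time - pkt.m_sent_when.back().m_sent_time;
}
```

Had the RTT been measured from the enqueue-time stamp instead, each pacing delay would inflate the RTT, which would spread packets out further, inflating the RTT again — the feedback loop the comment calls "deadly RTT drift."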

void Node::low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet,
                               size_t bytes_expected_transferred, const Error_code& sys_err_code,
                               size_t bytes_transferred)
{
  using std::numeric_limits;

  // We are in thread W.

  const auto& packet_ref = *packet;
  const auto& packet_type_id = typeid(packet_ref);

  // Note: we don't save `packet` anywhere, so the memory will finally be freed when we're done here.

  // Log detailed info on the packet, but only if TRACE or DATA logging is enabled.
  auto const logger_ptr = get_logger();
  if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
  {
    if (logger_ptr->should_log(log::Sev::S_DATA, get_log_component()))
    {
      FLOW_LOG_DATA_WITHOUT_CHECKING("Tried to send low-level packet: "
                                     "[\n" << packet->m_verbose_ostream_manip << "].");
    }
    else
    {
      FLOW_LOG_TRACE_WITHOUT_CHECKING("Tried to send low-level packet: "
                                      "[\n" << packet->m_concise_ostream_manip << "].");
    }
  }

  if (sys_err_code)
  {
    /* Weird that it failed, since it's UDP. Oh well. (Update: One thing to watch out for is "Message too long"!)
     * Note: One might be tempted to do something dire like closing some sockets. Several Flow connections might be
     * using the same UDP port, so should we close all of them or just the one to which this outgoing packet applies?
     * Even if we made some smart decision there, the failure to write could mean many things, so individual err_codes
     * would need to be examined, with various actions taken depending on what happened. (@todo Consider it.)
     * Without that, just ignore it as we would any lost packet. Obviously, we deal with UDP's unreliability already.
     *
     * What about the specific error codes would_block and try_again (in POSIX these probably both point to
     * code EAGAIN, which == EWOULDBLOCK; in Windows possibly just the former, but to be safe both should be handled the
     * same way)? Currently I am ignoring this possibility. Why? Answer: we specifically use async_send() to wait
     * for the socket to be ready for the transmit operation before actually transmitting. This _should_ ensure
     * that those two errors are _specifically_ avoided. However, that's probably not a 100% safe assumption.
     * Even if the OS reports "Writable" at time T, a few microseconds later some resource might get used up
     * (unrelated to our activities); and by the time the actual send executes, we might get would_block/try_again.
     * Now, it's possible that -- since we use async_send() rather than async_wait(), meaning we let
     * boost.asio both wait for Writable *and* itself execute the write -- it would hide any such
     * corner-case EAGAIN/etc. from us and just not call this handler in that case, retrying later by itself.
     * However, I don't know if it does that. @todo Therefore it would be safer to treat those error codes,
     * would_block and try_again, specially here. Simply, we should retry the async_send(), meaning it will wait for
     * Writable again and execute the send again later on. One might worry that this might open the possibility of
     * more than one outstanding async_send(). If this happens, it's fine. Even if boost.asio were so rash as
     * to allow outstanding async_send()s to execute their handlers in a different order from the calls, it's not
     * a problem, as out-of-order datagrams are handled just fine by the other side.
     *
     * Note: I've placed a more visible @todo in the class doc header. So delete that if this gets done. */

    if (sock)
    {
      sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id);
    }

    return;
  }
  // else

  if (sock)
  {
    sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_expected_transferred, bytes_transferred);
  }

  if (bytes_transferred != bytes_expected_transferred)
  {
    /* If a UDP send was partial, the datagram must have been too big, which we shouldn't have allowed; worth a warning.
     * Update: Actually, at least on Mac with Boost 1.63 I've seen these outcomes:
     * - If the message is too big byte-wise, it results in a truthy sys_err_code.
     * - If too many sub-buffers in the scatter/gather buffer container passed to async_send_to() were passed,
     *   they may be silently truncated (!sys_err_code, but the present condition is triggered). The limit was
     *   observed to be ~64. */
    FLOW_LOG_WARNING("Low-level packet sent, but only [" << bytes_transferred << "] of "
                     "[" << bytes_expected_transferred << "] bytes "
                     "were sent. Internal error with packet size calculations? More likely, did stack truncate?");
    return;
  }
  // else
  FLOW_LOG_TRACE("Success.");
} // Node::low_lvl_packet_sent()
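The completion handler above distinguishes three outcomes: a system error (e.g., "Message too long"), silent truncation (no error code, but fewer bytes than expected, as with the >64-buffer case), and full success. An illustrative standalone classifier, not part of Flow:

```cpp
#include <cassert>
#include <cstddef>

enum class Send_result { S_ERROR, S_TRUNCATED, S_OK };

// Classify a datagram-send completion the way the handler above does.
Send_result classify_send(bool sys_err, std::size_t expected, std::size_t transferred)
{
  if (sys_err)
  {
    return Send_result::S_ERROR; // E.g., "Message too long."
  }
  if (transferred != expected)
  {
    return Send_result::S_TRUNCATED; // E.g., stack silently dropped buffers past a cap.
  }
  return Send_result::S_OK;
}
```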

void Node::async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet,
                                          const util::Udp_endpoint& low_lvl_remote_endpoint)
{
  using boost::asio::buffer;
  using boost::shared_ptr;

  // We are in thread W.

  // Fill out the typical info.
  auto rst_base = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(get_logger());
  // No sequence number. See m_seq_num for discussion.
  rst_base->m_packed.m_src_port = causing_packet->m_packed.m_dst_port; // Right back atcha.
  rst_base->m_packed.m_dst_port = causing_packet->m_packed.m_src_port;
  rst_base->m_opt_rexmit_on = false; // Not used in RST packets, so set it to something.

  async_no_sock_low_lvl_packet_send(low_lvl_remote_endpoint, rst_base);
  // If that failed: It's an RST, so there's no one to inform of an error anymore. Oh well.
} // Node::async_no_sock_low_lvl_rst_send()
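The "right back atcha" port swap above is the essence of constructing an RST reply: the reply's source port is the causing packet's destination port and vice versa. A minimal illustrative sketch with an invented header struct (not the real `Low_lvl_packet` layout):

```cpp
#include <cassert>
#include <cstdint>

// Invented minimal stand-in for the packed port fields of a low-level packet header.
struct Packed_header
{
  std::uint16_t m_src_port = 0;
  std::uint16_t m_dst_port = 0;
};

// Build an RST reply header: swap the causing packet's source/destination ports.
Packed_header make_rst_header(const Packed_header& causing_packet)
{
  Packed_header rst;
  rst.m_src_port = causing_packet.m_dst_port; // Right back atcha.
  rst.m_dst_port = causing_packet.m_src_port;
  return rst;
}
```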
602
604 Low_lvl_packet::Ptr&& packet)
605{
606 // We are in thread W.
607
608 // This is the general-purpose method for sending a packet along a well-defined connection (sock).
609
610 const auto& packet_ref = *packet;
611 const auto& packet_type_id = typeid(packet_ref);
612
613 sock->m_snd_stats.low_lvl_packet_xfer_requested(packet_type_id);
614
615 // Fill out typical info.
616 packet->m_packed.m_src_port = sock->m_local_port;
617 packet->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
618 packet->m_opt_rexmit_on = sock->rexmit_on();
619
620 /* Apply packet pacing, which tries to spread out bursts of packets to prevent loss. For much
621 * more detail, see struct Send_pacing_data comment. */
622
623 if ((!sock->opt(sock->m_opts.m_st_snd_pacing_enabled)) ||
624 (sock->m_snd_smoothed_round_trip_time == Fine_duration::zero()) ||
625 (packet_type_id == typeid(Rst_packet)))
626 {
627 /* Per struct Send_pacing_data doc header, the pacing algorithm only begins once we have our
628 * first round trip time (and thus SRTT); until then we send all packets as soon as possible.
629 * Also pacing can just be disabled; in which case send all packets ASAP.
630 *
631 * Finally, if it's an RST, just send it ASAP. This is discussed in more detail in the
632 * Send_pacing_data struct header, but basically: RST means packet is being CLOSED right now.
633 * Queueing RST on the pacing queue means it may be sent after underlying socket is CLOSED.
634 * Therefore we have to keep queue and all that machinery operating past socket being CLOSED.
635 * As a rule of thumb, CLOSED => dead socket. So just send RST right away. This means it may
636 * jump the queue ahead of DATA/ACK packets already there, but since it is an error condition
637 * causing RST, we consider that OK (those packets will not be sent). */
638 async_sock_low_lvl_packet_send(sock, std::move(packet), false); // false => not queued in pacing module.
639 return;
640 }
641 // else: the pacing algorithm is enabled, and it both can and must be used.
642
643 sock_pacing_new_packet_ready(sock, std::move(packet));
644} // Node::async_sock_low_lvl_packet_send_paced()
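The three bypass conditions above can be condensed into a small predicate. The following is a standalone sketch, not the Flow API; `SendCtx` and `must_send_asap` are hypothetical names standing in for the socket state consulted above.

```cpp
#include <chrono>

// Hypothetical stand-in for the socket fields consulted by the "if" above.
struct SendCtx
{
  bool pacing_enabled;            // m_st_snd_pacing_enabled option.
  std::chrono::nanoseconds srtt;  // m_snd_smoothed_round_trip_time; zero() == no RTT sample yet.
  bool is_rst;                    // typeid(packet) == typeid(Rst_packet).
};

// True => skip the pacing queue and send immediately (pacing disabled, no SRTT
// yet so the algorithm has not begun, or RST which must never be queued).
bool must_send_asap(const SendCtx& c)
{
  return !c.pacing_enabled
         || (c.srtt == std::chrono::nanoseconds::zero())
         || c.is_rst;
}
```
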
645
646void Node::sock_pacing_new_packet_ready(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet)
647{
648 using boost::chrono::duration_cast;
649 using boost::chrono::microseconds;
650 using boost::static_pointer_cast;
651 using boost::dynamic_pointer_cast;
652 using boost::shared_ptr;
653
654 // We are in thread W.
655
656 const auto& packet_ref = *packet;
657 const auto& packet_type_id = typeid(packet_ref);
658
659 // For brevity and a bit of speed.
660 Send_pacing_data& pacing = sock->m_snd_pacing_data;
661
662 const bool is_data_packet = packet_type_id == typeid(Data_packet);
663 // For logging, get the first sequence number mentioned (depending on whether it's DATA or ACK).
664 Sequence_number init_seq_num;
665 shared_ptr<const Data_packet> data;
666 if (is_data_packet)
667 {
668 init_seq_num = static_pointer_cast<const Data_packet>(packet)->m_seq_num;
669 }
670 else
671 {
672 const auto& acked_packets = static_pointer_cast<const Ack_packet>(packet)->m_rcv_acked_packets;
673 if (!acked_packets.empty())
674 {
675 init_seq_num = acked_packets.front()->m_seq_num;
676 }
677 }
678
679 const bool q_was_empty = pacing.m_packet_q.empty();
680
681 /* No matter what, we can't send packet before the ones already in the queue, so push onto end
682 * of the queue. (If queue is currently empty, then that is not a special case; we may well
683 * immediately pop and send it below.) */
684 pacing.m_packet_q.push_back(packet);
685
686 FLOW_LOG_TRACE("Pacing: On [" << sock << "] packet of type [" << packet->m_type_ostream_manip << "] "
687 "is newly available for sending; pushed onto queue; queue size [" << pacing.m_packet_q.size() << "]; "
688 "initial sequence number [" << init_seq_num << "].");
689
690 if (!q_was_empty)
691 {
692 const auto data = dynamic_pointer_cast<const Data_packet>(pacing.m_packet_q.front());
693 // !data if it's not a DATA packet.
694
695 assert((!data) || (pacing.m_bytes_allowed_this_slice < data->m_data.size()));
696
697 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: was already in progress; queued and done.");
698
699 /* There were already packets in the queue, so the timer is running; certainly we can't send
700 * packet ahead of those in the queue, so push on the back of the queue above. Should we send
701 * the head packet now? No; if the last sock_pacing_new_packet_ready() or
702 * sock_pacing_time_slice_end() left a non-empty queue, then the timer has been set to fire when
703 * the slice ends, and more packets can be sent. Done. */
704 return;
705 }
706 // else if (q_was_empty)
707
708 // We have just pushed the sole packet on the queue.
709
710 if (!is_data_packet)
711 {
712 assert(packet_type_id == typeid(Ack_packet)); // Sanity check.
713 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: due to packet type, sending immediately since at head of queue; "
714 "queue empty again.");
715
716 pacing.m_packet_q.clear();
717
718 /* Per discussion in struct Send_pacing_data doc header, if it's a non-DATA packet, then --
719 * other than not being allowed to be re-ordered -- it does not "count" as a paced packet. That
720 * is, it is "worth" zero bytes when compared to m_bytes_allowed_this_slice and should be sent
721 * as soon as it is at the head of the queue (which it is here, since we just placed it as the
722 * sole element on the queue). Since it doesn't count against m_bytes_allowed_this_slice, the
723 * pacing timing is irrelevant to it, and based on the "send ASAP" rule, we send it now. */
724 async_sock_low_lvl_packet_send(sock, std::move(packet), false); // false => not queued in pacing module.
725 return;
726 }
727 // else packet is DATA packet.
728
729 const Fine_time_pt now = Fine_clock::now();
730 if ((pacing.m_slice_start == Fine_time_pt{}) || (now >= (pacing.m_slice_start + pacing.m_slice_period)))
731 {
732 /* We are past the current time slice (if there is such a thing) and have a packet to send. By
733 * the algorithm in struct Send_pacing_data doc header, this means we create a new time slice with
734 * starting time = now. */
735
736 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: "
737 "current time "
738 "slice [epoch+" << duration_cast<microseconds>(pacing.m_slice_start.time_since_epoch()) << " "
739 "over " << duration_cast<microseconds>(pacing.m_slice_period) << "] is over.");
740
741 sock_pacing_new_time_slice(sock, now);
742 /* pacing.{m_slice_start, m_slice_period, m_bytes_allowed_this_slice} have all been recomputed.
743 * By definition "now" is in [m_slice_start, m_slice_start + m_slice_period), since m_slice_start
744 * IS now (plus some epsilon). */
745 }
746 // else if ("now" is basically in time range [m_slice_start, m_slice_start + m_slice_period)) {}
747
748 /* We are in the current time slice. The one just-queued packet can be sent if there is
749 * m_bytes_allowed_this_slice budget; otherwise the timer must be started to fire at slice
750 * end, at which point we're cleared to send the packet. sock_pacing_process_q() performs this
751 * (it is generalized to work with multiple packets on the queue, but it will work fine with just
752 * one also).
753 *
754 * Note: If we just created the new time slice above, then m_bytes_allowed_this_slice >=
755 * max_block_size(), so certainly the following statement will immediately send the just-queued
756 * packet. If the time slice was in progress, then it depends. */
757
758 sock_pacing_process_q(sock, false);
759} // Node::sock_pacing_new_packet_ready()
760
761void Node::sock_pacing_new_time_slice(Peer_socket::Ptr sock, const Fine_time_pt& now)
762{
763 using boost::chrono::duration_cast;
764 using boost::chrono::microseconds;
765 using boost::chrono::milliseconds;
766 using std::max;
767
768 // We are in thread W.
769
770 // For brevity and a bit of speed.
771 Send_pacing_data& pacing = sock->m_snd_pacing_data;
772 const Fine_duration& srtt = sock->m_snd_smoothed_round_trip_time;
773
774 assert(srtt != Fine_duration::zero());
775
776 // New slice starts now.
777 pacing.m_slice_start = now;
778
779 /* Per struct Send_pacing_data doc header: If we had perfectly fine timer precision, then we'd
780 * want a slice that is SRTT / CWND. CWND = (congestion window in bytes / max-block-size).
781 * To minimize truncation error, then, it is X / (Y / Z) = X * Z / Y, where X, Y, Z are SRTT,
782 * max-block-size, and congestion window in bytes, respectively. */
783 Fine_duration slice_ideal_period
784 = srtt * sock->max_block_size() / sock->m_snd_cong_ctl->congestion_window_bytes();
785 if (slice_ideal_period == Fine_duration::zero())
786 {
787 // Avoid division by zero and any other tomfoolery below....
788 slice_ideal_period = Fine_duration{1};
789 }
790
791 Fine_duration timer_min_period = opt(m_opts.m_st_timer_min_period);
792 if (timer_min_period == Fine_duration::zero())
793 {
794 /* They want us to pick a nice upper bound on the timer precision ourselves.
795 *
796 * This is conservative; in my tests Windows seems to have the worst timer precision, and it is
797 * about 15 msec. @todo Perhaps choose here based on platform. It can get hairy, as there is
798 * wide variation, so it would require much experimentation; but might be worth it for
799 * performance. */
800 const Fine_duration TIMER_MIN_PERIOD_DEFAULT = milliseconds{15};
801 timer_min_period = TIMER_MIN_PERIOD_DEFAULT;
802 }
803
804 /* Per Send_pacing_data doc header, the actual slice period is slice_ideal_period, unless that
805 * is below the timer's capabilities; in which case it is timer_min_period. */
806 pacing.m_slice_period = max(slice_ideal_period, timer_min_period);
807
808 /* Finally, allow (in this time slice) the largest burst of full-sized blocks that still
809 * stays at or below the target SRTT / CWND rate: floor(m_slice_period / slice_ideal_period).
810 * Note that when slice_ideal_period >= timer_min_period, i.e. the timer precision is fine enough
811 * to handle the desired rate exactly, then this value will always equal 1.
812 *
813 * Also, convert to bytes when actually assigning the data member.
814 *
815 * @todo Consider doing some kind of rounding instead of using floor(). */
816 pacing.m_bytes_allowed_this_slice
817 = static_cast<size_t>(pacing.m_slice_period * sock->max_block_size() / slice_ideal_period);
818
819 /* If I just use the above math, I notice that over time the queue size can drift becoming slowly
820 * larger and larger as more and more time slices go by. I believe it's due to our floor math
821 * above, but I have not yet fully investigated it. @todo Investigate it.
822 *
823 * However if I just increase the number of bytes allowed per slice a little bit, it makes the
824 * drift go away and probably doesn't reduce the effectiveness of the pacing much. */
825 const size_t QUEUE_SIZE_DRIFT_PREVENTION_PCT = 110; // @todo Maybe make it a configurable option.
826 pacing.m_bytes_allowed_this_slice *= QUEUE_SIZE_DRIFT_PREVENTION_PCT;
827 pacing.m_bytes_allowed_this_slice /= 100;
828
829 assert(pacing.m_bytes_allowed_this_slice >= sock->max_block_size());
830
831 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: "
832 "new time "
833 "slice [epoch+" << duration_cast<microseconds>(pacing.m_slice_start.time_since_epoch()) << " "
834 "over " << duration_cast<microseconds>(pacing.m_slice_period) << "]; "
835 "ideal slice period = [SRTT " << duration_cast<microseconds>(srtt) << "] / "
836 "([cong_wnd " << sock->m_snd_cong_ctl->congestion_window_bytes() << "] / "
837 "[max-block-size " << sock->max_block_size() << "]) = "
838 "[" << duration_cast<microseconds>(slice_ideal_period) << "]; "
839 "timer_min_period = [" << duration_cast<microseconds>(timer_min_period) << "]; "
840 "bytes_allowed = max(ideal, min) / ideal * max-block-size * "
841 "[" << QUEUE_SIZE_DRIFT_PREVENTION_PCT << "%] = "
842 "[" << pacing.m_bytes_allowed_this_slice << "].");
843} // Node::sock_pacing_new_time_slice()
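The slice arithmetic above can be exercised in isolation. Below is a standalone sketch using std::chrono in place of Fine_duration; `Slice` and `compute_slice` are hypothetical names, not the Flow API, and the 110% drift-prevention fudge factor from above is hard-coded.

```cpp
#include <algorithm>
#include <chrono>
#include <cstddef>

using Dur = std::chrono::nanoseconds;

struct Slice
{
  Dur m_period;                 // Analogous to m_slice_period.
  std::size_t m_bytes_allowed;  // Analogous to m_bytes_allowed_this_slice.
};

// Mirrors sock_pacing_new_time_slice()'s math:
//   ideal = SRTT / (cwnd_bytes / max_block), computed as SRTT * max_block / cwnd_bytes
//           to minimize truncation error;
//   period = max(ideal, timer_min);
//   budget = floor(period / ideal) full blocks, in bytes, plus 10% fudge.
Slice compute_slice(Dur srtt, long long max_block, long long cwnd_bytes, Dur timer_min)
{
  Dur ideal = srtt * max_block / cwnd_bytes;
  if (ideal == Dur::zero())
  {
    ideal = Dur{1}; // Avoid division by zero below, as the real code does.
  }
  const Dur period = std::max(ideal, timer_min);

  auto bytes = static_cast<std::size_t>(period * max_block / ideal);
  bytes = bytes * 110 / 100; // QUEUE_SIZE_DRIFT_PREVENTION_PCT.
  return Slice{period, bytes};
}
```

For example, with SRTT = 100 msec, max-block-size = 1000, congestion window = 10000 bytes, the ideal slice is 10 msec; a 15-msec minimum timer period stretches the slice and the per-slice budget grows proportionally.
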
844
845void Node::sock_pacing_process_q(Peer_socket::Ptr sock, bool executing_after_delay)
846{
847 using boost::chrono::milliseconds;
848 using boost::chrono::round;
849 using boost::shared_ptr;
850 using boost::weak_ptr;
851 using boost::static_pointer_cast;
852 using boost::dynamic_pointer_cast;
853 using std::max;
854
855 // We are in thread W.
856
857 /* Pre-condition is that the current time is within the current time slice, and that all other
858 * invariants hold (including that the head packet, if any, is a DATA packet). So now we send as
859 * many packets as still allowed by the budget in m_bytes_allowed_this_slice. If anything remains
860 * beyond that, we schedule a timer to hit at the end of the time slice to get the rest. */
861
862 // For brevity and a bit of speed.
863 Send_pacing_data& pacing = sock->m_snd_pacing_data;
864 shared_ptr<const Data_packet> head_packet;
865
866 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: processing queue; queue size [" << pacing.m_packet_q.size() << "]; "
867 "byte budget [" << sock->bytes_blocks_str(pacing.m_bytes_allowed_this_slice) << "] remaining in this "
868 "slice.");
869
870 /* Pop things from queue until we've run out of pacing byte budget for this time slice, or until
871 * there is nothing left to send. */
872 while ((!pacing.m_packet_q.empty())
873 && (pacing.m_bytes_allowed_this_slice
874 >= (head_packet = static_pointer_cast<const Data_packet>(pacing.m_packet_q.front()))->m_data.size()))
875 {
876 // Explicit invariant: head_packet is DATA. We always send non-DATA packets as soon as they get to head of queue.
877
878 // It is a DATA packet at head of queue, and there is enough pacing budget to send it now.
879
880 // Use up the budget.
881 pacing.m_bytes_allowed_this_slice -= head_packet->m_data.size();
882
883 FLOW_LOG_TRACE("Will send [" << head_packet->m_data.size() << "] bytes of data; budget now "
884 "[" << sock->bytes_blocks_str(pacing.m_bytes_allowed_this_slice) << "]; "
885 "queue size now [" << (pacing.m_packet_q.size() - 1) << "].");
886
887 // Send it. executing_after_delay <=> packet being sent was delayed by pacing as opposed to sent immediately.
888 async_sock_low_lvl_packet_send(sock, std::move(head_packet), executing_after_delay);
889
890 pacing.m_packet_q.pop_front(); // After this the raw pointer in head_packet should be freed.
891
892 /* Since we've popped a packet, another packet is now at the head. We must maintain the
893 * invariant that no non-DATA packet is at the head of the queue (see struct Send_pacing_data
894 * doc header for reasoning), i.e. we should send any such packets immediately. Do so until
895 * we run out or encounter a DATA packet. */
896
897 // Subtlety: Using dynamic_pointer_cast<> instead of typeid() to check packet type to avoid "side effect" warning.
898 Low_lvl_packet::Const_ptr head_packet_base;
899 while ((!pacing.m_packet_q.empty())
900 && (!(dynamic_pointer_cast<const Data_packet>(head_packet_base = pacing.m_packet_q.front()))))
901 {
902 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: due to packet type, sending immediately since at head of queue; "
903 "queue size now [" << (pacing.m_packet_q.size() - 1) << "].");
904
905 // See above comment about the last arg.
906 async_sock_low_lvl_packet_send(sock, std::move(head_packet_base), executing_after_delay);
907
908 pacing.m_packet_q.pop_front(); // After this the raw pointer in head_packet_base should be freed.
909
910 // Note that, as discussed in struct Send_pacing_data doc header, a non-DATA packet is worth 0 budget.
911 }
912 } // while ((m_packet_q not empty) && (more m_bytes_allowed_this_slice budget available))
913
914 if (pacing.m_packet_q.empty())
915 {
916 FLOW_LOG_TRACE("Pacing: Queue emptied.");
917
918 // Successfully sent off entire queue. Pacing done for now -- until the next sock_pacing_new_packet_ready().
919 return;
920 }
921 // else
922
923 /* No more budget left in this pacing time slice, but there is at least one packet in the queue
924 * still. Per algorithm, a fresh slice should begin ASAP (since packets must be sent ASAP but no
925 * sooner). Therefore schedule timer for the end of the time slice, which is just
926 * m_slice_start + m_slice_period. */
927 const Fine_time_pt slice_end = pacing.m_slice_start + pacing.m_slice_period;
928
929 pacing.m_slice_timer.expires_at(slice_end);
930 // (Even if slice_end is slightly in the past, that'll just mean it'll fire ASAP.)
931
932 FLOW_LOG_TRACE("Pacing: Exhausted budget; queue size [" << pacing.m_packet_q.size() << "]; "
933 "scheduling next processing at end of time slice "
934 "in [" << round<milliseconds>(slice_end - Fine_clock::now()) << "].");
935
936 // When triggered or canceled, call this->sock_pacing_time_slice_end(sock, <error code>).
937 pacing.m_slice_timer.async_wait([this, sock_observer = weak_ptr<Peer_socket>(sock)]
938 (const Error_code& sys_err_code)
939 {
940 auto sock = sock_observer.lock();
941 if (sock)
942 {
943 sock_pacing_time_slice_end(sock, sys_err_code);
944 }
945 // else { Possible or not, allow for this possibility for maintainability. }
946 });
947
948 // More work to do later, but for now we've been successful.
949
950 /* That's it. The only reason the timer would get canceled is if we go into CLOSED state, in
951 * which case it can just do nothing. */
952} // Node::sock_pacing_process_q()
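The drain loop above reduces to a simple model: DATA packets consume budget, anything else at the head goes out immediately at zero cost. A standalone sketch with hypothetical names (`Pkt`, `drain`), not the Flow API; the real function's pre-condition guarantees the head (if any) is DATA.

```cpp
#include <cstddef>
#include <deque>

// Each queued packet reduced to what the drain loop cares about.
struct Pkt
{
  bool is_data;      // DATA packets consume budget; others are "worth" 0 bytes.
  std::size_t size;  // m_data.size(), for DATA packets.
};

// Pops and "sends" packets as the loop in sock_pacing_process_q() does:
// send DATA while budget allows, and never leave a non-DATA packet at the
// head of the queue. Returns the number of packets sent.
std::size_t drain(std::deque<Pkt>& q, std::size_t& budget)
{
  std::size_t sent = 0;
  while (!q.empty() && q.front().is_data && (budget >= q.front().size))
  {
    budget -= q.front().size; // DATA: use up the byte budget.
    q.pop_front();
    ++sent;
    // Invariant: any non-DATA packets now at the head go out immediately, at 0 cost.
    while (!q.empty() && !q.front().is_data)
    {
      q.pop_front();
      ++sent;
    }
  }
  return sent; // If q is non-empty here, the caller schedules the slice-end timer.
}
```
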
953
954void Node::sock_pacing_time_slice_end(Peer_socket::Ptr sock, [[maybe_unused]] const Error_code& sys_err_code)
955{
956 // We are in thread W.
957
958 // As always, no need to lock m_state, unless we plan to alter it, since no other thread can alter it.
959 if (sock->m_state == Peer_socket::State::S_CLOSED)
960 {
961 /* Once state is CLOSED, the socket is dead -- all packets have been sent. A corollary of
962 * that is that if we must send an RST, then always send it immediately (even if it has to
963 * jump ahead of other packets waiting on queue). */
964 return;
965 }
966
967 // We only cancel the timer when we close socket, and we've already returned if we'd done that.
968 assert(sys_err_code != boost::asio::error::operation_aborted);
969
970 // There could be other errors, but as in other timer handlers, we've no idea what that means, so pretend all is OK.
971
972 // Pre-condition: if we set up the timer, then the queue had to have been non-empty at the time.
973 assert(!sock->m_snd_pacing_data.m_packet_q.empty());
974
975 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: slice end timer fired; creating new slice and processing queue.");
976
977 /* Timer fired, so right now we are somewhere after the end of the current time slice. Therefore,
978 * begin the next time slice. */
979 sock_pacing_new_time_slice(sock, Fine_clock::now());
980
981 /* pacing.{m_slice_start, m_slice_period, m_bytes_allowed_this_slice} have all been recomputed.
982 * By definition "now" is in [m_slice_start, m_slice_start + m_slice_period), since m_slice_start
983 * IS now (plus some epsilon). */
984
985 /* We are in the current time slice. We just created the new time slice above, so
986 * m_bytes_allowed_this_slice >= max_block_size(), and certainly the following statement will
987 * immediately send at least one packet. */
988
989 sock_pacing_process_q(sock, true); // Process as many packets as the new budget allows.
990} // Node::sock_pacing_time_slice_end()
991
992void Node::async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
993{
994 // We are in thread W.
995
996 // Fill out common fields and asynchronously send packet.
997 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(get_logger());
998 async_sock_low_lvl_packet_send_paced(sock, std::move(rst));
999} // Node::async_sock_low_lvl_rst_send()
1000
1001void Node::sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
1002{
1003 using boost::asio::buffer;
1004
1005 // We are in thread W.
1006
1007 // Fill out fields.
1008 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(get_logger());
1009 rst->m_packed.m_src_port = sock->m_local_port;
1010 rst->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
1011 rst->m_opt_rexmit_on = false; // Unused in RST packets, so set it to something.
1012
1013 // Serialize to a buffer sequence (basically sequence of pointers/lengths referring to existing memory areas).
1014 Low_lvl_packet::Const_buffer_sequence raw_bufs;
1015 const size_t size = rst->serialize_to_raw_data_and_log(&raw_bufs); // Logs TRACE/DATA.
1016
1017 // This special-case sending path should report stats similarly to the main path elsewhere.
1018 const auto& rst_type_id = typeid(Rst_packet); // ...a/k/a typeid(*rst).
1019 sock->m_snd_stats.low_lvl_packet_xfer_requested(rst_type_id);
1020 sock->m_snd_stats.low_lvl_packet_xfer_called(rst_type_id, false, size);
1021
1022 // Same check as when using async_send_to(). @todo Code reuse?
1023 const size_t limit = opt(m_opts.m_dyn_low_lvl_max_packet_size);
1024 if (size > limit)
1025 {
1026 // Bad and rare enough for a warning.
1027 FLOW_LOG_WARNING("Tried to send RST but before doing so detected "
1028 "serialized size [" << size << "] exceeds limit [" << limit << "]; "
1029 "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
1030 "[\n" << rst->m_concise_ostream_manip << "].");
1031
1032 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, 0); // No send occurred.
1033 }
1034 else
1035 {
1036 // Synchronously send to remote UDP. If non-blocking mode and not sendable, this will return error.
1037 Error_code sys_err_code;
1038 const size_t size_sent = m_low_lvl_sock.send_to(raw_bufs,
1039 sock->remote_endpoint().m_udp_endpoint, 0, sys_err_code);
1040 if (sys_err_code)
1041 {
1042 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1043 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id);
1044 }
1045 else
1046 {
1047 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, size_sent);
1048 }
1049 }
1050} // Node::sync_sock_low_lvl_rst_send()
1051
1052Send_pacing_data::Send_pacing_data(util::Task_engine* task_engine) :
1053 m_slice_period(0), // Doc header says undefined value is zero(), so don't leave it uninitialized. @todo Is it really needed?
1054 m_bytes_allowed_this_slice(0),
1055 m_slice_timer(*task_engine)
1056{
1057 // Nothing.
1058}
1059
1060} // namespace flow::net_flow