Flow 2.0.0
Flow project: Full implementation reference.
low_lvl_io.cpp
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
24#include <utility>
25
26namespace flow::net_flow
27{
28
29// Implementations.
30
31void Node::async_low_lvl_recv()
32{
33 // We are in thread W.
34 m_low_lvl_sock.async_wait(Udp_socket::wait_read, [this](const Error_code& sys_err_code)
35 {
36 low_lvl_recv_and_handle(sys_err_code);
37 });
38}
39
40void Node::low_lvl_recv_and_handle(Error_code sys_err_code)
41{
42 using util::Blob;
43 using boost::asio::buffer;
44
45 // We are in thread W.
46
47 // Number of packets received and thus handled by handle_incoming(). Useful at least for a log message at the end.
48 unsigned int handled_packet_count = 0;
49 /* Number of packets received. (The number handled by handle_incoming() may be lower or higher
50 * if the simulator is in use.) */
51 unsigned int recvd_packet_count = 0;
52
53 /* The limit on the # of received packets to handle in this handler. 0 means unlimited. For
54 * reasoning as to why we'd possibly want to limit it, see doc header for this option. */
55 unsigned int recvd_packet_count_limit = opt(m_opts.m_dyn_max_packets_per_main_loop_iteration);
56
57 if (!sys_err_code)
58 {
59 /* boost.asio has reason to believe there's at least one UDP datagram ready to read from the
60 * UDP net-stack's buffer. We'll read that one. We'll also keep reading them until UDP net-stack
61 * says it "would block" (i.e., no more datagrams have been received). Why not simply call
62 * async_low_lvl_recv() and let boost.asio deal with it and call us again? Consider a user
63 * waiting for Readable for a given Peer_socket. If we did that, we would
64 * read one DATA packet, load it on the Receive buffer, and signal the waiting user. Thus they
65 * may immediately read from the Receive buffer and move onto the rest of their event loop
66 * iteration. However, there may be 50 more packets we're then going to immediately put on the
67 * Receive buffer in thread W. It would be arguably more efficient to read all 51 and THEN
68 * signal the user. Therefore we use Node::m_sock_events to accumulate active events
69 * and then read all available datagrams; only after that do we then signal Event_sets for
70 * accumulated events.
71 *
72 * @todo One can conceive of some pathological case where due to extreme traffic we'd keep
73 * reading more and more datagrams and not get out of the loop for a long time. Perhaps add a
74 * knob for the maximum number of iterations to go through before ending the loop and
75 * signaling about the accumulated events. */
76
77 // Note: m_packet_data is a member that is reused repeatedly for some performance savings.
78 auto& packet_data = m_packet_data;
79 /* Don't let a change in this dynamic option's value affect this main loop iteration.
80 * This way packet_data.capacity() will essentially remain constant which is easier to reason about than
81 * the alternative. Soon enough this method will exit, and any new value will take effect next time. */
82 const size_t max_packet_size = opt(m_opts.m_dyn_low_lvl_max_packet_size);
83
84 util::Udp_endpoint low_lvl_remote_endpoint;
85
86 // Read until error or "would block" (no data available).
87 size_t packet_size = 0;
88 do
89 {
90 /* Make buffer big enough for any packet we'd accept as valid.
91 * resize(MAX) will only work if packet_data.zero() (the case in first iteration) or if packet_data.capacity()
92 * >= MAX. Hence, if a loop iteration below has left packet_data with unsatisfactory capacity below MAX,
93 * then ensure zero() is true before the resize(MAX) call. In other words, reallocate packet_data but only
94 * if necessary. */
95 if ((!packet_data.zero()) && (packet_data.capacity() < max_packet_size))
96 {
97 packet_data.make_zero(); // This must be done explicitly: acknowledges we force reallocation in next line.
98 }
99 packet_data.resize(max_packet_size);
100 // ^-- Could use UDP available(), but its meaning is slightly ambiguous, + I don't care for the tiny race it adds.
101
102 // Read packet from UDP net-stack internal buffer into "packet_data."
103 packet_size = m_low_lvl_sock.receive_from(buffer(packet_data.data(), packet_data.size()),
104 low_lvl_remote_endpoint, 0, sys_err_code);
105 if (!sys_err_code)
106 {
107 assert(packet_size <= packet_data.size());
108 packet_data.resize(packet_size); // boost.asio NEVER resizes vector to the # of bytes it read.
109
110 FLOW_LOG_TRACE("Received low-level packet at [UDP " << m_low_lvl_sock.local_endpoint() << "] from "
111 "[UDP " << low_lvl_remote_endpoint << "]:");
112
113 // Count it against the limit.
114 ++recvd_packet_count;
115
116 handled_packet_count += handle_incoming_with_simulation(&packet_data, low_lvl_remote_endpoint);
117 /* packet_data is still valid and owned by us but may have any structure at all.
118 * As of this writing, in practice -- assuming no simulated delay -- packet_data would be untouched
119 * (and therefore could be reused sans reallocation for the next packet read in above, if any) for all
120 * packet types except DATA, which is moved elsewhere via std::move() semantics and would require
121 * reallocation above. */
122 }
123 else if (sys_err_code != boost::asio::error::would_block) // "Would block" is normal (no data available).
124 {
125 /* Weird it failed, since it's UDP. Oh well.
126 * Note: One might be tempted to do something dire like close some sockets. Note that in the current setup
127 * we wouldn't know which socket(s) to close, since several live on the same low-level (UDP) port. Even if
128 * we did, the failure to read could mean many things, so individual err_codes would need to be examined
129 * with various actions taken depending on what happened. (@todo Consider it.) Without that, just ignore it
130 * as we would any lost packet. Obviously, we deal with UDP's unreliability already.
131 *
132 * What about the would_block and try_again (EAGAIN/EWOULDBLOCK in POSIX world and similar for Windows)
133 * error codes which often demand special handling? Well, firstly, we are likely mostly not hitting that
134 * situation, as async_receive() (the call for which this is the handler function) specifically attempts
135 * to avoid those error codes by only executing the handler once the socket is Readable.
136 * Still, it's probably not impossible: we used async_wait() as of this writing, which means the actual
137 * receiving is done in this handler, not by boost.asio (and even if it was, we would still try more receives
138 * until no more are available). Between detection of Readable by boost.asio and the actual receive call,
139 * the situation may have changed. Well, fine. What's there to do? would_block/try_again means
140 * not Readable right now... try later. Great! We do just that in any case below by executing
141 * async_receive() again (inside the call at the end of this method). So this read failed due to some
142 * odd change in conditions; best we can do is wait for Readable and try again later. And we do. */
143 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
144 assert(packet_size == 0); // Should be the case on error.
145 }
146 else
147 {
148 assert(packet_size == 0);
149 }
150 }
151 // while (still getting data && (haven't exceeded per-handler limit || there is no limit))
152 while ((packet_size != 0)
153 && ((recvd_packet_count_limit == 0) || (recvd_packet_count < recvd_packet_count_limit)));
154
155 /* Have read all available low-level data. This may have accumulated certain tasks, like
156 * combining pending individual acknowledgments into larger ACK packet(s), to be performed at
157 * the end of the handler. Do so now. */
158 perform_accumulated_on_recv_tasks();
159
160 // Helpful in understanding the relationship between handler invocations and "delta" Event_set checks.
161 FLOW_LOG_TRACE("Handled a total of [" << handled_packet_count << "] incoming packets "
162 "out of [" << recvd_packet_count << "] received (limit [" << recvd_packet_count_limit << "]) "
163 "in this boost.asio handler.");
164 } // if (!sys_err_code)
165 else
166 {
167 // boost.asio called us with an error. Strange, since this is UDP, but treat it same as a read error above.
168 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
169 }
170
171 /* If socket errors are detected above, since it's UDP and connectionless, there is hope we're
172 * still OK, so still set up wait at the end of this callback. Do not stop event loop.
173 *
174 * @todo This isn't necessarily sound. Could investigate the possible errors and act
175 * accordingly. */
176
177 // Register ourselves for the next UDP receive.
178 async_low_lvl_recv();
179} // Node::low_lvl_recv_and_handle()
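The handler above is an instance of a general boost.asio pattern: drain every datagram the OS already has buffered (up to an optional cap), accumulate the resulting events, and only then re-arm a single async wait. A minimal standalone sketch of that pattern follows; the names (drain_then_rearm, on_datagram) and the 64 KiB buffer are illustrative assumptions, not part of Flow.

// Sketch only: the caller must keep `sock` alive while the re-armed wait is outstanding
// (the real code uses the long-lived member m_low_lvl_sock).
#include <boost/asio.hpp>
#include <array>
#include <cstddef>
#include <cstdint>

namespace sketch
{
namespace asio = boost::asio;

template<typename Handler>
void drain_then_rearm(asio::ip::udp::socket& sock, Handler on_datagram,
                      unsigned int max_per_iteration = 50) // 0 = no cap, as in the real option.
{
  std::array<std::uint8_t, 65536> buf; // Big enough for any datagram we would accept.
  asio::ip::udp::endpoint from;
  boost::system::error_code err;
  unsigned int count = 0;

  do // Read until "would block" (no data available) or the per-handler cap is hit.
  {
    const std::size_t n = sock.receive_from(asio::buffer(buf), from, 0, err);
    if (!err)
    {
      ++count;
      on_datagram(buf.data(), n, from); // Accumulate events; signal users only after the loop.
    }
  }
  while ((!err) && ((max_per_iteration == 0) || (count < max_per_iteration)));

  // Register for the next receive, like async_low_lvl_recv() above.
  sock.async_wait(asio::ip::udp::socket::wait_read,
                  [&sock, on_datagram, max_per_iteration](const boost::system::error_code&)
  {
    drain_then_rearm(sock, on_datagram, max_per_iteration);
  });
}
} // namespace sketch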
180
181unsigned int Node::handle_incoming_with_simulation(util::Blob* packet_data,
182 const util::Udp_endpoint& low_lvl_remote_endpoint,
183 bool is_sim_duplicate_packet)
184{
185 using util::Blob;
186 using boost::chrono::milliseconds; // Just for the output streaming.
187 using boost::chrono::round;
188
189 // We are in thread W.
190
191 unsigned int handled = 0; // How many times we called handle_incoming() inside this invocation.
192
193 // Basically we should call handle_incoming(), but we may first have to simulate various network conditions.
194
195 if (is_sim_duplicate_packet || (!m_net_env_sim) || (!m_net_env_sim->should_drop_received_packet()))
196 {
197 // See below.
198 const bool must_dupe
199 = (!is_sim_duplicate_packet) && m_net_env_sim && m_net_env_sim->should_duplicate_received_packet();
200
201 Blob packet_data_copy(get_logger());
202 if (must_dupe)
203 {
204 /* We will simulate duplication of the packet below. Since packet handling can be
205 * destructive (for performance reasons -- see handle_data_to_established()), we must create
206 * a copy that will not be hurt by this process. */
207 packet_data_copy = *(static_cast<const Blob*>(packet_data)); // Add const to express we require a copy, not move.
208 }
209
210 Fine_duration latency(m_net_env_sim ? m_net_env_sim->received_packet_latency() : Fine_duration::zero());
211 if (latency == Fine_duration::zero())
212 {
213 // No simulated latency; just handle the packet now (mainstream case).
214 handle_incoming(packet_data, low_lvl_remote_endpoint);
215 // *packet_data may now be decimated! Do not use it.
216 ++handled;
217 }
218 else
219 {
220 /* Pretend it was actually delivered later, as the simulator told us to do. More precisely,
221 * call handle_incoming() but later. */
222 FLOW_LOG_TRACE("SIMULATION: Delaying reception of packet by simulated latency "
223 "[" << round<milliseconds>(latency) << "].");
224 async_wait_latency_then_handle_incoming(latency, packet_data, low_lvl_remote_endpoint);
225 // *packet_data may now be decimated! Do not use it.
226 }
227
228 // Possibly simulate packet duplication.
229 if (must_dupe)
230 {
231 /* Simulator told us to pretend this packet was received twice. Note that we only model a
232 * single duplication (i.e., a duplicated packet will not be simulated to itself get
233 * duplicated). Also, we don't drop a duplicated packet. */
234 FLOW_LOG_TRACE("SIMULATION: Duplicating received packet.");
235 handled += handle_incoming_with_simulation(&packet_data_copy, // Pass the COPY, not the possibly damaged original.
236 low_lvl_remote_endpoint, true);
237 // packet_data_copy may now be decimated! Do not use it.
238 }
239 }
240 else
241 {
242 // Got packet, but pretend it was dropped on the way due to packet loss, as simulator told us to do.
243 FLOW_LOG_TRACE("SIMULATION: Dropped received packet.");
244 }
245
246 return handled;
247} // Node::handle_incoming_with_simulation()
248
249void Node::async_wait_latency_then_handle_incoming(const Fine_duration& latency,
250 util::Blob* packet_data,
251 const util::Udp_endpoint& low_lvl_remote_endpoint)
252{
253 using util::Blob;
255 using boost::chrono::milliseconds;
256 using boost::chrono::round;
257 using boost::shared_ptr;
258
259 // We are in thread W.
260
261 // Schedule call to handle_incoming() to occur asynchronously after the latency period passes.
262
263 /* As advertised, *packet_data loses its buffer into this new container, so that caller can immediately
264 * use it for whatever they want. Meanwhile, we asynchronously own the actual data in it now.
265 * Make a smart pointer to ensure it lives long enough for handler to execute... but likely no longer than that. */
266 shared_ptr<Blob> packet_data_moved_ptr(new Blob(std::move(*packet_data)));
267
268 // Unused if it doesn't get logged, which is a slight perf hit, but anyway this sim feature is a debug/test thing.
269 const Fine_time_pt started_at = Fine_clock::now();
270
271 schedule_task_from_now(get_logger(), latency, true, &m_task_engine,
272 [this, packet_data_moved_ptr, low_lvl_remote_endpoint, latency, started_at]
273 (bool)
274 {
275 // We are in thread W.
276
277 FLOW_LOG_TRACE
278 ("SIMULATOR: Handling low-level packet after "
279 "simulated latency [" << round<milliseconds>(latency) << "]; "
280 "actual simulated latency was "
281 "[" << round<milliseconds>(Fine_clock::now() - started_at) << "]; "
282 "from [UDP " << low_lvl_remote_endpoint << "].");
283
284 // Move (again) the actual buffer to handle_incoming()'s ownership.
285 Blob packet_data_moved_again(std::move(*packet_data_moved_ptr));
286 // *packet_data_moved_ptr is now empty and will be deleted once that smart pointer goes out of scope below.
287 handle_incoming(&packet_data_moved_again, low_lvl_remote_endpoint);
288 // packet_data_moved_again may now be decimated also! Do not use it.
289
290 /* We must do this here for similar reasons as at the end of low_lvl_recv_and_handle(). Think
291 * of the present closure as simply low_lvl_recv_and_handle() running and being able
292 * to read off the one low-level UDP packet (the argument "packet"). */
293 perform_accumulated_on_recv_tasks();
294
295 // Log a similar thing to that in low_lvl_recv_and_handle().
296 FLOW_LOG_TRACE("Handled a total of [1] incoming packets "
297 "out of a simulated [1] received in this boost.asio handler.");
298 }); // Async callback.
299} // Node::async_wait_latency_then_handle_incoming()
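The key idiom in the method above is moving the heavy buffer into a shared_ptr that the deferred handler captures: the caller's Blob is instantly reusable, while the stolen storage lives exactly until the handler has run. A minimal sketch of the same idiom, with std::vector standing in for util::Blob and a hypothetical handle_later() helper and on_ready callback:

#include <boost/asio.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <chrono>
#include <cstdint>
#include <utility>
#include <vector>

namespace sketch
{
using Buf = std::vector<std::uint8_t>;

template<typename Handler>
void handle_later(boost::asio::io_context& ioc, std::chrono::milliseconds delay,
                  Buf* buf, Handler on_ready) // Like *packet_data above, *buf is emptied here.
{
  // Steal buf's storage; the shared_ptr keeps it alive exactly as long as the timer callback needs it.
  auto owned = boost::make_shared<Buf>(std::move(*buf));
  auto timer = boost::make_shared<boost::asio::steady_timer>(ioc, delay);
  timer->async_wait([owned, timer, on_ready](const boost::system::error_code&) mutable
  {
    on_ready(std::move(*owned)); // Hand the data onward; *owned is now empty and soon destroyed.
  });
}
} // namespace sketch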
300
301void Node::async_sock_low_lvl_packet_send(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr&& packet,
302 bool delayed_by_pacing)
303{
304 async_low_lvl_packet_send_impl(sock->remote_endpoint().m_udp_endpoint, std::move(packet), delayed_by_pacing, sock);
305}
306
307void Node::async_no_sock_low_lvl_packet_send(const util::Udp_endpoint& low_lvl_remote_endpoint,
308 Low_lvl_packet::Const_ptr packet)
309{
310 /* As of this writing we don't pace things, when no Peer_socket is involved (e.g., some RSTs) => always `false`: -|
311 * v-------------------------------------------------| */
312 async_low_lvl_packet_send_impl(low_lvl_remote_endpoint, packet, false, Peer_socket::Ptr());
313}
314
315void Node::async_low_lvl_packet_send_impl(const util::Udp_endpoint& low_lvl_remote_endpoint,
316 Low_lvl_packet::Const_ptr packet,
317 bool delayed_by_pacing, Peer_socket::Ptr sock)
318{
319 using boost::asio::buffer;
320
321 assert(packet);
322 const auto& packet_ref = *packet;
323 const auto& packet_type_id = typeid(packet_ref);
324
325 // We are in thread W.
326
327 Sequence_number seq_num; // See below.
328
329 /* As explained in send_worker(), if it's a DATA packet, then we must overwrite
330 * m_sent_when. That way we'll avoid the deadly RTT drift (increase) due to pacing causing a
331 * higher RTT, then using that RTT to spread out packets more, which causes higher RTT again,
332 * etc. (If pacing is disabled, then the effect of doing this here rather than in send_worker()
333 * will not be dramatic.)
334 *
335 * Note that a number of important structures are based on m_sent_when: its
336 * ordering determines how things are ordered in m_snd_flying_pkts_by_sent_when and friends and thus the
337 * operation of the Drop Timer. So whenever we do finalize m_sent_when, we should handle those
338 * things too. */
339 if (packet_type_id == typeid(Data_packet))
340 {
341 assert(sock);
342 /* OK, so this is where I update m_sent_when and handle everything related to that
343 * (updating the scoreboard, Drop Timer, etc.). async_send_to() will indeed invoke UDP sendto()
344 * synchronously; that call occurs just below. */
345
346 mark_data_packet_sent(sock, static_cast<const Data_packet&>(packet_ref).m_seq_num);
347 } // if (packet type is DATA)
348 // @todo See the @todo in Node::async_low_lvl_ack_send() regarding correcting ACK delay value due to pacing.
349
350 /* Serialize to raw data. Important subtleties:
351 *
352 * This does *not* create a copied buffer. It generates a Const_buffer_sequence,
353 * which is a sequence container (like vector) containing a series of buffer
354 * start pointers and associated lengths. So it is a "scattered" buffer already in memory, namely within
355 * the Low_lvl_packet `packet`; and the boost.asio UDP async_send_to() we call below performs a "gather"
356 * operation when it generates the UDP datagram to send out. This should be superior to performing a
357 * copy that is at least proportional (in speed and memory use) to the size of the ultimate datagram.
358 * (In fact, an earlier version of the code indeed serialized to a newly generated single buffer here.)
359 *
360 * The only thing one must be careful of here is that the generated Const_buffer_sequence is valid only as long
361 * as the underlying areas in memory continue to exist; otherwise it's just a bunch of pointers into invalid
362 * memory. The underlying memory, as documented in Low_lvl_packet::serialize_to_raw_data() doc header,
363 * is valid as long as the Low_lvl_packet on which the method is called exists. For this reason, we pass
364 * `packet` (which is a shared_ptr<>) to the completion handler. Thus it will continue to exist until
365 * after the async_send_to() attempt itself. (If we hadn't dobe this, there's a large danger that
366 * `packet`'s ref-count would drop to zero at method exit just below, and hence async_send_to() asynchronously
367 * would crash a little later.)
368 *
369 * Update: Through empirical evidence, I found that, at least with Boost 1.63 and macOS, UDP async_send_to()
370 * below will silently truncate to the first 64 elements of raw_bufs. I could find no evidence of this being
371 * a common OS limitation (I've found other limits like 1024 in Linux for writev()), though I didn't dig very hard.
372 * The silent truncation was quite alarming and something to look into if only for educational purposes.
373 * In any case, we must keep raw_bufs.size() fairly low -- probably this is good for performance as well.
374 *
375 * Update: I have since looked into this in another project. This limit, 64, continues in Boost 1.75 and is actually
376 * at least imposed by boost.asio code itself, though it might just be using that value as a lowest-common-denominator
377 * technique. It is my impression that they do this silently on account of thinking only of the TCP context,
378 * where a send/write op call is free to write as few bytes as it wants, and one must deal with it in any
379 * case (until would-block). The silent dropping behavior in UDP, however, is a disaster and is really a Boost bug
380 * (@todo file it against Boost if needed). In any case it is what it is, and until it goes away, we must not
381 * exceed it. */
382 Low_lvl_packet::Const_buffer_sequence raw_bufs;
383 const size_t bytes_to_send = packet->serialize_to_raw_data_and_log(&raw_bufs);
384 assert(bytes_to_send != 0);
385
386 // Count an actual UDP stack send() call.
387 sock->m_snd_stats.low_lvl_packet_xfer_called(packet_type_id, delayed_by_pacing, bytes_to_send);
388
389 const size_t limit = opt(m_opts.m_dyn_low_lvl_max_packet_size);
390 if (bytes_to_send > limit)
391 {
392 // Bad and rare enough for a warning.
393 FLOW_LOG_WARNING("Tried to send low-level packet but before doing so detected "
394 "serialized size [" << bytes_to_send << "] exceeds limit [" << limit << "]; "
395 "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
396 "[\n" << packet->m_concise_ostream_manip << "].");
397 // However, full packets should not be logged unless DATA log level allowed, period.
398 FLOW_LOG_DATA("Detailed serialized packet details from preceding warning: "
399 "[\n" << packet->m_verbose_ostream_manip << "].");
400
401 // Short-circuit this, since no send occurred.
402 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_to_send, 0);
403 return;
404 }
405 // else
406
407 // Initiate asynchronous send.
408 m_low_lvl_sock.async_send_to(raw_bufs,
409 low_lvl_remote_endpoint,
410 [this, sock, packet, bytes_to_send](const Error_code& sys_err_code, size_t n_sent)
411 {
412 low_lvl_packet_sent(sock, packet, bytes_to_send, sys_err_code, n_sent);
413 });
414} // Node::async_low_lvl_packet_send_impl()
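The lifetime rule from the long comment above boils down to: the Const_buffer_sequence object itself may be a temporary (boost.asio copies it), but the memory it points into must outlive the asynchronous send, which is exactly why `packet` is captured by the completion handler. A minimal sketch of that rule, using a hypothetical two-buffer message type rather than Flow's Low_lvl_packet:

#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

namespace sketch
{
namespace asio = boost::asio;

struct Two_part_msg // Stand-in for a serialized packet: header and body live in separate buffers.
{
  std::string m_header;
  std::vector<std::uint8_t> m_body;
};

void send_gathered(asio::ip::udp::socket& sock, const asio::ip::udp::endpoint& dest,
                   boost::shared_ptr<const Two_part_msg> msg)
{
  // "Scattered" view of msg's memory; nothing is copied here.
  std::vector<asio::const_buffer> bufs;
  bufs.push_back(asio::buffer(msg->m_header));
  bufs.push_back(asio::buffer(msg->m_body));
  // (Per the notes above, keep bufs.size() well below the ~64-element limit observed for UDP sends.)

  sock.async_send_to(bufs, dest,
                     [msg](const boost::system::error_code& err, std::size_t n_sent)
  {
    // `msg` -- and therefore the memory bufs pointed into -- stayed alive until the send completed.
    (void)err; (void)n_sent;
  });
}
} // namespace sketch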
415
416void Node::mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number& seq_num)
417{
418 using boost::chrono::microseconds;
419 using boost::chrono::round;
420
421 // We are in thread W.
422
423 Peer_socket::Sent_pkt_ordered_by_when_iter sent_pkt_it = sock->m_snd_flying_pkts_by_sent_when.find(seq_num);
424 if (sent_pkt_it == sock->m_snd_flying_pkts_by_sent_when.past_oldest())
425 {
426 // We haven't even (or have only just) sent the packet, yet its In-flight data is already gone? Very weird but not fatal.
427 FLOW_LOG_WARNING("Sending [DATA] packet over [" << sock << "] with "
428 "sequence number [" << seq_num << "] but cannot find corresponding Sent_packet. "
429 "Cannot deal with some of the related data structures; still sending. Bug?");
430 // @todo Reading this now, it seems more than "very weird." Possibly it should be an assert() or at least RST?
431 return;
432 }
433 // else
434
435 Peer_socket::Sent_packet& pkt = *sent_pkt_it->second;
436
437 /* Mark the given packet as being sent right now. The following structures depend on this
438 * event:
439 *
440 * - pkt.m_sent_when: Obviously needs to be marked with current time stamp (for RTT calculation
441 * later, among other things).
442 * - sock->m_snd_drop_timer: Since Drop_timer has an event for a packet becoming In-flight in the
443 * sense that it was actually sent over wire, which is exactly what's happening.
444 * So we call that.
445 * - Idle Timer: m_snd_last_data_sent_when is the m_sent_when value of the last DATA packet sent.
446 * Obviously we set that here. */
447
448 // Get current time; and record it for Idle Timeout calculation. See top of send_worker() for how it is used.
449 const Fine_time_pt& now = sock->m_snd_last_data_sent_when = Fine_clock::now();
450 const size_t cwnd = sock->m_snd_cong_ctl->congestion_window_bytes();
451 auto& last_send_attempt = pkt.m_sent_when.back();
452 const Peer_socket::order_num_t order_num = last_send_attempt.m_order_num;
453 auto const logger_ptr = get_logger();
454 if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
455 {
456 const Fine_time_pt prev_sent_when = pkt.m_sent_when.back().m_sent_time;
457 // REPLACE the values. Note m_order_num is just left alone.
458 last_send_attempt.m_sent_time = now;
459 last_send_attempt.m_sent_cwnd_bytes = cwnd;
460 const microseconds diff = round<microseconds>(now - prev_sent_when);
461
462 FLOW_LOG_TRACE_WITHOUT_CHECKING
463 ("Sending/sent [DATA] packet over [" << sock << "] with "
464 "sequence number [" << seq_num << "] order_num [" << order_num << "]. Send timestamp changed from "
465 "[" << prev_sent_when << "] -> [" << now << "]; difference [" << diff << "].");
466 }
467 else
468 {
469 // Same but no logging.
470 last_send_attempt.m_sent_time = now;
471 last_send_attempt.m_sent_cwnd_bytes = cwnd;
472 }
473
474 /* Inform this guy as required, now that packet has actually been sent off over the wire (not merely In-flight
475 * by the Peer_socket definition, wherein it's been placed into sock->m_snd_flying_pkts*). */
476 const Drop_timer::Ptr drop_timer = sock->m_snd_drop_timer;
477 drop_timer->start_contemporaneous_events();
478 drop_timer->on_packet_in_flight(order_num);
479 drop_timer->end_contemporaneous_events();
480 /* ^-- @todo Is it possible to smoosh a bunch of mark_data_packet_sent() calls into a single
481 * start/end_contemporaneous_events() group? There are probably situations where 2 or more packets are sent
482 * at a time which means ultimately mark_data_packet_sent() is called essentially contemporaneously.
483 * However, at least as of this writing, mark_data_packet_sent() arises from a variety of situations, at least
484 * some of which are "singular" and not really contemporaneous with any other packet(s) being sent.
485 * So to isolate the cases where that is in fact true (2+ calls to us are contemporaneous) is certainly non-trivial,
486 * and the code to do it would be... highly technical and hard to maintain. So I wouldn't trip over myself to
487 * attempt this; and actually whatever book-keeping might be necessary to pull it off might itself have
488 * some performance cost. */
489} // Node::mark_data_packet_sent()
490
491void Node::low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet,
492 size_t bytes_expected_transferred, const Error_code& sys_err_code,
493 size_t bytes_transferred)
494{
495 using std::numeric_limits;
496
497 // We are in thread W.
498
499 const auto& packet_ref = *packet;
500 const auto& packet_type_id = typeid(packet_ref);
501
502 // Note: we don't save `packet` anywhere, so the memory will finally be freed, when we're done here.
503
504 // Log detailed info on packet but only if TRACE or DATA logging enabled.
505 auto const logger_ptr = get_logger();
506 if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
507 {
508 if (logger_ptr->should_log(log::Sev::S_DATA, get_log_component()))
509 {
510 FLOW_LOG_DATA_WITHOUT_CHECKING("Tried to send low-level packet: "
511 "[\n" << packet->m_verbose_ostream_manip << "].");
512 }
513 else
514 {
515 FLOW_LOG_TRACE_WITHOUT_CHECKING("Tried to send low-level packet: "
516 "[\n" << packet->m_concise_ostream_manip << "].");
517 }
518 }
519
520 if (sys_err_code)
521 {
522 /* Weird it failed, since it's UDP. Oh well. (Update: One thing to watch out for is "Message too long."!)
523 * Note: One might be tempted to do something dire like close some sockets. Several Flow connections might be
524 * using the same UDP port, so should we close all of them or just the one to which this outgoing packet applies?
525 * Even if we made some smart decision there, the failure to write could mean many things, so individual err_codes
526 * would need to be examined with various actions taken depending on what happened. (@todo Consider it.)
527 * Without that, just ignore it as we would any lost packet. Obviously, we deal with UDP's unreliability already.
528 *
529 * What about the specific error_codes 'would_block' and 'try_again' (in POSIX these probably both point to
530 * code EAGAIN which == EWOULDBLOCK; in Windows possibly just the former, but to be safe both should be handled the
531 * same way)? Currently I am ignoring this possibility. Why? Answer: we specifically use async_send() to wait
532 * for the socket to be ready for the transmit operation before actually transmitting. This _should_ ensure
533 * that those two errors are _specifically_ avoided. However, that's probably not a 100% safe assumption.
534 * Even if the OS reports "Writable" at time T, a few microseconds later some resource might get used up
535 * (unrelated to our activities); and by the time the actual send executes, we might get would_block/try_again.
536 * Now, it's possible that since we use async_send() rather than an async_wait() call -- meaning we let
537 * boost.asio both wait for Writable *and* itself execute the write -- it would hide any such
538 * corner-case EAGAIN/etc. from us and just not call this handler in that case and retry later by itself.
539 * However, I don't know if it does that. @todo Therefore it would be safer to treat those error codes,
540 * would_block and try_again, specially here. Simply, we should retry the async_send(), meaning it will wait for
541 * Writable again and execute it again later on. One might worry that this might open the possibility of
542 * more than one outstanding async_send(). If this happens, it's fine. Even if boost.asio would be so rash as
543 * to allow outstanding async_send()s to execute their handlers in a different order from the calls, it's not
544 * a problem, as out-of-order datagrams are handled just fine by the other side.
545 *
546 * Note: I've placed a more visible @todo in the class doc header. So delete that if this gets done. */
547 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
548
549 if (sock)
550 {
551 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id);
552 }
553
554 return;
555 }
556 // else
557
558 if (sock)
559 {
560 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_expected_transferred, bytes_transferred);
561 }
562
563 if (bytes_transferred != bytes_expected_transferred)
564 {
565 /* If a UDP send was partial, the datagram must have been too big, which we shouldn't have allowed; worth warning.
566 * Update: Actually, at least on Mac with Boost 1.63 I've seen these outcomes:
567 * - If message is too big byte-wise, it results in truthy sys_err_code.
568 * - If too many sub-buffers were passed in the scatter/gather buffer container given to async_send_to(),
569 * they may be silently truncated (!sys_err_code but the present condition is triggered). Limit was observed to
570 * be ~64. */
571 FLOW_LOG_WARNING("Low-level packet sent, but only [" << bytes_transferred << "] of "
572 "[" << bytes_expected_transferred << "] bytes "
573 "were sent. Internal error with packet size calculations? More likely, did stack truncate?");
574 return;
575 }
576 // else
577 FLOW_LOG_TRACE("Success.");
578} // Node::low_lvl_packet_sent()
579
580void Node::async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet,
581 const util::Udp_endpoint& low_lvl_remote_endpoint)
582{
583 using boost::asio::buffer;
584 using boost::shared_ptr;
585
586 // We are in thread W.
587
588 // Fill out typical info.
589 auto rst_base = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(get_logger());
590 // No sequence number. See m_seq_num for discussion.
591 rst_base->m_packed.m_src_port = causing_packet->m_packed.m_dst_port; // Right back atcha.
592 rst_base->m_packed.m_dst_port = causing_packet->m_packed.m_src_port;
593 rst_base->m_opt_rexmit_on = false; // Not used in RST packets, so set it to something.
594
595 async_no_sock_low_lvl_packet_send(low_lvl_remote_endpoint, rst_base);
596 // If that returned false: It's an RST, so there's no one to inform of an error anymore. Oh well.
597} // Node::async_no_sock_low_lvl_rst_send()
598
599void Node::async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr& sock,
600 Low_lvl_packet::Ptr&& packet)
601{
602 // We are in thread W.
603
604 // This is the general-purpose method for sending a packet along a well-defined connection (sock).
605
606 const auto& packet_ref = *packet;
607 const auto& packet_type_id = typeid(packet_ref);
608
609 sock->m_snd_stats.low_lvl_packet_xfer_requested(packet_type_id);
610
611 // Fill out typical info.
612 packet->m_packed.m_src_port = sock->m_local_port;
613 packet->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
614 packet->m_opt_rexmit_on = sock->rexmit_on();
615
616 /* Apply packet pacing, which tries to spread out bursts of packets to prevent loss. For much
617 * more detail, see struct Send_pacing_data comment. */
618
619 if ((!sock->opt(sock->m_opts.m_st_snd_pacing_enabled)) ||
620 (sock->m_snd_smoothed_round_trip_time == Fine_duration::zero()) ||
621 (packet_type_id == typeid(Rst_packet)))
622 {
623 /* Per struct Send_pacing_data doc header, the pacing algorithm only begins once we have our
624 * first round trip time (and thus SRTT); until then we send all packets as soon as possible.
625 * Also pacing can just be disabled; in which case send all packets ASAP.
626 *
627 * Finally, if it's an RST, just send it ASAP. This is discussed in more detail in the
628 * Send_pacing_data struct header, but basically: RST means packet is being CLOSED right now.
629 * Queueing RST on the pacing queue means it may be sent after underlying socket is CLOSED.
630 * Therefore we have to keep queue and all that machinery operating past socket being CLOSED.
631 * As a rule of thumb, CLOSED => dead socket. So just send RST right away. This means it may
632 * jump the queue ahead of DATA/ACK packets already there, but since it is an error condition
633 * causing RST, we consider that OK (those packets will not be sent). */
634 async_sock_low_lvl_packet_send(sock, std::move(packet), false); // false => not queued in pacing module.
635 return;
636 }
637 // else pacing algorithm enabled and both can and must be used.
638
639 sock_pacing_new_packet_ready(sock, std::move(packet));
640} // Node::async_sock_low_lvl_packet_send_paced()
641
642void Node::sock_pacing_new_packet_ready(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet)
643{
644 using boost::chrono::duration_cast;
645 using boost::chrono::microseconds;
646 using boost::static_pointer_cast;
647 using boost::dynamic_pointer_cast;
648 using boost::shared_ptr;
649
650 // We are in thread W.
651
652 const auto& packet_ref = *packet;
653 const auto& packet_type_id = typeid(packet_ref);
654
655 // For brevity and a bit of speed.
656 Send_pacing_data& pacing = sock->m_snd_pacing_data;
657
658 const bool is_data_packet = packet_type_id == typeid(Data_packet);
659 // For logging, get the first sequence number mentioned (depending on whether it's DATA or ACK).
660 Sequence_number init_seq_num;
661 shared_ptr<const Data_packet> data;
662 if (is_data_packet)
663 {
664 init_seq_num = static_pointer_cast<const Data_packet>(packet)->m_seq_num;
665 }
666 else
667 {
668 const auto& acked_packets = static_pointer_cast<const Ack_packet>(packet)->m_rcv_acked_packets;
669 if (!acked_packets.empty())
670 {
671 init_seq_num = acked_packets.front()->m_seq_num;
672 }
673 }
674
675 const bool q_was_empty = pacing.m_packet_q.empty();
676
677 /* No matter what, we can't send this packet before the ones already in the queue, so push it onto the end
678 * of the queue. (If queue is currently empty, then that is not a special case; we may well
679 * immediately pop and send it below.) */
680 pacing.m_packet_q.push_back(packet);
681
682 FLOW_LOG_TRACE("Pacing: On [" << sock << "] packet of type [" << packet->m_type_ostream_manip << "] "
683 "is newly available for sending; pushed onto queue; queue size [" << pacing.m_packet_q.size() << "]; "
684 "initial sequence number [" << init_seq_num << "].");
685
686 if (!q_was_empty)
687 {
688 const auto data = dynamic_pointer_cast<const Data_packet>(pacing.m_packet_q.front());
689 // !data if it's not a DATA packet.
690
691 assert((!data) || (pacing.m_bytes_allowed_this_slice < data->m_data.size()));
692
693 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: was already in progress; queued and done.");
694
695 /* There were already packets in the queue, so the timer is running; certainly we can't send
696 * packet ahead of those in the queue, so push on the back of the queue above. Should we send
697 * the head packet now? No; if the last sock_pacing_new_packet_ready() or
698 * sock_pacing_time_slice_end() left a non-empty queue, then the timer has been set to fire when
699 * the slice ends, and more packets can be sent. Done. */
700 return;
701 }
702 // else if (q_was_empty)
703
704 // We have just pushed the sole packet on the queue.
705
706 if (!is_data_packet)
707 {
708 assert(packet_type_id == typeid(Ack_packet)); // Sanity check.
709 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: due to packet type, sending immediately since at head of queue; "
710 "queue empty again.");
711
712 pacing.m_packet_q.clear();
713
714 /* Per discussion in struct Send_pacing_data doc header, if it's a non-DATA packet, then --
715 * other than not being allowed to be re-ordered -- it does not "count" as a paced packet. That
716 * is, it is "worth" zero bytes when compared to m_bytes_allowed_this_slice and should be sent
717 * as soon as it is at the head of the queue (which it is here, since we just placed it as the
718 * sole element on the queue). Since it doesn't count against m_bytes_allowed_this_slice, the
719 * pacing timing is irrelevant to it, and based on the "send ASAP" rule, we send it now. */
720 async_sock_low_lvl_packet_send(sock, std::move(packet), false); // false => not queued in pacing module.
721 return;
722 }
723 // else packet is DATA packet.
724
725 const Fine_time_pt now = Fine_clock::now();
726 if ((pacing.m_slice_start == Fine_time_pt()) || (now >= (pacing.m_slice_start + pacing.m_slice_period)))
727 {
728 /* We are past the current time slice (if there is such a thing) and have a packet to send. By
729 * the algorithm in struct Send_pacing_data doc header, this means we create a new time slice with
730 * starting time = now. */
731
732 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: "
733 "current time "
734 "slice [epoch+" << duration_cast<microseconds>(pacing.m_slice_start.time_since_epoch()) << " "
735 "over " << duration_cast<microseconds>(pacing.m_slice_period) << "] is over.");
736
736
737 sock_pacing_new_time_slice(sock, now);
738 /* pacing.{m_slice_start, m_slice_period, m_bytes_allowed_this_slice} have all been recomputed.
739 * By definition "now" is in [m_slice_start, m_slice_start + m_slice_period), since m_slice_start
740 * IS now (plus some epsilon). */
741 }
742 // else if ("now" is basically in time range [m_slice_start, m_slice_start + m_slice_period)) {}
743
744 /* We are in the current time slice. The one just-queued packet can be sent if there is
745 * m_bytes_allowed_this_slice budget; otherwise the timer must be started to fire at slice
746 * end, at which point we're cleared to send the packet. sock_pacing_process_q() performs this
747 * (it is generalized to work with multiple packets on the queue, but it will work fine with just
748 * one also).
749 *
750 * Note: If we just created the new time slice above, then m_bytes_allowed_this_slice >=
751 * max_block_size(), so certainly the following statement will immediately send the just-queued
752 * packet. If the time slice was in progress, then it depends. */
753
754 sock_pacing_process_q(sock, false);
755} // Node::sock_pacing_new_packet_ready()
756
757void Node::sock_pacing_new_time_slice(Peer_socket::Ptr sock, const Fine_time_pt& now)
758{
759 using boost::chrono::duration_cast;
760 using boost::chrono::microseconds;
761 using boost::chrono::milliseconds;
762 using std::max;
763
764 // We are in thread W.
765
766 // For brevity and a bit of speed.
767 Send_pacing_data& pacing = sock->m_snd_pacing_data;
768 const Fine_duration& srtt = sock->m_snd_smoothed_round_trip_time;
769
770 assert(srtt != Fine_duration::zero());
771
772 // New slice starts now.
773 pacing.m_slice_start = now;
774
775 /* Per struct Send_pacing_data doc header: If we had perfectly fine timer precision, then we'd
776 * want a slice that is SRTT / CWND. CWND = (congestion window in bytes / max-block-size).
777 * To minimize truncation error, then, it is X / (Y / Z) = X * Z / Y, where X, Y, Z are SRTT,
778 * congestion window in bytes, and max-block-size, respectively. */
779 Fine_duration slice_ideal_period
780 = srtt * sock->max_block_size() / sock->m_snd_cong_ctl->congestion_window_bytes();
781 if (slice_ideal_period == Fine_duration::zero())
782 {
783 // Avoid division by zero and any other tomfoolery below....
784 slice_ideal_period = Fine_duration(1);
785 }
786
787 Fine_duration timer_min_period = opt(m_opts.m_st_timer_min_period);
788 if (timer_min_period == Fine_duration::zero())
789 {
790 /* They want us to pick a nice upper bound on the timer precision ourselves.
791 *
792 * This is conservative; in my tests Windows seems to have the worst timer precision, and it is
793 * about 15 msec. @todo Perhaps choose here based on platform. It can get hairy, as there is
794 * wide variation, so it would require much experimentation; but might be worth it for
795 * performance. */
796 const Fine_duration TIMER_MIN_PERIOD_DEFAULT = milliseconds(15);
797 timer_min_period = TIMER_MIN_PERIOD_DEFAULT;
798 }
799
800 /* Per Send_pacing_data doc header, the actual slice period is slice_ideal_period, unless that
801 * is below the timer's capabilities; in which case it is timer_min_period. */
802 pacing.m_slice_period = max(slice_ideal_period, timer_min_period);
803
804 /* Finally, allow (in this time slice) a burst of as many full-sized blocks as possible while
805 * staying at or below the target SRTT / CWND rate: floor(m_slice_period / slice_ideal_period).
806 * Note that when slice_ideal_period >= timer_min_period, i.e. the timer precision is fine enough
807 * to handle the desired rate exactly, then this value will always equal 1.
808 *
809 * Also, convert to bytes when actually assigning the data member.
810 *
811 * @todo Consider doing some kind of rounding instead of using floor(). */
812 pacing.m_bytes_allowed_this_slice
813 = static_cast<size_t>(pacing.m_slice_period * sock->max_block_size() / slice_ideal_period);
814
815 /* If I just use the above math, I notice that over time the queue size can drift becoming slowly
816 * larger and larger as more and more time slices go by. I believe it's due to our floor math
817 * above, but I have not yet fully investigated it. @todo Investigate it.
818 *
819 * However if I just increase the number of bytes allowed per slice a little bit, it makes the
820 * drift go away and probably doesn't reduce the effectiveness of the pacing much. */
821 const size_t QUEUE_SIZE_DRIFT_PREVENTION_PCT = 110; // @todo Maybe make it a configurable option.
822 pacing.m_bytes_allowed_this_slice *= QUEUE_SIZE_DRIFT_PREVENTION_PCT;
823 pacing.m_bytes_allowed_this_slice /= 100;
824
825 assert(pacing.m_bytes_allowed_this_slice >= sock->max_block_size());
826
827 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: "
828 "new time "
829 "slice [epoch+" << duration_cast<microseconds>(pacing.m_slice_start.time_since_epoch()) << " "
830 "over " << duration_cast<microseconds>(pacing.m_slice_period) << "]; "
831 "ideal slice period = [SRTT " << duration_cast<microseconds>(srtt) << "] / "
832 "([cong_wnd " << sock->m_snd_cong_ctl->congestion_window_bytes() << "] / "
833 "[max-block-size " << sock->max_block_size() << "]) = "
834 "[" << duration_cast<microseconds>(slice_ideal_period) << "]; "
835 "timer_min_period = [" << duration_cast<microseconds>(timer_min_period) << "]; "
836 "bytes_allowed = max(ideal, min) / ideal * max-block-size * "
837 "[" << QUEUE_SIZE_DRIFT_PREVENTION_PCT << "%] = "
838 "[" << pacing.m_bytes_allowed_this_slice << "].");
839} // Node::sock_pacing_new_time_slice()
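A worked instance of the computation above, with made-up numbers (SRTT = 100 ms, max-block-size = 1000 bytes, congestion window = 50000 bytes, 15 ms timer floor):

#include <algorithm>
#include <cstdint>
#include <iostream>

int main()
{
  const std::int64_t srtt_usec = 100000;     // SRTT = 100 ms.
  const std::int64_t max_block_size = 1000;  // Bytes per full-sized block.
  const std::int64_t cwnd_bytes = 50000;     // So CWND = 50 blocks per SRTT.
  const std::int64_t timer_min_usec = 15000; // Coarsest period the timer can reliably hit.

  // Ideal inter-block gap: SRTT * max-block-size / cwnd_bytes = 100 ms * 1000 / 50000 = 2 ms.
  const std::int64_t slice_ideal_usec = srtt_usec * max_block_size / cwnd_bytes;
  // Actual slice: can't tick faster than the timer allows -> max(2 ms, 15 ms) = 15 ms.
  const std::int64_t slice_usec = std::max(slice_ideal_usec, timer_min_usec);
  // Byte budget per slice: 15 ms * 1000 / 2 ms = 7500 bytes; +10% drift guard = 8250 bytes.
  std::int64_t bytes_allowed = slice_usec * max_block_size / slice_ideal_usec;
  bytes_allowed = bytes_allowed * 110 / 100;

  std::cout << "slice = " << slice_usec << " usec; budget = " << bytes_allowed << " bytes/slice\n";
  // Prints: slice = 15000 usec; budget = 8250 bytes/slice -- roughly 8 full-sized blocks every
  // 15 ms, versus the ideal of exactly 1 block every 2 ms that a finer timer would allow.
}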
840
841void Node::sock_pacing_process_q(Peer_socket::Ptr sock, bool executing_after_delay)
842{
843 using boost::chrono::milliseconds;
844 using boost::chrono::round;
845 using boost::shared_ptr;
846 using boost::weak_ptr;
847 using boost::static_pointer_cast;
848 using boost::dynamic_pointer_cast;
849 using std::max;
850
851 // We are in thread W.
852
853 /* Pre-condition is that the current time is within the current time slice, and that all other
854 * invariants hold (including that the head packet, if any, is a DATA packet). So now we send as
855 * many packets as still allowed by the budget in m_bytes_allowed_this_slice. If anything remains
856 * beyond that, we schedule a timer to hit at the end of the time slice to get the rest. */
857
858 // For brevity and a bit of speed.
859 Send_pacing_data& pacing = sock->m_snd_pacing_data;
860 shared_ptr<const Data_packet> head_packet;
861
862 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: processing queue; queue size [" << pacing.m_packet_q.size() << "]; "
863 "byte budget [" << sock->bytes_blocks_str(pacing.m_bytes_allowed_this_slice) << "] remaining in this "
864 "slice.");
865
866 /* Pop things from queue until we've run out of pacing byte budget for this time slice, or until
867 * there is nothing left to send. */
868 while ((!pacing.m_packet_q.empty())
869 && (pacing.m_bytes_allowed_this_slice
870 >= (head_packet = static_pointer_cast<const Data_packet>(pacing.m_packet_q.front()))->m_data.size()))
871 {
872 // Explicit invariant: head_packet is DATA. We always send non-DATA packets as soon as they get to head of queue.
873
874 // It is a DATA packet at head of queue, and there is enough pacing budget to send it now.
875
876 // Use up the budget.
877 pacing.m_bytes_allowed_this_slice -= head_packet->m_data.size();
878
879 FLOW_LOG_TRACE("Will send [" << head_packet->m_data.size() << "] bytes of data; budget now "
880 "[" << sock->bytes_blocks_str(pacing.m_bytes_allowed_this_slice) << "]; "
881 "queue size now [" << (pacing.m_packet_q.size() - 1) << "].");
882
883 // Send it. executing_after_delay <=> packet being sent was delayed by pacing as opposed to sent immediately.
884 async_sock_low_lvl_packet_send(sock, std::move(head_packet), executing_after_delay);
885
886 pacing.m_packet_q.pop_front(); // After this the raw pointer in head_packet should be freed.
887
888 /* Since we've popped a packet, another packet is now at the head. We must maintain the
889 * invariant that no non-DATA packet is at the head of the queue (see struct Send_pacing_data
890 * doc header for reasoning), i.e. we should send any such packets immediately. Do so until
891 * we run out or encounter a DATA packet. */
892
893 // Subtlety: Using dynamic_pointer_cast<> instead of typeid() to check packet type to avoid "side effect" warning.
894 Low_lvl_packet::Const_ptr head_packet_base;
895 while ((!pacing.m_packet_q.empty())
896 && (!(dynamic_pointer_cast<const Data_packet>(head_packet_base = pacing.m_packet_q.front()))))
897 {
898 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: due to packet type, sending immediately since at head of queue; "
899 "queue size now [" << (pacing.m_packet_q.size() - 1) << "].");
900
901 // See above cmnt about last arg.
902 async_sock_low_lvl_packet_send(sock, std::move(head_packet_base), executing_after_delay);
903
904 pacing.m_packet_q.pop_front(); // After this the raw pointer in head_packet should be freed.
905
906 // Note that, as discussed in struct Send_pacing_data doc header, a non-DATA packet is worth 0 budget.
907 }
908 } // while ((m_packet_q not empty) && (more m_bytes_allowed_this_slice budget available))
909
910 if (pacing.m_packet_q.empty())
911 {
912 FLOW_LOG_TRACE("Pacing: Queue emptied.");
913
914 // Successfully sent off entire queue. Pacing done for now -- until the next sock_pacing_new_packet_ready().
915 return;
916 }
917 // else
918
919 /* No more budget left in this pacing time slice, but there is at least one packet in the queue
920 * still. Per algorithm, a fresh slice should begin ASAP (since packets must be sent ASAP but no
921 * sooner). Therefore schedule timer for the end of the time slice, which is just
922 * m_slice_start + m_slice_period. */
923 const Fine_time_pt slice_end = pacing.m_slice_start + pacing.m_slice_period;
924
925 pacing.m_slice_timer.expires_at(slice_end);
926 // (Even if slice_end is slightly in the past, that'll just mean it'll fire ASAP.)
927
928 FLOW_LOG_TRACE("Pacing: Exhausted budget; queue size [" << pacing.m_packet_q.size() << "]; "
929 "scheduling next processing at end of time slice "
930 "in [" << round<milliseconds>(slice_end - Fine_clock::now()) << "].");
931
932 // When triggered or canceled, call this->sock_pacing_time_slice_end(sock, <error code>).
933 pacing.m_slice_timer.async_wait([this, sock_observer = weak_ptr<Peer_socket>(sock)]
934 (const Error_code& sys_err_code)
935 {
936 auto sock = sock_observer.lock();
937 if (sock)
938 {
939 sock_pacing_time_slice_end(sock, sys_err_code);
940 }
941 // else { Possible or not, allow for this possibility for maintainability. }
942 });
943
944 // More work to do later, but for now we've been successful.
945
946 /* That's it. The only reason the timer would get canceled is if we go into CLOSED state, in
947 * which case it can just do nothing. */
948} // Node::sock_pacing_process_q()
949
950void Node::sock_pacing_time_slice_end(Peer_socket::Ptr sock, [[maybe_unused]] const Error_code& sys_err_code)
951{
952 // We are in thread W.
953
954 // As always, no need to lock m_state, unless we plan to alter it, since no other thread can alter it.
955 if (sock->m_state == Peer_socket::State::S_CLOSED)
956 {
957 /* Once state is CLOSED, the socket is dead -- all packets have been sent. A corollary of
958 * that is that if we must send an RST, then always send it immediately (even if it has to
959 * jump ahead of other packets waiting on queue). */
960 return;
961 }
962
963 // We only cancel the timer when we close the socket, and we've already returned if we'd done that.
964 assert(sys_err_code != boost::asio::error::operation_aborted);
965
966 // There could be other errors, but as in other timer handlers, we've no idea what that means, so pretend all is OK.
967
968 // Pre-condition: if we set up the timer, then the queue had to have been non-empty at the time.
969 assert(!sock->m_snd_pacing_data.m_packet_q.empty());
970
971 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: slice end timer fired; creating new slice and processing queue.");
972
973 /* Timer fired, so right now we are somewhere after the end of the current time slice. Therefore,
974 * begin the next time slice. */
975 sock_pacing_new_time_slice(sock, Fine_clock::now());
976
977 /* pacing.{m_slice_start, m_slice_period, m_bytes_allowed_this_slice} have all been recomputed.
978 * By definition "now" is in [m_slice_start, m_slice_start + m_slice_period), since m_slice_start
979 * IS now (plus some epsilon). */
980
981 /* We are in the current time slice. We just created the new time slice above, so
982 * m_bytes_allowed_this_slice >= max_block_size(), and certainly the following statement will
983 * immediately send at least one packet. */
984
985 sock_pacing_process_q(sock, true); // Process as many packets as the new budget allows.
986} // Node::sock_pacing_time_slice_end()
987
988void Node::async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
989{
990 // We are in thread W.
991
992 // Fill out common fields and asynchronously send packet.
993 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(get_logger());
994 async_sock_low_lvl_packet_send_paced(sock, std::move(rst));
995} // Node::async_sock_low_lvl_rst_send()
996
997void Node::sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
998{
999 using boost::asio::buffer;
1000
1001 // We are in thread W.
1002
1003 // Fill out fields.
1004 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(get_logger());
1005 rst->m_packed.m_src_port = sock->m_local_port;
1006 rst->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
1007 rst->m_opt_rexmit_on = false; // Unused in RST packets, so set it to something.
1008
1009 // Serialize to a buffer sequence (basically sequence of pointers/lengths referring to existing memory areas).
1010 Low_lvl_packet::Const_buffer_sequence raw_bufs;
1011 const size_t size = rst->serialize_to_raw_data_and_log(&raw_bufs); // Logs TRACE/DATA.
1012
1013 // This special-case sending path should report stats similarly to the main path elsewhere.
1014 const auto& rst_type_id = typeid(Rst_packet); // ...a/k/a typeid(*rst).
1015 sock->m_snd_stats.low_lvl_packet_xfer_requested(rst_type_id);
1016 sock->m_snd_stats.low_lvl_packet_xfer_called(rst_type_id, false, size);
1017
1018 // Same check as when using async_send_to(). @todo Code reuse?
1019 const size_t limit = opt(m_opts.m_dyn_low_lvl_max_packet_size);
1020 if (size > limit)
1021 {
1022 // Bad and rare enough for a warning.
1023 FLOW_LOG_WARNING("Tried to send RST but before doing so detected "
1024 "serialized size [" << size << "] exceeds limit [" << limit << "]; "
1025 "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
1026 "[\n" << rst->m_concise_ostream_manip << "].");
1027
1028 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, 0); // No send occurred.
1029 }
1030 else
1031 {
1032 // Synchronously send to remote UDP. If non-blocking mode and not sendable, this will return error.
1033 Error_code sys_err_code;
1034 const size_t size_sent = m_low_lvl_sock.send_to(raw_bufs,
1035 sock->remote_endpoint().m_udp_endpoint, 0, sys_err_code);
1036 if (sys_err_code)
1037 {
1038 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
1039 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id);
1040 }
1041 else
1042 {
1043 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, size_sent);
1044 }
1045 }
1046} // Node::sync_sock_low_lvl_rst_send()
1047
1048Send_pacing_data::Send_pacing_data(util::Task_engine* task_engine) :
1049 m_slice_period(0), // Comment says undefined is zero() so don't leave it uninitialized. @todo Is it really needed?
1050 m_bytes_allowed_this_slice(0),
1051 m_slice_timer(*task_engine)
1052{
1053 // Nothing.
1054}
1055
1056} // namespace flow::net_flow
const Component & get_log_component() const
Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:222
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Definition: log.cpp:217
void low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet, size_t bytes_expected_transferred, const Error_code &sys_err_code, size_t bytes_transferred)
Completion handler for async_low_lvl_packet_send_impl(); called when the packet is either successfull...
Definition: low_lvl_io.cpp:491
void sock_pacing_new_packet_ready(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet)
async_sock_low_lvl_packet_send_paced() pacing helper: Handles a DATA or ACK packet that was just pass...
Definition: low_lvl_io.cpp:642
void async_no_sock_low_lvl_packet_send(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
Definition: low_lvl_io.cpp:307
void async_sock_low_lvl_packet_send(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr &&packet, bool delayed_by_pacing)
async_low_lvl_packet_send_impl() wrapper to call when packet is to be sent to the remote side of the ...
Definition: low_lvl_io.cpp:301
void perform_accumulated_on_recv_tasks()
Performs all tasks to be performed at the end of low_lvl_recv_and_handle() or async part of async_wai...
Definition: node.cpp:375
void handle_incoming(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Handles a just-received, not-yet-deserialized low-level packet.
Definition: node.cpp:426
void sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket, synchronously.
Definition: low_lvl_io.cpp:997
void mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number &seq_num)
Performs important book-keeping based on the event "DATA packet was sent to destination....
Definition: low_lvl_io.cpp:416
void sock_pacing_time_slice_end(Peer_socket::Ptr sock, const Error_code &sys_err_code)
async_sock_low_lvl_packet_send_paced() pacing helper: If sock_pacing_process_q() ran out of the last ...
Definition: low_lvl_io.cpp:950
boost::shared_ptr< Net_env_simulator > m_net_env_sim
The object used to simulate stuff like packet loss and latency via local means directly in the code.
Definition: node.hpp:3671
void async_wait_latency_then_handle_incoming(const Fine_duration &latency, util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sets up handle_incoming(packet_data, low_lvl_remote_endpoint) to be called asynchronously after a spe...
Definition: low_lvl_io.cpp:249
Opt_type opt(const Opt_type &opt_val_ref) const
Obtain a copy of the value of a given option in a thread-safe manner.
Definition: node.hpp:4138
void sock_pacing_process_q(Peer_socket::Ptr sock, bool executing_after_delay)
async_sock_low_lvl_packet_send_paced() pacing helper: Given that we are currently in the pacing time ...
Definition: low_lvl_io.cpp:841
void async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet, const util::Udp_endpoint &low_lvl_remote_endpoint)
Sends an RST to the given UDP endpoint in response to the given incoming low-level packet that came f...
Definition: low_lvl_io.cpp:580
unsigned int handle_incoming_with_simulation(util::Blob *packet_data, const util::Udp_endpoint &low_lvl_remote_endpoint, bool is_sim_duplicate_packet=false)
Helper for low_lvl_recv_and_handle() that calls handle_incoming() on the not-yet-deserialized low-lev...
Definition: low_lvl_io.cpp:181
void low_lvl_recv_and_handle(Error_code sys_err_code)
Handles the pre-condition that m_low_lvl_sock has a UDP packet available for reading,...
Definition: low_lvl_io.cpp:40
Udp_socket m_low_lvl_sock
The UDP socket used to receive low-level packets (to assemble into application layer data) and send t...
Definition: node.hpp:3711
void async_low_lvl_packet_send_impl(const util::Udp_endpoint &low_lvl_remote_endpoint, Low_lvl_packet::Const_ptr packet, bool delayed_by_pacing, Peer_socket::Ptr sock)
Takes given low-level packet structure, serializes it, and initiates asynchronous send of these data ...
Definition: low_lvl_io.cpp:315
util::Blob m_packet_data
Stores incoming raw packet data; re-used repeatedly for possible performance gains.
Definition: node.hpp:3732
void async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
Sends an RST to the other side of the given socket asynchronously when possible.
Definition: low_lvl_io.cpp:988
Node_options m_opts
This Node's global set of options.
Definition: node.hpp:3662
void async_low_lvl_recv()
Registers so that during the current or next m_task_engine.run(), the latter will wait for a receivab...
Definition: low_lvl_io.cpp:31
void async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr &sock, Low_lvl_packet::Ptr &&packet)
Begins the process of asynchronously sending the given low-level packet to the remote Node specified ...
Definition: low_lvl_io.cpp:599
util::Task_engine m_task_engine
The main loop engine, functioning in the single-threaded-but-asynchronous callback-based "reactor" st...
Definition: node.hpp:3697
void sock_pacing_new_time_slice(Peer_socket::Ptr sock, const Fine_time_pt &now)
async_sock_low_lvl_packet_send_paced() pacing helper: Resets the socket's Send_pacing_data structure ...
Definition: low_lvl_io.cpp:757
@ S_CLOSED
Neither future reads nor writes are possible, AND Node has disowned the Peer_socket.
Sequence_number::seq_num_t order_num_t
Short-hand for order number type. 0 is reserved. Caution: Keep in sync with Drop_timer::packet_id_t.
Sent_pkt_by_sent_when_map::iterator Sent_pkt_ordered_by_when_iter
Short-hand for m_snd_flying_pkts_by_sent_when iterator type.
An internal net_flow sequence number identifying a piece of data.
Definition: seq_num.hpp:126
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
Definition: error.hpp:269
#define FLOW_LOG_DATA(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
Definition: log.hpp:242
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
Definition: log.hpp:152
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:354
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
Definition: log.hpp:372
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Definition: node.cpp:25
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
Definition: sched_task.hpp:34
boost::asio::io_context Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Definition: util_fwd.hpp:135
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
Definition: blob_fwd.hpp:60
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
Definition: util_fwd.hpp:208
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:508
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Definition: common.hpp:416
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Definition: common.hpp:413
Internal net_flow struct that encapsulates the Flow-protocol low-level ACK packet.
Internal net_flow struct that encapsulates the Flow-protocol low-level DATA packet.
std::vector< Const_buffer > Const_buffer_sequence
Short-hand for sequence of immutable buffers; i.e., a sequence of 1 or more scattered areas in memory...
size_t m_dyn_low_lvl_max_packet_size
Any incoming low-level (UDP) packet will be truncated to this size.
Definition: options.hpp:553
unsigned int m_dyn_max_packets_per_main_loop_iteration
The UDP net-stack may deliver 2 or more datagrams to the Flow Node at the same time.
Definition: options.hpp:545
Fine_duration m_st_timer_min_period
A time period such that the boost.asio timer implementation for this platform is able to accurately a...
Definition: options.hpp:534
Metadata (and data, if retransmission is on) for a packet that has been sent one (if retransmission i...
std::vector< Sent_when > m_sent_when
Time stamps, order numbers, and other info at the times when the different attempts (including origin...
Internal net_flow struct that encapsulates the Flow-protocol low-level RST packet.
The current outgoing packet pacing state, including queue of low-level packets to be sent,...
Definition: low_lvl_io.hpp:178
util::Timer m_slice_timer
When running, m_packet_q is non-empty, m_bytes_allowed_this_slice < data size of m_packet_q....
Definition: low_lvl_io.hpp:232
size_t m_bytes_allowed_this_slice
This many bytes worth of DATA packets may still be sent, at this time, within the time slice defined ...
Definition: low_lvl_io.hpp:212
Send_pacing_data(util::Task_engine *task_engine)
Initializes data to initial state (no active time slice).
Packet_q m_packet_q
Queue of low-level packets to be sent to the remote endpoint, in order in which they are to be sent,...
Definition: low_lvl_io.hpp:221
Fine_duration m_slice_period
The length of the current pacing time slice period; this depends on congestion window and SRTT on the...
Definition: low_lvl_io.hpp:199
Fine_time_pt m_slice_start
The time point at which the last pacing time slice began; or epoch if no packets sent so far (i....
Definition: low_lvl_io.hpp:192