Flow 1.0.2
Flow project: Full implementation reference.
low_lvl_io.cpp
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
24#include <utility>
25
26namespace flow::net_flow
27{
28
29// Implementations.
30
31void Node::async_low_lvl_recv()
32{
33 using boost::asio::null_buffers;
34
35 // We are in thread W.
36
37 /* null_buffers() => don't actually read into any buffer. Invoked handler will read.
38 * Invoke this->low_lvl_recv_and_handle(<error code>) when data/error ready. */
39 m_low_lvl_sock.async_receive(null_buffers(),
40 [this](const Error_code& sys_err_code, size_t)
41 {
42 low_lvl_recv_and_handle(sys_err_code);
43 });
44}
45
46void Node::low_lvl_recv_and_handle(Error_code sys_err_code)
47{
48 using util::Blob;
49 using boost::asio::buffer;
50
51 // We are in thread W.
52
53 // Number of packets received and thus handled by handle_incoming(). Useful at least for a log message at the end.
54 unsigned int handled_packet_count = 0;
55 /* Number of packets received. (The number handled by handle_incoming() may be lower or higher
 56 * if simulator is in use.) */
57 unsigned int recvd_packet_count = 0;
58
59 /* The limit on the # of received packets to handle in this handler. 0 means unlimited. For
60 * reasoning as to why we'd possibly want to limit it, see doc header for this option. */
61 unsigned int recvd_packet_count_limit = opt(m_opts.m_dyn_max_packets_per_main_loop_iteration);
62
63 if (!sys_err_code)
64 {
65 /* boost.asio has reason to believe there's at least one UDP datagram ready to read from the
66 * UDP net-stack's buffer. We'll read that one. We'll also keep reading them until UDP net-stack
67 * says it "would block" (i.e., no more datagrams have been received). Why not simply call
68 * async_low_lvl_recv() and let boost.asio deal with it and call us again? Consider a user
 69 waiting for Readable for a given Peer_socket. If we did that, we would
70 * read one DATA packet, load it on the Receive buffer, and signal the waiting user. Thus they
71 * may immediately read from the Receive buffer and move onto the rest of their event loop
72 * iteration. However, there may be 50 more packets we're then going to immediately put on the
73 * Receive buffer in thread W. It would be arguably more efficient to read all 51 and THEN
74 * signal the user. Therefore we use Node::m_sock_events to accumulate active events
75 * and then read all available datagrams; only after that do we then signal Event_sets for
76 * accumulated events.
77 *
78 * @todo One can conceive of some pathological case where due to extreme traffic we'd keep
79 * reading more and more datagrams and not get out of the loop for a long time. Perhaps add a
80 * knob for the maximum number of iterations to go through before ending the loop and
81 * signaling about the accumulated events. */
82
83 // Note: m_packet_data is a member that is reused repeatedly for some performance savings.
84 auto& packet_data = m_packet_data;
85 /* Don't let this dynamic option's value's change to affect this main loop iteration's value.
86 * This way packet_data.capacity() will essentially remain constant which is easier to reason about than
87 * the alternative. Soon enough this method will exit, and any new value will take effect next time. */
88 const size_t max_packet_size = opt(m_opts.m_dyn_low_lvl_max_packet_size);
89
90 util::Udp_endpoint low_lvl_remote_endpoint;
91
92 // Read until error or "would block" (no data available).
93 size_t packet_size = 0;
94 do
95 {
96 /* Make buffer big enough for any packet we'd accept as valid.
97 * resize(MAX) will only work if packet_data.zero() (the case in first iteration) or if packet_data.capacity()
98 * >= MAX. Hence, if a loop iteration below has left packet_data with unsatisfactory capacity below MAX,
99 * then ensure zero() is true before the resize(MAX) call. In other words, reallocate packet_data but only
100 * if necessary. */
101 if ((!packet_data.zero()) && (packet_data.capacity() < max_packet_size))
102 {
103 packet_data.make_zero(); // This must be done explicitly: acknowledges we force reallocation in next line.
104 }
105 packet_data.resize(max_packet_size);
106 // ^-- Could use UDP available(), but its meaning is slightly ambiguous, + I don't care for the tiny race it adds.
107
108 // Read packet from UDP net-stack internal buffer into "packet_data."
109 packet_size = m_low_lvl_sock.receive_from(buffer(packet_data.data(), packet_data.size()),
110 low_lvl_remote_endpoint, 0, sys_err_code);
111 if (!sys_err_code)
112 {
113 assert(packet_size <= packet_data.size());
114 packet_data.resize(packet_size); // boost.asio NEVER resizes vector to the # of bytes it read.
115
116 FLOW_LOG_TRACE("Received low-level packet at [UDP " << m_low_lvl_sock.local_endpoint() << "] from "
117 "[UDP " << low_lvl_remote_endpoint << "]:");
118
119 // Count it against the limit.
120 ++recvd_packet_count;
121
122 handled_packet_count += handle_incoming_with_simulation(&packet_data, low_lvl_remote_endpoint);
123 /* packet_data is still valid and owned by us but may have any structure at all.
124 * As of this writing, in practice -- assuming no simulated delay -- packet_data would be untouched
125 * (and therefore could be reused sans reallocation for the next packet read in above, if any) for all
126 * packet types except DATA, which is moved elsewhere via std::move() semantics and would require
127 * reallocation above. */
128 }
129 else if (sys_err_code != boost::asio::error::would_block) // "Would block" is normal (no data available).
130 {
131 /* Weird it failed, since it's UDP. Oh well.
132 * Note: One might be tempted to do something dire like close some sockets. Note that in the current setup
133 * we wouldn't know which socket(s) to close, since several live on the same low-level (UDP) port. Even if
134 * we did, the failure to read could mean many things, so individual err_codes would need to be examined
135 * with various actions taken depending on what happened. (@todo Consider it.) Without that, just ignore it
136 * as we would any lost packet. Obviously, we deal with UDP's unreliability already.
137 *
138 * What about the would_block and try_again (EAGAIN/EWOULDBLOCK in POSIX world and similar for Windows)
139 * error codes which often demand special handling? Well, firstly, we are likely mostly not hitting that
140 * situation, as async_receive() (the call for which this is the handler function) specifically attempts
141 * to avoid those error codes by only executing the handler once the socket is Readable.
142 * Still, it's probably not impossible: we used null_buffers as of this writing, which means the actual
143 * receiving is done in this handler, not by boost.asio (and even if it was, we would still try more receives
144 * until no more are available). Between detection of Readable by boost.asio and the actual receive call,
145 * the situation may have changed. Well, fine. What's there to do? would_block/try_again means
146 * not Readable right now... try later. Great! We do just that in any case below by executing
147 * async_receive() again (inside the call at the end of this method). So this read failed due to some
148 * odd change in conditions; best we can do is wait for Readable and try again later. And we do. */
 149 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 150 assert(packet_size == 0); // Should be the case on error.
151 }
152 else
153 {
154 assert(packet_size == 0);
155 }
156 }
157 // while (still getting data && (haven't exceeded per-handler limit || there is no limit))
158 while ((packet_size != 0)
159 && ((recvd_packet_count_limit == 0) || (recvd_packet_count < recvd_packet_count_limit)));
160
161 /* Have read all available low-level data. This may have accumulated certain tasks, like
162 * combining pending individual acknowledgments into larger ACK packet(s), to be performed at
163 * the end of the handler. Do so now. */
 164 perform_accumulated_on_recv_tasks();
 165
166 // Helpful in understanding the relationship between handler invocations and "delta" Event_set checks.
167 FLOW_LOG_TRACE("Handled a total of [" << handled_packet_count << "] incoming packets "
168 "out of [" << recvd_packet_count << "] received (limit [" << recvd_packet_count_limit << "]) "
169 "in this boost.asio handler.");
 170 } // if (!sys_err_code)
171 else
172 {
173 // boost.asio called us with an error. Strange, since this is UDP, but treat it same as a read error above.
 174 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 175 }
176
177 /* If socket errors are detected above, since it's UDP and connectionless, there is hope we're
178 * still OK, so still set up wait at the end of this callback. Do not stop event loop.
179 *
180 * @todo This isn't necessarily sound. Could investigate the possible errors and act
181 * accordingly. */
182
183 // Register ourselves for the next UDP receive.
 184 async_low_lvl_recv();
185} // Node::low_lvl_recv_and_handle()
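The arm-then-drain pattern used by the two methods above (async_receive(null_buffers()) to wait for Readable, then non-blocking receive_from() calls until would_block, then re-arm) can be reduced to a small standalone sketch. This is not Flow code; it assumes plain Boost.Asio, a non-blocking UDP socket, and a single-threaded io_context, and it uses the deprecated null_buffers() overload exactly as the file does (newer code would use socket::async_wait(wait_read)). The port number is arbitrary.

#include <boost/asio.hpp>
#include <array>
#include <iostream>

namespace asio = boost::asio;
using asio::ip::udp;

// Re-arms itself after each Readable event, like async_low_lvl_recv() / low_lvl_recv_and_handle().
void arm_receive(udp::socket& sock)
{
  sock.async_receive(asio::null_buffers(), // Wait for Readable only; the handler does the actual reads.
                     [&sock](const boost::system::error_code& ec, std::size_t)
  {
    if (ec) { return; } // E.g., operation_aborted at shutdown.

    std::array<char, 65536> buf;
    udp::endpoint from;
    boost::system::error_code read_ec;
    do // Drain every datagram currently queued by the UDP stack before signaling anyone.
    {
      const std::size_t n = sock.receive_from(asio::buffer(buf), from, 0, read_ec);
      if (!read_ec)
      {
        std::cout << "got " << n << " bytes from " << from << "\n"; // Stand-in for handle_incoming().
      }
      // would_block is the normal loop exit; any other error is ignored like any lost datagram.
    }
    while (!read_ec);

    arm_receive(sock); // Register for the next Readable event.
  });
}

int main()
{
  asio::io_context io;
  udp::socket sock(io, udp::endpoint(udp::v4(), 50000));
  sock.non_blocking(true); // Required: receive_from() must return would_block, not block.
  arm_receive(sock);
  io.run();
}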
186
187unsigned int Node::handle_incoming_with_simulation(util::Blob* packet_data,
 188 const util::Udp_endpoint& low_lvl_remote_endpoint,
189 bool is_sim_duplicate_packet)
190{
191 using util::Blob;
192 using boost::chrono::milliseconds; // Just for the output streaming.
193 using boost::chrono::round;
194
195 // We are in thread W.
196
197 unsigned int handled = 0; // How many times we called handle_incoming() inside this invocation.
198
199 // Basically we should call handle_incoming(), but we may first have to simulate various network conditions.
200
201 if (is_sim_duplicate_packet || (!m_net_env_sim) || (!m_net_env_sim->should_drop_received_packet()))
202 {
203 // See below.
204 const bool must_dupe
205 = (!is_sim_duplicate_packet) && m_net_env_sim && m_net_env_sim->should_duplicate_received_packet();
206
207 Blob packet_data_copy(get_logger());
208 if (must_dupe)
209 {
210 /* We will simulate duplication of the packet below. Since packet handling can be
211 * destructive (for performance reasons -- see handle_data_to_established()), we must create
212 * a copy that will not be hurt by this process. */
213 packet_data_copy = *(static_cast<const Blob*>(packet_data)); // Add const to express we require a copy, not move.
214 }
215
216 Fine_duration latency(m_net_env_sim ? m_net_env_sim->received_packet_latency() : Fine_duration::zero());
217 if (latency == Fine_duration::zero())
218 {
219 // No simulated latency; just handle the packet now (mainstream case).
220 handle_incoming(packet_data, low_lvl_remote_endpoint);
221 // *packet_data may now be decimated! Do not use it.
222 ++handled;
223 }
224 else
225 {
226 /* Pretend it was actually delivered later, as the simulator told us to do. More precisely,
227 * call handle_incoming() but later. */
228 FLOW_LOG_TRACE("SIMULATION: Delaying reception of packet by simulated latency "
229 "[" << round<milliseconds>(latency) << "].");
230 async_wait_latency_then_handle_incoming(latency, packet_data, low_lvl_remote_endpoint);
231 // *packet_data may now be decimated! Do not use it.
232 }
233
234 // Possibly simulate packet duplication.
235 if (must_dupe)
236 {
237 /* Simulator told us to pretend this packet was received twice. Note that we only model a
238 * single duplication (i.e., a duplicated packet will not be simulated to itself get
239 * duplicated). Also, we don't drop a duplicated packet. */
240 FLOW_LOG_TRACE("SIMULATION: Duplicating received packet.");
241 handled += handle_incoming_with_simulation(&packet_data_copy, // Pass the COPY, not the possibly damaged original.
242 low_lvl_remote_endpoint, true);
243 // packet_data_copy may now be decimated! Do not use it.
244 }
245 }
246 else
247 {
248 // Got packet, but pretend it was dropped on the way due to packet loss, as simulator told us to do.
249 FLOW_LOG_TRACE("SIMULATION: Dropped received packet.");
250 }
251
252 return handled;
253} // Node::handle_incoming_with_simulation()
254
255void Node::async_wait_latency_then_handle_incoming(const Fine_duration& latency,
 256 util::Blob* packet_data,
257 const util::Udp_endpoint& low_lvl_remote_endpoint)
258{
259 using util::Blob;
261 using boost::chrono::milliseconds;
262 using boost::chrono::round;
263 using boost::shared_ptr;
264
265 // We are in thread W.
266
267 // Schedule call to handle_incoming() to occur asynchronously after the latency period passes.
268
269 /* As advertised, *packet_data loses its buffer into this new container, so that caller can immediately
270 * use it for whatever they want. Meanwhile, we asynchronously own the actual data in it now.
271 * Make a smart pointer to ensure it lives long enough for handler to execute... but likely no longer than that. */
272 shared_ptr<Blob> packet_data_moved_ptr(new Blob(std::move(*packet_data)));
273
274 // Unused if it doesn't get logged, which is a slight perf hit, but anyway this sim feature is a debug/test thing.
275 const Fine_time_pt started_at = Fine_clock::now();
276
278 [this, packet_data_moved_ptr, low_lvl_remote_endpoint, latency, started_at]
279 (bool)
280 {
281 // We are in thread W.
282
284 ("SIMULATOR: Handling low-level packet after "
285 "simulated latency [" << round<milliseconds>(latency) << "]; "
286 "actual simulated latency was "
287 "[" << round<milliseconds>(Fine_clock::now() - started_at) << "]; "
288 "from [UDP " << low_lvl_remote_endpoint << "].");
289
290 // Move (again) the actual buffer to handle_incoming()'s ownership.
291 Blob packet_data_moved_again(std::move(*packet_data_moved_ptr));
292 // *packet_data_moved_ptr is now empty and will be deleted once that smart pointer goes out of scope below.
293 handle_incoming(&packet_data_moved_again, low_lvl_remote_endpoint);
294 // packet_data_moved_again may now be decimated also! Do not use it.
295
296 /* We must do this here for similar reasons as at the end of low_lvl_recv_and_handle(). Think
297 * of the present closure as simply low_lvl_recv_and_handle() running and being able
298 * to read off the one low-level UDP packet (the argument "packet"). */
 299 perform_accumulated_on_recv_tasks();
 300
301 // Log a similar thing to that in low_lvl_recv_and_handle().
302 FLOW_LOG_TRACE("Handled a total of [1] incoming packets "
303 "out of a simulated [1] received in this boost.asio handler.");
304 }); // Async callback.
305} // Node::async_wait_latency_then_handle_incoming()
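The ownership hand-off above (move the caller's Blob into a shared_ptr captured by the delayed handler) is a generic pattern; here is a minimal standalone sketch of it using std::vector and a Boost.Asio steady_timer instead of Flow's Blob and its scheduling utilities. The handle_later() name is hypothetical.

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

// Steal *packet_data now, so the caller can immediately reuse its variable; the shared_ptr keeps the
// actual bytes alive exactly until the delayed handler has run (mirroring packet_data_moved_ptr above).
void handle_later(boost::asio::io_context& io, std::vector<char>* packet_data,
                  std::chrono::milliseconds latency)
{
  auto moved = std::make_shared<std::vector<char>>(std::move(*packet_data));
  auto timer = std::make_shared<boost::asio::steady_timer>(io, latency);
  timer->async_wait([timer, moved](const boost::system::error_code&)
  {
    // The timer itself is also captured, so it outlives the wait it started.
    std::cout << "handling " << moved->size() << " bytes after simulated latency\n";
    // ...hand *moved off to the real packet handler here...
  });
}

int main()
{
  boost::asio::io_context io;
  std::vector<char> datagram(1500, 'x'); // Pretend this was just read off the wire.
  handle_later(io, &datagram, std::chrono::milliseconds(50));
  // datagram is now empty and may be reused by the caller right away.
  io.run();
}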
306
307void Node::async_sock_low_lvl_packet_send(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr&& packet,
 308 bool delayed_by_pacing)
309{
310 async_low_lvl_packet_send_impl(sock->remote_endpoint().m_udp_endpoint, std::move(packet), delayed_by_pacing, sock);
311}
312
313void Node::async_no_sock_low_lvl_packet_send(const util::Udp_endpoint& low_lvl_remote_endpoint,
 314 Low_lvl_packet::Const_ptr packet)
 315{
316 /* As of this writing we don't pace things, when no Peer_socket is involved (e.g., some RSTs) => always `false`: -|
317 * v-------------------------------------------------| */
318 async_low_lvl_packet_send_impl(low_lvl_remote_endpoint, packet, false, Peer_socket::Ptr());
319}
320
321void Node::async_low_lvl_packet_send_impl(const util::Udp_endpoint& low_lvl_remote_endpoint,
 322 Low_lvl_packet::Const_ptr packet,
 323 bool delayed_by_pacing, Peer_socket::Ptr sock)
324{
325 using boost::asio::buffer;
326
327 assert(packet);
328 const auto& packet_ref = *packet;
329 const auto& packet_type_id = typeid(packet_ref);
330
331 // We are in thread W.
332
333 Sequence_number seq_num; // See below.
334
335 /* As explained in send_worker(), if it's a DATA packet, then we must overwrite
336 * m_sent_when. That way we'll avoid the deadly RTT drift (increase) due to pacing causing a
337 * higher RTT, then using that RTT to spread out packets more, which causes higher RTT again,
338 * etc. (If pacing is disabled, then the effect of doing this here rather than in send_worker()
339 * will not be dramatic.)
340 *
341 * Note that a number of important structures are based on m_sent_when; because m_sent_when
342 * ordering determines how things are ordered in m_snd_flying_pkts_by_sent_when and friends and thus the
343 * operation of the Drop Timer. So whenever we do finalize m_sent_when, we should handle those
344 * things too. */
345 if (packet_type_id == typeid(Data_packet))
346 {
347 assert(sock);
348 /* OK, so this is where I update m_sent_when and handle everything related to that
349 * (updating the scoreboard, Drop Timer, etc.). async_send_to() will indeed invoke UDP sendto()
350 * synchronously; that call occurs just below. */
351
352 mark_data_packet_sent(sock, static_cast<const Data_packet&>(packet_ref).m_seq_num);
353 } // if (packet type is DATA)
354 // @todo See the @todo in Node::async_low_lvl_ack_send() regarding correcting ACK delay value due to pacing.
355
356 /* Serialize to raw data. Important subtleties:
357 *
358 * This does *not* create a copied buffer. It generates a Const_buffer_sequence,
359 * which is a sequence container (like vector) containing a series of buffer
360 * start pointers and associated lengths. So it is a "scattered" buffer already in memory, namely within
361 * the Low_lvl_packet `packet`; and the boost.asio UDP async_send_to() we call below performs a "gather"
362 * operation when it generates the UDP datagram to send out. This should be superior to performing a
363 * copy that is at least proportional (in speed and memory use) to the size of the ultimate datagram.
364 * (In fact, an earlier version of the code indeed serialized to a newly generated single buffer here.)
365 *
366 * The only thing one must be careful of here is that the generated Const_buffer_sequence is valid only as long
367 * as the underlying areas in memory continue to exist; otherwise it's just a bunch of pointers into invalid
368 * memory. The underlying memory, as documented in Low_lvl_packet::serialize_to_raw_data() doc header,
369 * is valid as long as the Low_lvl_packet on which the method is called exists. For this reason, we pass
370 * `packet` (which is a shared_ptr<>) to the completion handler. Thus it will continue to exist until
 371 * after the async_send_to() attempt itself. (If we hadn't done this, there's a large danger that
372 * `packet`'s ref-count would drop to zero at method exit just below, and hence async_send_to() asynchronously
373 * would crash a little later.)
374 *
375 * Update: Through empirical evidence, I found that, at least with Boost 1.63 and macOS, UDP async_send_to()
376 * below will silently truncate to the first 64 elements of raw_bufs. I could find no evidence of this being
377 * a common OS limitation (I've found other limits like 1024 in Linux for writev()), though I didn't dig very hard.
378 * The silent truncation was quite alarming and something to look into if only for educational purposes.
379 * In any case, we must keep raw_bufs.size() fairly low -- probably this is good for performance as well.
380 *
381 * Update: I have since looked into this in another project. This limit, 64, continues in Boost 1.75 and is actually
382 * at least imposed by boost.asio code itself, though it might just be using that value as a lowest-common-denominator
383 * technique. It is my impression that they do this silently on account of thinking only of the TCP context,
384 * where a send/write op call is free to write as few bytes as it wants, and one must deal with it in any
385 * case (until would-block). The silent dropping behavior in UDP, however, is a disaster and is really a Boost bug
386 * (@todo file it against Boost if needed). In any case it is what it is, and until it goes away, we must not
387 * exceed it. */
 388 Low_lvl_packet::Const_buffer_sequence raw_bufs;
 389 const size_t bytes_to_send = packet->serialize_to_raw_data_and_log(&raw_bufs);
390 assert(bytes_to_send != 0);
391
392 // Count an actual UDP stack send() call.
393 sock->m_snd_stats.low_lvl_packet_xfer_called(packet_type_id, delayed_by_pacing, bytes_to_send);
394
395 const size_t limit = opt(m_opts.m_dyn_low_lvl_max_packet_size);
396 if (bytes_to_send > limit)
397 {
398 // Bad and rare enough for a warning.
399 FLOW_LOG_WARNING("Tried to send low-level packet but before doing so detected "
400 "serialized size [" << bytes_to_send << "] exceeds limit [" << limit << "]; "
401 "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
402 "[\n" << packet->m_concise_ostream_manip << "].");
403 // However, full packets should not be logged unless DATA log level allowed, period.
404 FLOW_LOG_DATA("Detailed serialized packet details from preceding warning: "
405 "[\n" << packet->m_verbose_ostream_manip << "].");
406
407 // Short-circuit this, since no send occurred.
408 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_to_send, 0);
409 return;
410 }
411 // else
412
413 // Initiate asynchronous send.
414 m_low_lvl_sock.async_send_to(raw_bufs,
415 low_lvl_remote_endpoint,
416 [this, sock, packet, bytes_to_send](const Error_code& sys_err_code, size_t n_sent)
417 {
418 low_lvl_packet_sent(sock, packet, bytes_to_send, sys_err_code, n_sent);
419 });
420} // Node::async_low_lvl_packet_send_impl()
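The two lifetime/serialization points made in the long comment above (gather-send from several non-contiguous memory areas without copying; keep the owning object alive by capturing its shared_ptr in the completion handler) generalize beyond Low_lvl_packet. A minimal standalone sketch, assuming plain Boost.Asio; the Msg type is hypothetical and not part of Flow:

#include <boost/asio.hpp>
#include <memory>
#include <string>
#include <vector>

namespace asio = boost::asio;
using asio::ip::udp;

struct Msg // Hypothetical message with non-contiguous parts (header + payload).
{
  std::string header;
  std::vector<char> payload;
};

void send_msg(udp::socket& sock, const udp::endpoint& dest, std::shared_ptr<const Msg> msg)
{
  // The buffer sequence only points into msg's memory; nothing is copied here ("scattered" in memory,
  // "gather" performed by the UDP send). Keep the element count small: silent truncation of long
  // sequences (observed around 64 elements) is exactly the pitfall described in the comment above.
  std::vector<asio::const_buffer> bufs;
  bufs.push_back(asio::buffer(msg->header));
  bufs.push_back(asio::buffer(msg->payload));

  sock.async_send_to(bufs, dest,
                     [msg](const boost::system::error_code& /*ec*/, std::size_t /*n_sent*/)
  {
    // Capturing msg (a shared_ptr) guarantees the memory behind bufs stays valid until the send completes.
  });
}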
421
422void Node::mark_data_packet_sent(Peer_socket::Ptr sock, const Sequence_number& seq_num)
423{
424 using boost::chrono::microseconds;
425 using boost::chrono::round;
426
427 // We are in thread W.
428
429 Peer_socket::Sent_pkt_ordered_by_when_iter sent_pkt_it = sock->m_snd_flying_pkts_by_sent_when.find(seq_num);
430 if (sent_pkt_it == sock->m_snd_flying_pkts_by_sent_when.past_oldest())
431 {
 432 // We haven't even (or have only just) sent the packet, yet its In-flight data is already gone? Very weird but not fatal.
433 FLOW_LOG_WARNING("Sending [DATA] packet over [" << sock << "] with "
434 "sequence number [" << seq_num << "] but cannot find corresponding Sent_packet. "
435 "Cannot deal with some of the related data structures; still sending. Bug?");
436 // @todo Reading this now, it seems more than "very weird." Possibly it should be an assert() or at least RST?
437 return;
438 }
439 // else
440
441 Peer_socket::Sent_packet& pkt = *sent_pkt_it->second;
442
443 /* Mark the given packet as being sent right now. The following structures depend on this
444 * event:
445 *
446 * - pkt.m_sent_when: Obviously needs to be marked with current time stamp (for RTT calculation
447 * later, among other things).
448 * - sock->m_snd_drop_timer: Since Drop_timer has an event for a packet becoming In-flight in the
449 * sense that it was actually sent over wire, which is exactly what's happening.
450 * So we call that.
451 * - Idle Timer: m_snd_last_data_sent_when is the m_sent_when value of the last DATA packet sent.
452 * Obviously we set that here. */
453
454 // Get current time; and record it for Idle Timeout calculation. See top of send_worker() for how it is used.
455 const Fine_time_pt& now = sock->m_snd_last_data_sent_when = Fine_clock::now();
456 const size_t cwnd = sock->m_snd_cong_ctl->congestion_window_bytes();
457 auto& last_send_attempt = pkt.m_sent_when.back();
458 const Peer_socket::order_num_t order_num = last_send_attempt.m_order_num;
459 auto const logger_ptr = get_logger();
460 if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
461 {
462 const Fine_time_pt prev_sent_when = pkt.m_sent_when.back().m_sent_time;
463 // REPLACE the values. Note m_order_num is just left alone.
464 last_send_attempt.m_sent_time = now;
465 last_send_attempt.m_sent_cwnd_bytes = cwnd;
466 const microseconds diff = round<microseconds>(now - prev_sent_when);
467
469 ("Sending/sent [DATA] packet over [" << sock << "] with "
470 "sequence number [" << seq_num << "] order_num [" << order_num << "]. Send timestamp changed from "
471 "[" << prev_sent_when << "] -> [" << now << "]; difference [" << diff << "].");
472 }
473 else
474 {
475 // Same but no logging.
476 last_send_attempt.m_sent_time = now;
477 last_send_attempt.m_sent_cwnd_bytes = cwnd;
478 }
479
480 /* Inform this guy as required, now that packet has actually been sent off over the wire (not merely In-flight
481 * by the Peer_socket definition, wherein it's been placed into sock->m_snd_flying_pkts*). */
482 const Drop_timer::Ptr drop_timer = sock->m_snd_drop_timer;
483 drop_timer->start_contemporaneous_events();
484 drop_timer->on_packet_in_flight(order_num);
485 drop_timer->end_contemporaneous_events();
486 /* ^-- @todo Is it possible to smoosh a bunch of mark_data_packet_sent() calls into a single
487 * start/end_contemporaneous_events() group? There are probably situations where 2 or more packets are sent
488 * at a time which means ultimately mark_data_packet_sent() is called essentially contemporaneously.
489 * However, at least as of this writing, mark_data_packet_sent() arises from a variety of situations, at least
490 * some of which are "singular" and not really contemporaneous with any other packet(s) being sent.
491 * So to isolate the cases where that is in fact true (2+ calls to us are contemporaneous) is certainly non-trivial,
492 * and the code to do it would be... highly technical and hard to maintain. So I wouldn't trip over myself to
493 * attempt this; and actually whatever book-keeping might be necessary to pull it off might itself have
494 * some performance cost. */
495} // Node::mark_data_packet_sent()
496
497void Node::low_lvl_packet_sent(Peer_socket::Ptr sock, Low_lvl_packet::Const_ptr packet,
 498 size_t bytes_expected_transferred, const Error_code& sys_err_code,
499 size_t bytes_transferred)
500{
501 using std::numeric_limits;
502
503 // We are in thread W.
504
505 const auto& packet_ref = *packet;
506 const auto& packet_type_id = typeid(packet_ref);
507
508 // Note: we don't save `packet` anywhere, so the memory will finally be freed, when we're done here.
509
510 // Log detailed info on packet but only if TRACE or DATA logging enabled.
511 auto const logger_ptr = get_logger();
512 if (logger_ptr && logger_ptr->should_log(log::Sev::S_TRACE, get_log_component()))
513 {
514 if (logger_ptr->should_log(log::Sev::S_DATA, get_log_component()))
515 {
516 FLOW_LOG_DATA_WITHOUT_CHECKING("Tried to send low-level packet: "
517 "[\n" << packet->m_verbose_ostream_manip << "].");
518 }
519 else
520 {
521 FLOW_LOG_TRACE_WITHOUT_CHECKING("Tried to send low-level packet: "
522 "[\n" << packet->m_concise_ostream_manip << "].");
523 }
524 }
525
526 if (sys_err_code)
527 {
528 /* Weird it failed, since it's UDP. Oh well. (Update: One thing to watch out for is "Message too long."!)
529 * Note: One might be tempted to do something dire like close some sockets. Several Flow connections might be
530 * using the same UDP port, so should we close all of them or just the one to which this outgoing packet applies?
531 * Even if we made some smart decision there, the failure to write could mean many things, so individual err_codes
532 * would need to be examined with various actions taken depending on what happened. (@todo Consider it.)
533 * Without that, just ignore it as we would any lost packet. Obviously, we deal with UDP's unreliability already.
534 *
535 * What about the specific error_codes 'would_block' and 'try_again' (in POSIX these probably both point to
536 * code EAGAIN which == EWOULDBLOCK; in Windows possibly just the former, but to be safe both should be handled the
537 * same way)? Currently I am ignoring this possibility. Why? Answer: we specifically use async_send() to wait
 538 * for the socket to be ready for the transmit operation before actually transmitting. This _should_ ensure
539 * that those two errors are _specifically_ avoided. However, that's probably not a 100% safe assumption.
540 * Even if the OS reports "Writable" at time T, a few microseconds later some resource might get used up
541 * (unrelated to our activities); and by the time the actual send executes, we might get would_block/try_again.
542 * Now, it's possible that since we do NOT use null_buffers in our async_send() call -- meaning we let
543 * boost.asio both wait for Writable *and* itself execute the write -- that it would hide any such
544 * corner-case EAGAIN/etc. from us and just not call this handler in that case and retry later by itself.
545 * However, I don't know if it does that. @todo Therefore it would be safer to treat those error codes,
546 * would_block and try_again, specially here. Simply, we should retry the async_send(), meaning it will wait for
547 * Writable again and execute it again later on. One might worry that this might open the possibility of
548 * more than one outstanding async_send(). If this happens, it's fine. Even if boost.asio would be so rash as
549 * to allow outstanding async_send()s to execute their handlers in a different order from the calls, it's not
550 * a problem, as out-of-order datagrams are handled just fine by the other side.
551 *
552 * Note: I've placed a more visible @todo in the class doc header. So delete that if this gets done. */
 553 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 554
555 if (sock)
556 {
557 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id);
558 }
559
560 return;
561 }
562 // else
563
564 if (sock)
565 {
566 sock->m_snd_stats.low_lvl_packet_xfer_completed(packet_type_id, bytes_expected_transferred, bytes_transferred);
567 }
568
569 if (bytes_transferred != bytes_expected_transferred)
570 {
571 /* If a UDP send was partial, the datagram must have been too big, which we shouldn't have allowed; worth warning.
572 * Update: Actually, at least on Mac with Boost 1.63 I've seen these outcomes:
573 * - If message is too big byte-wise, it results in truthy sys_err_code.
574 * - If too many sub-buffers in scatter/gather buffer container passed to async_send_to() were passed,
575 * they may be silently truncated (!sys_err_code but the present condition is triggered). Limit was observed to
576 * be ~64. */
577 FLOW_LOG_WARNING("Low-level packet sent, but only [" << bytes_transferred << "] of "
578 "[" << bytes_expected_transferred << "] bytes "
579 "were sent. Internal error with packet size calculations? More likely, did stack truncate?");
580 return;
581 }
582 // else
583 FLOW_LOG_TRACE("Success.");
584} // Node::low_lvl_packet_sent()
585
586void Node::async_no_sock_low_lvl_rst_send(Low_lvl_packet::Const_ptr causing_packet,
 587 const util::Udp_endpoint& low_lvl_remote_endpoint)
588{
589 using boost::asio::buffer;
590 using boost::shared_ptr;
591
592 // We are in thread W.
593
594 // Fill out typical info.
595 auto rst_base = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(get_logger());
596 // No sequence number. See m_seq_num for discussion.
597 rst_base->m_packed.m_src_port = causing_packet->m_packed.m_dst_port; // Right back atcha.
598 rst_base->m_packed.m_dst_port = causing_packet->m_packed.m_src_port;
599 rst_base->m_opt_rexmit_on = false; // Not used in RST packets, so set it to something.
600
601 async_no_sock_low_lvl_packet_send(low_lvl_remote_endpoint, rst_base);
602 // If that returned false: It's an RST, so there's no one to inform of an error anymore. Oh well.
603} // Node::async_no_sock_low_lvl_rst_send()
604
605bool Node::async_sock_low_lvl_packet_send_paced(const Peer_socket::Ptr& sock,
 606 Low_lvl_packet::Ptr&& packet,
607 Error_code* err_code)
608{
609 // We are in thread W.
610
611 // This is the general-purpose method for sending a packet along a well-defined connection (sock).
612
613 const auto& packet_ref = *packet;
614 const auto& packet_type_id = typeid(packet_ref);
615
616 sock->m_snd_stats.low_lvl_packet_xfer_requested(packet_type_id);
617
618 // Fill out typical info.
619 packet->m_packed.m_src_port = sock->m_local_port;
620 packet->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
621 packet->m_opt_rexmit_on = sock->rexmit_on();
622
623 /* Apply packet pacing, which tries to spread out bursts of packets to prevent loss. For much
624 * more detail, see struct Send_pacing_data comment. */
625
626 if ((!sock->opt(sock->m_opts.m_st_snd_pacing_enabled)) ||
627 (sock->m_snd_smoothed_round_trip_time == Fine_duration::zero()) ||
628 (packet_type_id == typeid(Rst_packet)))
629 {
630 /* Per struct Send_pacing_data doc header, the pacing algorithm only begins once we have our
631 * first round trip time (and thus SRTT); until then we send all packets as soon as possible.
632 * Also pacing can just be disabled; in which case send all packets ASAP.
633 *
634 * Finally, if it's an RST, just send it ASAP. This is discussed in more detail in the
635 * Send_pacing_data struct header, but basically: RST means packet is being CLOSED right now.
636 * Queueing RST on the pacing queue means it may be sent after underlying socket is CLOSED.
637 * Therefore we have to keep queue and all that machinery operating past socket being CLOSED.
638 * As a rule of thumb, CLOSED => dead socket. So just send RST right away. This means it may
639 * jump the queue ahead of DATA/ACK packets already there, but since it is an error condition
640 * causing RST, we consider that OK (those packets will not be sent). */
641 async_sock_low_lvl_packet_send(sock, std::move(packet), false); // false => not queued in pacing module.
642 return true;
643 }
644 // else pacing algorithm enabled and both can and must be used.
645
646 return sock_pacing_new_packet_ready(sock, std::move(packet), err_code);
647} // Node::async_sock_low_lvl_packet_send_paced()
648
649bool Node::sock_pacing_new_packet_ready(Peer_socket::Ptr sock, Low_lvl_packet::Ptr packet,
 650 Error_code* err_code)
651{
652 using boost::chrono::duration_cast;
653 using boost::chrono::microseconds;
654 using boost::static_pointer_cast;
655 using boost::dynamic_pointer_cast;
656 using boost::shared_ptr;
657
658 // We are in thread W.
659
660 const auto& packet_ref = *packet;
661 const auto& packet_type_id = typeid(packet_ref);
662
663 // For brevity and a bit of speed.
664 Send_pacing_data& pacing = sock->m_snd_pacing_data;
665
666 const bool is_data_packet = packet_type_id == typeid(Data_packet);
667 // For logging, get the first sequence number mentioned (depending on whether it's DATA or ACK).
668 Sequence_number init_seq_num;
669 shared_ptr<const Data_packet> data;
670 if (is_data_packet)
671 {
672 init_seq_num = static_pointer_cast<const Data_packet>(packet)->m_seq_num;
673 }
674 else
675 {
676 const auto& acked_packets = static_pointer_cast<const Ack_packet>(packet)->m_rcv_acked_packets;
677 if (!acked_packets.empty())
678 {
679 init_seq_num = acked_packets.front()->m_seq_num;
680 }
681 }
682
683 const bool q_was_empty = pacing.m_packet_q.empty();
684
685 /* No matter what, we can't send packet before the ones already in the queue, so push onto end
686 * of the queue. (If queue is currently empty, then that is not a special case; we may well
687 * immediately pop and send it below.) */
688 pacing.m_packet_q.push_back(packet);
689
690 FLOW_LOG_TRACE("Pacing: On [" << sock << "] packet of type [" << packet->m_type_ostream_manip << "] "
691 "is newly available for sending; pushed onto queue; queue size [" << pacing.m_packet_q.size() << "]; "
692 "initial sequence number [" << init_seq_num << "].");
693
694 if (!q_was_empty)
695 {
696 const auto data = dynamic_pointer_cast<const Data_packet>(pacing.m_packet_q.front());
697 // !data if it's not a DATA packet.
698
699 assert((!data) || (pacing.m_bytes_allowed_this_slice < data->m_data.size()));
700
701 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: was already in progress; queued and done.");
702
703 /* There were already packets in the queue, so the timer is running; certainly we can't send
704 * packet ahead of those in the queue, so push on the back of the queue above. Should we send
705 * the head packet now? No; if the last sock_pacing_new_packet_ready() or
706 * sock_pacing_time_slice_end() left a non-empty queue, then the timer has been set to fire when
707 * the slice ends, and more packets can be sent. Done. */
708 return true;
709 }
710 // else if (q_was_empty)
711
712 // We have just pushed the sole packet on the queue.
713
714 if (!is_data_packet)
715 {
716 assert(packet_type_id == typeid(Ack_packet)); // Sanity check.
717 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: due to packet type, sending immediately since at head of queue; "
718 "queue empty again.");
719
720 pacing.m_packet_q.clear();
721
722 /* Per discussion in struct Send_pacing_data doc header, if it's a non-DATA packet, then --
723 * other than not being allowed to be re-ordered -- it does not "count" as a paced packet. That
724 * is, it is "worth" zero bytes when compared to m_bytes_allowed_this_slice and should be sent
725 * as soon as it is at the head of the queue (which it is here, since we just placed it as the
726 * sole element on the queue). Since it doesn't count against m_bytes_allowed_this_slice, the
727 * pacing timing is irrelevant to it, and based on the "send ASAP" rule, we send it now. */
728 async_sock_low_lvl_packet_send(sock, std::move(packet), false); // false => not queued in pacing module.
729 return true;
730 }
731 // else packet is DATA packet.
732
733 const Fine_time_pt now = Fine_clock::now();
734 if ((pacing.m_slice_start == Fine_time_pt()) || (now >= (pacing.m_slice_start + pacing.m_slice_period)))
735 {
736 /* We are past the current time slice (if there is such a thing) and have a packet to send. By
737 * the algorithm in struct Send_pacing_data doc header, this means we create a new time slice with
738 * starting time = now. */
739
740 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: "
741 "current time "
742 "slice [epoch+" << duration_cast<microseconds>(pacing.m_slice_start.time_since_epoch()) << " "
743 "over " << duration_cast<microseconds>(pacing.m_slice_period) << "] is over.");
744
 744
 745 sock_pacing_new_time_slice(sock, now);
 746 /* pacing.{m_slice_start, m_slice_period, m_bytes_allowed_this_slice} have all been recomputed.
747 * By definition "now" is in [m_slice_start, m_slice_start + m_slice_period), since m_slice_start
748 * IS now (plus some epsilon). */
749 }
750 // else if ("now" is basically in time range [m_slice_start, m_slice_start + m_slice_period)) {}
751
752 /* We are in the current time slice. The one just-queued packet can be sent if there is
753 * m_bytes_allowed_this_slice budget; otherwise the timer must be started to fire at slice
754 * end, at which point we're cleared to send the packet. sock_pacing_process_q() performs this
755 * (it is generalized to work with multiple packets on the queue, but it will work fine with just
756 * one also).
757 *
758 * Note: If we just created the new time slice above, then m_bytes_allowed_this_slice >=
759 * max_block_size(), so certainly the following statement will immediately send the just-queued
760 * packet. If the time slice was in progress, then it depends. */
761
762 return sock_pacing_process_q(sock, err_code, false);
763} // Node::sock_pacing_new_packet_ready()
764
766{
767 using boost::chrono::duration_cast;
768 using boost::chrono::microseconds;
769 using boost::chrono::milliseconds;
770 using std::max;
771
772 // We are in thread W.
773
774 // For brevity and a bit of speed.
775 Send_pacing_data& pacing = sock->m_snd_pacing_data;
776 const Fine_duration& srtt = sock->m_snd_smoothed_round_trip_time;
777
778 assert(srtt != Fine_duration::zero());
779
780 // New slice starts now.
781 pacing.m_slice_start = now;
782
783 /* Per struct Send_pacing_data doc header: If we had perfectly fine timer precision, then we'd
784 * want a slice that is SRTT / CWND. CWND = (congestion window in bytes / max-block-size).
785 * To minimize truncation error, then, it is X / (Y / Z) = X * Z / Y, where X, Y, Z are SRTT,
786 * max-block-size, and congestion window in bytes, respectively. */
787 Fine_duration slice_ideal_period
788 = srtt * sock->max_block_size() / sock->m_snd_cong_ctl->congestion_window_bytes();
789 if (slice_ideal_period == Fine_duration::zero())
790 {
791 // Avoid division by zero and any other tomfoolery below....
792 slice_ideal_period = Fine_duration(1);
793 }
794
795 Fine_duration timer_min_period = opt(m_opts.m_st_timer_min_period);
796 if (timer_min_period == Fine_duration::zero())
797 {
 798 /* They want us to pick a nice upper bound on the timer precision ourselves.
 799 *
 800 * This is conservative; in my tests Windows seems to have the worst timer precision, and it is
801 * about 15 msec. @todo Perhaps choose here based on platform. It can get hairy, as there is
802 * wide variation, so it would require much experimentation; but might be worth it for
803 * performance. */
804 const Fine_duration TIMER_MIN_PERIOD_DEFAULT = milliseconds(15);
805 timer_min_period = TIMER_MIN_PERIOD_DEFAULT;
806 }
807
808 /* Per Send_pacing_data doc header, the actual slice period is slice_ideal_period, unless that
809 * is below the timer's capabilities; in which case it is timer_min_period. */
810 pacing.m_slice_period = max(slice_ideal_period, timer_min_period);
811
812 /* Finally, allow (in this time slice) a burst of as few full-sized blocks as possible while
813 * staying below the target SRTT / CWND rate: floor(m_slice_period / slice_ideal_period).
814 * Note that when slice_ideal_period >= timer_min_period, i.e. the timer precision is fine enough
815 * to handle the desired rate exactly, then this value will always equal 1.
816 *
817 * Also, convert to bytes when actually assigning the data member.
818 *
819 * @todo Consider doing some kind of rounding instead of using floor(). */
 820 pacing.m_bytes_allowed_this_slice
 821 = static_cast<size_t>(pacing.m_slice_period * sock->max_block_size() / slice_ideal_period);
822
823 /* If I just use the above math, I notice that over time the queue size can drift becoming slowly
824 * larger and larger as more and more time slices go by. I believe it's due to our floor math
825 * above, but I have not yet fully investigated it. @todo Investigate it.
826 *
827 * However if I just increase the number of bytes allowed per slice a little bit, it makes the
828 * drift go away and probably doesn't reduce the effectiveness of the pacing much. */
829 const size_t QUEUE_SIZE_DRIFT_PREVENTION_PCT = 110; // @todo Maybe make it a configurable option.
830 pacing.m_bytes_allowed_this_slice *= QUEUE_SIZE_DRIFT_PREVENTION_PCT;
831 pacing.m_bytes_allowed_this_slice /= 100;
832
833 assert(pacing.m_bytes_allowed_this_slice >= sock->max_block_size());
834
835 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: "
836 "new time "
837 "slice [epoch+" << duration_cast<microseconds>(pacing.m_slice_start.time_since_epoch()) << " "
838 "over " << duration_cast<microseconds>(pacing.m_slice_period) << "]; "
839 "ideal slice period = [SRTT " << duration_cast<microseconds>(srtt) << "] / "
840 "([cong_wnd " << sock->m_snd_cong_ctl->congestion_window_bytes() << "] / "
841 "[max-block-size " << sock->max_block_size() << "]) = "
842 "[" << duration_cast<microseconds>(slice_ideal_period) << "]; "
843 "timer_min_period = [" << duration_cast<microseconds>(timer_min_period) << "]; "
844 "bytes_allowed = max(ideal, min) / ideal * max-block-size * "
845 "[" << QUEUE_SIZE_DRIFT_PREVENTION_PCT << "%] = "
846 "[" << pacing.m_bytes_allowed_this_slice << "].");
847} // Node::sock_pacing_new_time_slice()
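Plugging made-up numbers into the slice computation above may help. This is just the same arithmetic as sock_pacing_new_time_slice(), not Flow code; SRTT = 100 ms, congestion window = 64 KiB, max-block-size = 1024 bytes, and a 15 ms timer floor are assumptions chosen for the example.

#include <algorithm>
#include <cstdio>

int main()
{
  const long long srtt_us = 100'000;      // SRTT = 100 ms
  const long long cwnd_bytes = 65'536;    // congestion window, bytes
  const long long max_block = 1'024;      // max-block-size, bytes
  const long long timer_min_us = 15'000;  // assumed realistic timer resolution

  // Ideal slice: SRTT / (CWND / max-block-size) = SRTT * max-block-size / CWND = 1562 us here.
  const long long slice_ideal_us = srtt_us * max_block / cwnd_bytes;
  // Actual slice: cannot go below what the timer can express, so 15000 us here.
  const long long slice_us = std::max(slice_ideal_us, timer_min_us);
  // Budget: floor(slice / ideal) full blocks = 9 blocks = 9216 bytes, then the +10% drift allowance.
  long long bytes_allowed = (slice_us / slice_ideal_us) * max_block;
  bytes_allowed = bytes_allowed * 110 / 100; // 10137 bytes allowed per 15 ms slice.

  std::printf("slice = %lld us, budget = %lld bytes\n", slice_us, bytes_allowed);
}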
848
849bool Node::sock_pacing_process_q(Peer_socket::Ptr sock, Error_code* err_code, bool executing_after_delay)
850{
851 using boost::chrono::milliseconds;
852 using boost::chrono::round;
853 using boost::shared_ptr;
854 using boost::weak_ptr;
855 using boost::static_pointer_cast;
856 using boost::dynamic_pointer_cast;
857 using std::max;
858
859 // We are in thread W.
860
861 /* Pre-condition is that the current time is within the current time slice, and that all other
862 * invariants hold (including that the head packet, if any, is a DATA packet). So now we send as
863 * many packets as still allowed by the budget in m_bytes_allowed_this_slice. If anything remains
864 * beyond that, we schedule a timer to hit at the end of the time slice to get the rest. */
865
866 // For brevity and a bit of speed.
867 Send_pacing_data& pacing = sock->m_snd_pacing_data;
868 shared_ptr<const Data_packet> head_packet;
869
870 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: processing queue; queue size [" << pacing.m_packet_q.size() << "]; "
871 "byte budget [" << sock->bytes_blocks_str(pacing.m_bytes_allowed_this_slice) << "] remaining in this "
872 "slice.");
873
874 /* Pop things from queue until we've run out of pacing byte budget for this time slice, or until
875 * there is nothing left to send. */
876 while ((!pacing.m_packet_q.empty())
 877 && (pacing.m_bytes_allowed_this_slice
 878 >= (head_packet = static_pointer_cast<const Data_packet>(pacing.m_packet_q.front()))->m_data.size()))
879 {
 880 // Explicit invariant: head_packet is DATA. We always send non-DATA packets as soon as they get to head of queue.
881
882 // It is a DATA packet at head of queue, and there is enough pacing budget to send it now.
883
884 // Use up the budget.
885 pacing.m_bytes_allowed_this_slice -= head_packet->m_data.size();
886
887 FLOW_LOG_TRACE("Will send [" << head_packet->m_data.size() << "] bytes of data; budget now "
888 "[" << sock->bytes_blocks_str(pacing.m_bytes_allowed_this_slice) << "]; "
889 "queue size now [" << (pacing.m_packet_q.size() - 1) << "].");
890
891 // Send it. executing_after_delay <=> packet being sent was delayed by pacing as opposed to sent immediately.
892 async_sock_low_lvl_packet_send(sock, std::move(head_packet), executing_after_delay);
893
894 pacing.m_packet_q.pop_front(); // After this the raw pointer in head_packet should be freed.
895
896 /* Since we've popped a packet, another packet is now at the head. We must maintain the
897 * invariant that no non-DATA packet is at the head of the queue (see struct Send_pacing_data
898 * doc header for reasoning), i.e. we should send any such packets immediately. Do so until
899 * we run out or encounter a DATA packet. */
900
901 // Subtlety: Using dynamic_pointer_cast<> instead of typeid() to check packet type to avoid "side effect" warning.
902 Low_lvl_packet::Const_ptr head_packet_base;
903 while ((!pacing.m_packet_q.empty())
904 && (!(dynamic_pointer_cast<const Data_packet>(head_packet_base = pacing.m_packet_q.front()))))
905 {
906 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: due to packet type, sending immediately since at head of queue; "
907 "queue size now [" << (pacing.m_packet_q.size() - 1) << "].");
908
 909 // See above comment about last arg.
910 async_sock_low_lvl_packet_send(sock, std::move(head_packet_base), executing_after_delay);
911
 912 pacing.m_packet_q.pop_front(); // After this the raw pointer in head_packet_base should be freed.
913
914 // Note that, as discussed in struct Send_pacing_data doc header, a non-DATA packet is worth 0 budget.
915 }
916 } // while ((m_packet_q not empty) && (more m_bytes_allowed_this_slice budget available))
917
918 if (pacing.m_packet_q.empty())
919 {
920 FLOW_LOG_TRACE("Pacing: Queue emptied.");
921
922 // Successfully sent off entire queue. Pacing done for now -- until the next sock_pacing_new_packet_ready().
923 return true;
924 }
925 // else
926
927 /* No more budget left in this pacing time slice, but there is at least one packet in the queue
928 * still. Per algorithm, a fresh slice should begin ASAP (since packets must be sent ASAP but no
929 * sooner). Therefore schedule timer for the end of the time slice, which is just
930 * m_slice_start + m_slice_period. */
931 const Fine_time_pt slice_end = pacing.m_slice_start + pacing.m_slice_period;
932
933 Error_code sys_err_code;
934 pacing.m_slice_timer.expires_at(slice_end, sys_err_code);
935 // (Even if slice_end is slightly in the past, that'll just mean it'll fire ASAP.)
936
937 FLOW_LOG_TRACE("Pacing: Exhausted budget; queue size [" << pacing.m_packet_q.size() << "]; "
938 "scheduling next processing at end of time slice "
939 "in [" << round<milliseconds>(slice_end - Fine_clock::now()) << "].");
940
941 if (sys_err_code) // If that failed, it's probably the death of the socket....
942 {
943 FLOW_ERROR_SYS_ERROR_LOG_WARNING(); // Log the non-portable system error code/message.
945
946 return false;
947 }
948 // else
949
950 // When triggered or canceled, call this->sock_pacing_time_slice_end(sock, <error code>).
951 pacing.m_slice_timer.async_wait([this, sock_observer = weak_ptr<Peer_socket>(sock)]
952 (const Error_code& sys_err_code)
953 {
954 auto sock = sock_observer.lock();
955 if (sock)
956 {
957 sock_pacing_time_slice_end(sock, sys_err_code);
958 }
959 // else { Possible or not, allow for this possibility for maintainability. }
960 });
961
962 // More work to do later, but for now we've been successful.
963 return true;
964
965 /* That's it. The only reason the timer would get canceled is if we go into CLOSED state, in
966 * which case it can just do nothing. */
967} // Node::sock_pacing_process_q()
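The weak_ptr capture used for the slice timer just above is worth isolating as a pattern: the completion handler must neither extend the socket's lifetime nor misbehave if the socket has since been destroyed. A minimal standalone sketch, assuming Boost.Asio and Boost smart pointers; the Socket_like type is a hypothetical stand-in for Peer_socket.

#include <boost/asio.hpp>
#include <boost/make_shared.hpp>
#include <boost/weak_ptr.hpp>
#include <chrono>
#include <iostream>

struct Socket_like { int id = 42; }; // Stand-in for Peer_socket.

int main()
{
  boost::asio::io_context io;
  boost::asio::steady_timer timer(io);
  auto sock = boost::make_shared<Socket_like>();

  timer.expires_after(std::chrono::milliseconds(10));
  // Capture a weak_ptr: the pending wait does not keep the socket alive.
  timer.async_wait([sock_observer = boost::weak_ptr<Socket_like>(sock)]
                   (const boost::system::error_code&)
  {
    if (auto sock = sock_observer.lock()) // Still alive: safe to touch it.
    {
      std::cout << "slice-end handler ran for socket " << sock->id << "\n";
    }
    // else: the socket was destroyed in the meantime; quietly do nothing.
  });

  io.run();
}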
968
969void Node::sock_pacing_time_slice_end(Peer_socket::Ptr sock, [[maybe_unused]] const Error_code& sys_err_code)
970{
971 // We are in thread W.
972
973 // As always, no need to lock m_state, unless we plan to alter it, since no other thread can alter it.
974 if (sock->m_state == Peer_socket::State::S_CLOSED)
975 {
976 /* Once state is CLOSED, the socket is dead -- all packets have been sent. A corollary of
977 * that is that if we must send an RST, then always send it immediately (even if it has to
978 * jump ahead of other packets waiting on queue). */
979 return;
980 }
981
982 // We only cancel the timer when we close socket, and we've already returned if we'd done that.
983 assert(sys_err_code != boost::asio::error::operation_aborted);
984
985 // There could be other errors, but as in other timer handlers, we've no idea what that means, so pretend all is OK.
986
987 // Pre-condition: if we set up the timer, then the queue had to have been non-empty at the time.
988 assert(!sock->m_snd_pacing_data.m_packet_q.empty());
989
990 FLOW_LOG_TRACE("Pacing: On [" << sock << "]: slice end timer fired; creating new slice and processing queue.");
991
992 /* Timer fired, so right now we are somewhere after the end of the current time slice. Therefore,
993 * begin the next time slice. */
994 sock_pacing_new_time_slice(sock, Fine_clock::now());
995
996 /* pacing.{m_slice_start, m_slice_period, m_bytes_allowed_this_slice} have all been recomputed.
997 * By definition "now" is in [m_slice_start, m_slice_start + m_slice_period), since m_slice_start
998 * IS now (plus some epsilon). */
999
1000 /* We are in the current time slice. We just created the new time slice above, so
1001 * m_bytes_allowed_this_slice >= max_block_size(), and certainly the following statement will
1002 * immediately send at least one packet. */
1003
1004 Error_code err_code;
1005 if (!sock_pacing_process_q(sock, &err_code, true)) // Process as many packets as the new budget allows.
1006 {
1007 /* Error sending. Unlike in sock_pacing_process_q() or sock_pacing_new_packet_ready() --
1008 * which are called by something else that would handle the error appropriately -- we are
1009 * called by boost.asio on a timer event. Therefore we must handle the error ourselves. As is
1010 * standard procedure elsewhere in the code, in this situation we close socket. */
1011
1012 // Pre-conditions: sock is in m_socks and S_OPEN, err_code contains reason for closing.
1013 rst_and_close_connection_immediately(socket_id(sock), sock, err_code, false); // This will log err_code.
1014 /* ^-- defer_delta_check == false: for similar reason as when calling send_worker() from
1015 * send_worker_check_state(). */
1016 }
1017} // Node::sock_pacing_time_slice_end()
1018
1019bool Node::async_sock_low_lvl_packet_send_or_close_immediately(const Peer_socket::Ptr& sock,
 1020 Low_lvl_packet::Ptr&& packet,
1021 bool defer_delta_check)
1022{
1023 Error_code err_code;
1024 if (!async_sock_low_lvl_packet_send_paced(sock, std::move(packet), &err_code))
1025 {
1026 close_connection_immediately(socket_id(sock), sock, err_code, defer_delta_check);
1027 return false;
1028 }
1029 // else
1030 return true;
1031}
1032
1033void Node::async_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
1034{
1035 // We are in thread W.
1036
1037 // Fill out common fields and asynchronously send packet.
1038 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(get_logger());
1039 Error_code dummy;
1040 async_sock_low_lvl_packet_send_paced(sock, std::move(rst), &dummy);
1041
1042 // If that returned false: It's an RST, so there's no one to inform of an error anymore. Oh well.
1043} // Node::async_sock_low_lvl_rst_send()
1044
1045void Node::sync_sock_low_lvl_rst_send(Peer_socket::Ptr sock)
1046{
1047 using boost::asio::buffer;
1048
1049 // We are in thread W.
1050
1051 // Fill out fields.
1052 auto rst = Low_lvl_packet::create_uninit_packet_base<Rst_packet>(get_logger());
1053 rst->m_packed.m_src_port = sock->m_local_port;
1054 rst->m_packed.m_dst_port = sock->remote_endpoint().m_flow_port;
1055 rst->m_opt_rexmit_on = false; // Unused in RST packets, so set it to something.
1056
1057 // Serialize to a buffer sequence (basically sequence of pointers/lengths referring to existing memory areas).
 1058 Low_lvl_packet::Const_buffer_sequence raw_bufs;
 1059 const size_t size = rst->serialize_to_raw_data_and_log(&raw_bufs); // Logs TRACE/DATA.
1060
1061 // This special-case sending path should report stats similarly to the main path elsewhere.
1062 const auto& rst_type_id = typeid(Rst_packet); // ...a/k/a typeid(*rst).
1063 sock->m_snd_stats.low_lvl_packet_xfer_requested(rst_type_id);
1064 sock->m_snd_stats.low_lvl_packet_xfer_called(rst_type_id, false, size);
1065
1066 // Same check as when using async_send_to(). @todo Code reuse?
1067 const size_t limit = opt(m_opts.m_dyn_low_lvl_max_packet_size);
1068 if (size > limit)
1069 {
1070 // Bad and rare enough for a warning.
1071 FLOW_LOG_WARNING("Tried to send RST but before doing so detected "
1072 "serialized size [" << size << "] exceeds limit [" << limit << "]; "
1073 "check max-block-size and low-lvl-max-packet-size options! Serialized packet: "
1074 "[\n" << rst->m_concise_ostream_manip << "].");
1075
1076 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, 0); // No send occurred.
1077 }
1078 else
1079 {
1080 // Synchronously send to remote UDP. If non-blocking mode and not sendable, this will return error.
1081 Error_code sys_err_code;
1082 const size_t size_sent = m_low_lvl_sock.send_to(raw_bufs,
1083 sock->remote_endpoint().m_udp_endpoint, 0, sys_err_code);
1084 if (sys_err_code)
1085 {
 1086 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 1087 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id);
1088 }
1089 else
1090 {
1091 sock->m_snd_stats.low_lvl_packet_xfer_completed(rst_type_id, size, size_sent);
1092 }
1093 }
1094} // Node::sync_sock_low_lvl_rst_send()
1095
1096Send_pacing_data::Send_pacing_data(util::Task_engine* task_engine) :
 1097 m_slice_period(0), // Comment says undefined is zero() so don't leave it uninitialized. @todo Is it really needed?
1098 m_bytes_allowed_this_slice(0),
1099 m_slice_timer(*task_engine)
1100{
1101 // Nothing.
1102}
1103
1104} // namespace flow::net_flow
Const_target_ptr Const_ptr
Short-hand for ref-counted pointer to immutable values of type Target_type::element_type (a-la T cons...
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
Definition: error.hpp:269
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
Definition: error.hpp:202
#define FLOW_LOG_DATA(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
Definition: log.hpp:242
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
Definition: log.hpp:152
#define FLOW_LOG_TRACE_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:354
#define FLOW_LOG_DATA_WITHOUT_CHECKING(ARG_stream_fragment)
Logs a DATA message into flow::log::Logger *get_logger() with flow::log::Component get_log_component(...
Definition: log.hpp:372
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
@ S_DATA
Message satisfies Sev::S_TRACE description AND contains variable-length structure (like packet,...
@ S_TRACE
Message indicates any condition that may occur with great frequency (thus verbose if logged).
@ S_INTERNAL_ERROR_SYSTEM_ERROR_ASIO_TIMER
Internal error: System error: Something went wrong with boost.asio timer subsystem.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
Definition: node.cpp:25
Scheduled_task_handle schedule_task_from_now(log::Logger *logger_ptr, const Fine_duration &from_now, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Schedule the given function to execute in a certain amount of time: A handy wrapper around Timer (asi...
Definition: sched_task.hpp:34
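
schedule_task_from_now() is described above as a convenience wrapper around an asio timer. Roughly, the pattern it packages looks like the following standalone boost.asio sketch; a plain steady_timer is used here instead of the flow::util machinery, and the message text is invented for illustration.

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>

int main()
{
  boost::asio::io_service task_engine; // util::Task_engine is this type, per the alias above.
  boost::asio::steady_timer timer(task_engine);

  // "From now," wait the given duration, then run the task body on the engine's thread.
  timer.expires_after(std::chrono::milliseconds(50));
  timer.async_wait([](const boost::system::error_code& err)
  {
    if (!err)
    {
      std::cout << "Timer fired: run the scheduled task body.\n";
    }
    // err == boost::asio::error::operation_aborted would indicate the wait was canceled first.
  });

  task_engine.run(); // Blocks until the timer fires and the handler completes.
}
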
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Definition: util_fwd.hpp:135
Blob_with_log_context<> Blob
A concrete Blob_with_log_context that compile-time-disables Basic_blob::share() and the sharing API d...
Definition: blob_fwd.hpp:60
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
Definition: util_fwd.hpp:208
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:503
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Definition: common.hpp:411
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Definition: common.hpp:408
Internal net_flow struct that encapsulates the Flow-protocol low-level ACK packet.
Internal net_flow struct that encapsulates the Flow-protocol low-level DATA packet.
std::vector< Const_buffer > Const_buffer_sequence
Short-hand for sequence of immutable buffers; i.e., a sequence of 1 or more scattered areas in memory...
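
Such a buffer sequence is boost.asio's scatter/gather form: several non-contiguous memory areas handed to the socket as one datagram, so a serialized packet need not be copied into a single contiguous buffer first. A minimal standalone illustration of composing such a sequence (buffer names, contents, and sizes invented):

#include <boost/asio.hpp>
#include <array>
#include <vector>

int main()
{
  std::array<unsigned char, 8> header{};      // e.g., a serialized packet header
  std::vector<unsigned char> payload(100, 0); // e.g., a DATA payload

  // A-la the Const_buffer_sequence alias above: a vector of immutable buffer views.
  std::vector<boost::asio::const_buffer> bufs;
  bufs.push_back(boost::asio::buffer(header));
  bufs.push_back(boost::asio::buffer(payload));

  // A UDP socket's send_to()/async_send_to() accepts such a sequence directly,
  // transmitting both areas as one datagram. (Socket setup omitted; this only
  // demonstrates composing the sequence.)
}
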
size_t m_dyn_low_lvl_max_packet_size
Any incoming low-level (UDP) packet will be truncated to this size.
Definition: options.hpp:553
unsigned int m_dyn_max_packets_per_main_loop_iteration
The UDP net-stack may deliver 2 or more datagrams to the Flow Node at the same time.
Definition: options.hpp:545
Fine_duration m_st_timer_min_period
A time period such that the boost.asio timer implementation for this platform is able to accurately a...
Definition: options.hpp:534
Metadata (and data, if retransmission is on) for a packet that has been sent one (if retransmission i...
std::vector< Sent_when > m_sent_when
Time stamps, order numbers, and other info at the times when the different attempts (including origin...
Internal net_flow struct that encapsulates the Flow-protocol low-level RST packet.
The current outgoing packet pacing state, including queue of low-level packets to be sent,...
Definition: low_lvl_io.hpp:178
util::Timer m_slice_timer
When running, m_packet_q is non-empty, m_bytes_allowed_this_slice < data size of m_packet_q....
Definition: low_lvl_io.hpp:232
size_t m_bytes_allowed_this_slice
This many bytes worth of DATA packets may still be sent, at this time, within the time slice defined ...
Definition: low_lvl_io.hpp:212
Send_pacing_data(util::Task_engine *task_engine)
Initializes data to initial state (no active time slice).
Packet_q m_packet_q
Queue of low-level packets to be sent to the remote endpoint, in order in which they are to be sent,...
Definition: low_lvl_io.hpp:221
Fine_duration m_slice_period
The length of the current pacing time slice period; this depends on congestion window and SRTT on the...
Definition: low_lvl_io.hpp:199
Fine_time_pt m_slice_start
The time point at which the last pacing time slice began; or epoch if no packets sent so far (i....
Definition: low_lvl_io.hpp:192
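
Read together, the Send_pacing_data members above amount to a byte-budget gate: each time slice starts at m_slice_start, lasts m_slice_period, and permits m_bytes_allowed_this_slice further bytes of DATA; packets that do not fit wait in m_packet_q until m_slice_timer fires and a new slice opens. The following compressed, standalone sketch illustrates only that slice/budget idea; it is not the actual sock_pacing_*() logic, and its slice-renewal policy is deliberately simplified.

#include <chrono>
#include <cstddef>
#include <queue>

// Minimal stand-in for the pacing state described above; field names mirror
// Send_pacing_data, everything else is a simplification for illustration.
struct Pacing_sketch
{
  using Clock = std::chrono::steady_clock;

  Clock::duration m_slice_period{};           // Length of the current time slice.
  Clock::time_point m_slice_start{};          // When the current slice began (epoch = none yet).
  std::size_t m_bytes_allowed_this_slice = 0; // Byte budget remaining within the slice.
  std::queue<std::size_t> m_packet_q;         // Pending packet sizes (real code queues packets).

  // Try to drain the queue; return true if everything fit into the budget, false if the
  // remaining packets must wait for the next slice (real code would start m_slice_timer).
  bool process_queue(Clock::time_point now, Clock::duration slice_period, std::size_t bytes_per_slice)
  {
    if ((m_slice_start == Clock::time_point{}) || (now >= m_slice_start + m_slice_period))
    {
      // No slice yet, or the previous slice has ended: open a new one with a fresh budget.
      m_slice_start = now;
      m_slice_period = slice_period;
      m_bytes_allowed_this_slice = bytes_per_slice;
    }
    while (!m_packet_q.empty() && (m_packet_q.front() <= m_bytes_allowed_this_slice))
    {
      m_bytes_allowed_this_slice -= m_packet_q.front(); // "Send" the packet: spend its bytes.
      m_packet_q.pop();
    }
    return m_packet_q.empty();
  }
};
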