Flow-IPC 2.0.0 — Flow-IPC project: Full implementation reference.
Source listing: native_socket_stream_impl_snd.cpp
(This is a documentation-page extraction of the source file; see the file's documentation page for cross-references.)
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
21#include <boost/move/make_unique.hpp>
22
24{
25
26// Native_socket_stream::Impl implementations (::snd_*() and send-API methods only).
27
29{
30 using util::Blob_const;
31
32 if (!start_ops<Op::S_SND>(std::move(ev_wait_func)))
33 {
34 return false;
35 }
36 // else
37
38 const auto protocol_ver_to_send_if_needed = m_protocol_negotiator.local_max_proto_ver_for_sending();
39 if (protocol_ver_to_send_if_needed != Protocol_negotiator::S_VER_UNKNOWN)
40 {
42 && "Protocol_negotiator not properly marking the once-only sending-out of protocol version?");
43
44 assert((!m_snd_pending_err_code) && "We should be the first send-related transmission code possible.");
45
46 /* As discussed in m_protocol_negotiator doc header and class doc header "Protocol negotiation" section:
47 * send a special as-if-send_blob()-user-message: no Native_handle; meta-blob = sized
48 * to sizeof(protocol_ver_to_send_if_needed), containing protocol_ver_to_send_if_needed.
49 * By the way m_protocol_negotiator logged about the fact we're about to send it, so we can be pretty quiet.
50 *
51 * The mechanics here are very similar to how send_native_handle() invokes snd_sync_write_or_q_payload().
52 * Keeping comments light, except where something different applies (as of this writing that's just: the
53 * meaning of snd_sync_write_or_q_payload() return value). */
54
55 const auto fake_meta_length_raw = low_lvl_payload_blob_length_t(protocol_ver_to_send_if_needed);
56 const Blob_const payload_blob(&fake_meta_length_raw, sizeof(fake_meta_length_raw));
57
58 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Want to send protocol-negotiation info. "
59 "About to send payload 1 of 1; "
60 "contains low-level blob of size [" << payload_blob.size() << "] "
61 "located @ [" << payload_blob.data() << "].");
62
63 snd_sync_write_or_q_payload({}, payload_blob, false);
64 /* m_send_pending_err_code may have become truthy; just means next send_*()/whatever will emit that error.
65 *
66 * Otherwise: Either it inline-sent it (very likely), or it got queued.
67 * Either way: no error; let's get on with queuing-or-sending real stuff like send_*() payloads.
68 * P.S. There's only 1 protocol version as of this writing, so there's no ambiguity, and we can just get on with
69 * sending stuff right away. This could change in the future. See m_protocol_negotiator doc header for more. */
70 }
71 else
72 {
73 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Wanted to send protocol-negotiation info; "
74 "but we've marked it as already-sent, even though we are in start_*_ops() in PEER state. "
75 "Probably we come from a .release()d Native_socket_stream which has already done it; cool.");
76
77 }
78 // else { Corner case... we come from a .release()d guy. See Impl::reset_sync_io_setup(). Already sent it. }
79
80 return true;
81} // Native_socket_stream::Impl::start_send_native_handle_ops()
82
84{
85 return start_send_native_handle_ops(std::move(ev_wait_func));
86}
87
89{
90 // Just use this in degraded fashion.
91 return send_native_handle(Native_handle(), blob, err_code);
92}
93
95 Error_code* err_code)
96{
98 using util::Blob_const;
99 using flow::util::buffers_dump_string;
100 using boost::chrono::round;
101 using boost::chrono::milliseconds;
102
103 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, send_native_handle, hndl_or_null, meta_blob, _1);
104 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
105
106 // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
107
108 if ((!op_started<Op::S_SND>("send_native_handle()")) || (!state_peer("send_native_handle()")))
109 {
110 err_code->clear();
111 return false;
112 }
113 // else
114
115 const size_t meta_size = meta_blob.size();
116 assert(((!hndl_or_null.null()) || (meta_size != 0))
117 && "Native_socket_stream::send_blob() blob must have length 1+; "
118 "Native_socket_stream::send_native_handle() must have same or non-null hndl_or_null or both.");
119
120 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Will send handle [" << hndl_or_null << "] with "
121 "meta-blob of size [" << meta_blob.size() << "].");
122 if (meta_size != 0)
123 {
124 // Verbose and slow (100% skipped unless log filter passes).
125 FLOW_LOG_DATA("Socket stream [" << *this << "]: Meta-blob contents are "
126 "[\n" << buffers_dump_string(meta_blob, " ") << "].");
127 }
128
129 if (m_snd_finished)
130 {
131 /* If they called *end_sending() before, then by definition (see doc header impl discussion)
132 * any future send attempt is to be ignored with this error. Even though previously queued stuff can and should
133 * keep being sent, once that's done this clause will prevent any more from being initiated.
134 *
135 * Corner case: If those queued sends indeed exist (are ongoing) then user should have a way of knowing when it's
136 * done. That isn't the case for regular send_native_handle() calls which are silently queued up as-needed,
137 * which is why I mention it here. So that's why in *end_sending() there's a way for
138 * user to be informed (via sync callback) when everything has been sent through, or if an error stops it
139 * from happening. None of our business here though: we just refuse to do anything and emit this error. */
141 // Note that this clause will always be reached subsequently also.
142 }
143 else if (meta_blob.size() > S_MAX_META_BLOB_LENGTH)
144 {
146 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Send: User argument length [" << meta_blob.size() << "] "
147 "exceeds limit [" << S_MAX_META_BLOB_LENGTH << "].");
148 // More WARNING logging below; just wanted to show those particular details also.
149 }
150 else if (m_snd_pending_err_code) // && (!m_snd_finished) && (meta_blob.size() OK)
151 {
152 /* This --^ holds either the last inline-completed send_native_handle() call's emitted Error_code, or (rarely) one
153 * that was found while attempting to dequeue previously-would-blocked queued-up (due to incomplete s_n_h())
154 * payload(s). This may seem odd, but that's part of the design of the send interface which we wanted to be
155 * as close to looking like a series of synchronous always-inline-completed send_*() calls as humanly possible,
156 * especially given that 99.9999% of the time that will indeed occur given proper behavior by the opposing
157 * receiver. */
158
159 FLOW_LOG_INFO("Socket stream [" << *this << "]: An error was detected earlier and saved for any subsequent "
160 "send attempts like this. Will not proceed with send. More info in WARNING below.");
161 *err_code = m_snd_pending_err_code;
162 }
163 else // if (!m_snd_finished) && (meta_blob.size() OK) && (!m_snd_pending_err_code)
164 {
165 /* As seen in protocol definition in class doc header, payload 1 contains handle, if any, and the length of
166 * the blob in payload 2 (or 0 if no payload 2). Set up the meta-length thing on the stack before entering
167 * critical section. Endianness stays constant on the machine, so don't worry about that.
168 * @todo Actually it would be (1) more forward-compatible and (2) consistent to do the same as how
169 * the structured layer encodes UUIDs -- mandating a pre-send conversion native->little-endian, post-send
170 * conversion backwards. The forward-compatibility is for when this mechanism is expanded to inter-machine IPC;
171 * while noting that the conversion is actually a no-op given our known hardware, so no real perf penalty. */
172 const auto meta_length_raw = low_lvl_payload_blob_length_t(meta_size);
173 // ^-- must stay on stack while snd_sync_write_or_q_payload() executes, and it will. Will be copied if must queue.
174 const Blob_const meta_length_blob(&meta_length_raw, sizeof(meta_length_raw));
175
176 // Payload 2, if any, consists of stuff we already have ready, namely simply meta_blob itself. Nothing to do now.
177
178 // Little helper for below; handles each `(handle or none, blob)` payload. Sets m_snd_pending_err_code.
179 const auto send_low_lvl_payload
180 = [&](unsigned int idx, Native_handle payload_hndl, const Blob_const& payload_blob)
181 {
182 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Wanted to send handle [" << hndl_or_null << "] with "
183 "meta-blob of size [" << meta_blob.size() << "]. "
184 "About to send payload index [" << idx << "] of "
185 "[" << ((meta_length_raw == 0) ? 1 : 2) << "] new low-level payloads; "
186 "includes handle [" << payload_hndl << "] and "
187 "low-level blob of size [" << payload_blob.size() << "] "
188 "located @ [" << payload_blob.data() << "].");
189
190 /* As the name indicates this may synchronously finish it or queue up any parts instead to be done once
191 * a would-block clears, when user informs of this past the present function's return. */
192 snd_sync_write_or_q_payload(payload_hndl, payload_blob, false);
193 /* That may have returned `true` indicating everything (up to and including our payload) was synchronously
194 * given to kernel successfuly; or this will never occur, because outgoing-pipe-ending error was encountered.
195 * Since this is send_native_handle(), we do not care: there is no on-done
196 * callback to invoke, as m_snd_finished is false, as *end_sending() has not been called yet. */
197 }; // const auto send_low_lvl_payload =
198
199 /* Send-or-queue each payload of which we spoke above. There's a very high chance all of this is done inline;
200 * but there's a small chance either there's stuff queued already (we've delegated waiting for would-block
201 * to clear to user), or not but this can't fully complete (encountered would-block). We don't care here per
202 * se; I am just saying for context, to clarify what "send-or-queue" means. */
203 send_low_lvl_payload(1, hndl_or_null, meta_length_blob); // It sets m_snd_pending_err_code.
204 if ((meta_length_raw != 0) && (!m_snd_pending_err_code))
205 {
206 send_low_lvl_payload(2, Native_handle(), meta_blob); // It sets m_snd_pending_err_code.
207 }
208
209 *err_code = m_snd_pending_err_code; // Emit the new error.
210
211 // Did either thing generate a new error?
212 if (*err_code)
213 {
214 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Wanted to send user message but detected error "
215 "synchronously. "
216 "Error code details follow: [" << *err_code << "] [" << err_code->message() << "]. "
217 "Saved error code to return in next user send attempt if any, after this attempt also "
218 "returns that error code synchronously first.");
219 }
220 else if (m_snd_auto_ping_period != Fine_duration::zero()) // && (!*err_code)
221 {
222 /* Send requested, and there was no error; that represents non-idleness. If auto_ping() has been called
223 * (the feature is engaged), idleness shall occur at worst in m_snd_auto_ping_period; hence reschedule
224 * snd_on_ev_auto_ping_now_timer_fired(). */
225
226 const size_t n_canceled = m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
227
228 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Send request from user; hence rescheduled "
229 "auto-ping to occur in "
230 "[" << round<milliseconds>(m_snd_auto_ping_period) << "] (will re-reschedule "
231 "again upon any other outgoing traffic that might be requested before then). As a result "
232 "[" << n_canceled << "] previously scheduled auto-pings have been canceled; 1 is most likely; "
233 "0 means an auto-ping is *just* about to fire (we lost the race -- which is fine).");
234 if (n_canceled == 1)
235 {
236 /* m_timer_worker will m_snd_auto_ping_timer.async_wait(F), where F() will signal through pipe,
237 * making *m_snd_auto_ping_timer_fired_peer readable. We've already used m_snd_ev_wait_func() to start
238 * wait on it being readable and invoke snd_on_ev_auto_ping_now_timer_fired() in that case; but we've
239 * canceled the previous .async_wait() that would make it readable; so just redo that part. */
240 m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
241 }
242 else
243 {
244 assert((n_canceled == 0) && "We only invoke one timer async_wait() at a time.");
245
246 /* Too late to cancel snd_on_ev_auto_ping_now_timer_fired(), so it'll just schedule next one itself.
247 * Note that in practice the effect is about the same. */
248 }
249 } // else if (m_snd_auto_ping_period != zero) && (!*err_code)
250 // else if (m_snd_auto_ping_period == zero) && (!*err_code) { Auto-ping feature not engaged. }
251 } /* else if (!m_snd_finished) && (meta_blob.size() OK) && (!m_snd_pending_err_code)
252 * (but m_snd_pending_err_code may have become truthy inside) */
253
254 if (*err_code)
255 {
256 // At the end try to categorize nature of error.
257 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Wanted to send handle [" << hndl_or_null << "] with "
258 "meta-blob of size [" << meta_blob.size() << "], but an error (not necessarily new error) "
259 "encountered on pipe or in user API args. Error code details follow: "
260 "[" << *err_code << "] [" << err_code->message() << "]; "
261 "pipe hosed (sys/protocol error)? = "
262 "[" << ((*err_code != error::Code::S_INVALID_ARGUMENT)
263 && (*err_code != error::Code::S_SENDS_FINISHED_CANNOT_SEND)) << "]; "
264 "sending disabled by user? = "
265 "[" << (*err_code == error::Code::S_SENDS_FINISHED_CANNOT_SEND) << "].");
266 }
267
268 return true;
269} // Native_socket_stream::Impl::send_native_handle()
270
272{
273 return async_end_sending_impl(nullptr, flow::async::Task_asio_err());
274}
275
277 flow::async::Task_asio_err&& on_done_func)
278{
279 Error_code sync_err_code;
280 // This guy either takes null/.empty(), or non-null/non-.empty(): it doesn't do the Flow-style error emission itself.
281 const bool ok = async_end_sending_impl(&sync_err_code, std::move(on_done_func));
282
283 if (!ok)
284 {
285 return false; // False start.
286 }
287 // else
288
289 // Standard error-reporting semantics.
290 if ((!sync_err_code_ptr) && sync_err_code)
291 {
292 throw flow::error::Runtime_error(sync_err_code, "Native_socket_stream::Impl::async_end_sending()");
293 }
294 // else
295 sync_err_code_ptr && (*sync_err_code_ptr = sync_err_code);
296 // And if (!sync_err_code_ptr) + no error => no throw.
297
298 return true;
299} // Native_socket_stream::Impl::async_end_sending()
300
302 flow::async::Task_asio_err&& on_done_func_or_empty)
303{
304 using util::Blob_const;
305 using flow::async::Task_asio_err;
306
307 assert(bool(sync_err_code_ptr_or_null) == (!on_done_func_or_empty.empty()));
308
309 // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
310
311 if ((!op_started<Op::S_SND>("async_end_sending()")) || (!state_peer("async_end_sending()")))
312 {
313 return false;
314 }
315 // else
316
317 /* Went back and forth on this semantic. Choices, if m_snd_finished==true already, were:
318 * -1- Just return. Never call on_done_func_or_empty() either.
319 * -2- on_done_func_or_empty().
320 * -a- Could pass success Error_code.
321 * -b- Could pass some new special Error_code for doubly-ending sends.
322 * -3- Just assert(false) -- undefined behavior.
323 * -4- Just return... but return false or something.
324 *
325 * 1 is out, because then they might be expecting it to be called, and it's never called; and since they can't solve
326 * the halting problem, they could never know that it won't be called; of course they shouldn't be calling us in the
327 * first place... but if they DID call us then presumably it's because it was a mistake, or they want to find out.
328 * In the latter case, they're messed over; so 1 is out.
329 *
330 * (Note there is NO other way m_snd_finished becomes true.)
331 *
332 * 2b seems annoying -- an entirely new error code for something that's most likely an easily avoidable mistake
333 * (though could reuse S_SENDS_FINISHED_CANNOT_SEND or S_INVALID_ARGUMENT...); and if they're trying to determine
334 * *if* they'd already called it, then doing it via async handler is annoying from user's PoV and much better done
335 * with an accessor synchronously. Most importantly it breaks the normal pattern, wherein asynchronously reported
336 * errors are asynchronously encountered (system) conditions, which this isn't; it's just meh. Not awful but meh.
337 *
338 * 2a is pretty good for the user. Though it won't indicate there was a problem... but on the other hand who cares?
339 * However, internally it creates another async flow which would require some reasoning to ensure it doesn't interact
340 * in some way with the rest of the outgoing direction (and incoming for that matter). It'd be annoying to have
341 * to think hard for such a dinky scenario.
342 *
343 * 3 is pretty good. Implementation-wise it's by far the easiest. Usage-wise, the user just needs to not make the
344 * obvious error of calling it twice. This can be done with their own flag if desired. This seems sufficient, though
345 * through experience we may determine otherwise ultimately which would require an API change, and that'd suck a bit.
346 *
347 * 4 is pretty good. The interface is a little less clean, but if the user wouldn't cause the assert() in 3, then
348 * they can exactly equally ignore the return value in 4 or assert() on it themselves. It also allows the user to
349 * detect the mistake easily.
350 *
351 * So 3 and 4 seem the best, and 4 is more robust at the cost of a small diff in interface complexity. */
352 if (m_snd_finished)
353 {
354 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User wants to end sending, but we're in sends-finished "
355 "state already. Ignoring.");
356 return false;
357 }
358 // else
359 m_snd_finished = true; // Cause future send_native_handle() to emit S_SENDS_FINISHED_CANNOT_SEND and return.
360
361 bool qd; // Set to false to report results *now*: basically true <=> stuff is still queued to send.
362 if (m_snd_pending_err_code)
363 {
364 qd = false; // There was outgoing-pipe-ending error detected before us; so should immediately report.
365 }
366 else
367 {
368 // Save this to emit once everything (including the thing we just made) has been sent off. Or not if empty.
369 assert(m_snd_pending_on_last_send_done_func_or_empty.empty());
370 m_snd_pending_on_last_send_done_func_or_empty = std::move(on_done_func_or_empty);
371 // on_done_func_or_empty is potentially hosed now.
372
373 /* Prepare/send the payload per aforementioned (class doc header) strategy: no handle, and a 0x0000 integer.
374 * Keeping comments light, as this is essentially a much simplified version of send_native_handle(). */
375
376 const auto ZERO_SIZE_RAW = low_lvl_payload_blob_length_t(0);
377 const Blob_const blob_with_0(&ZERO_SIZE_RAW, sizeof(ZERO_SIZE_RAW));
378
379 /* snd_sync_write_or_q_payload():
380 * Returns true => out-queue flushed successfully; or error detected.
381 * => report synchronously now. (Unless they don't care about any such report.)
382 * Returns false => out-queue has stuff in it and will continue to, until transport is writable.
383 * => cannot report completion yet. */
384
385 qd = !snd_sync_write_or_q_payload(Native_handle(), blob_with_0, false);
386 if (qd && sync_err_code_ptr_or_null)
387 {
388 /* It has not been flushed (we will return would-block).
389 * Save this to emit once everything (including the thing we just made) has been sent off, since
390 * they care about completion (sync_err_code_ptr_or_null not null). */
391 assert(m_snd_pending_on_last_send_done_func_or_empty.empty());
392 m_snd_pending_on_last_send_done_func_or_empty = std::move(on_done_func_or_empty);
393 // on_done_func_or_empty is potentially hosed now.
394 }
395 /* else if (qd && (!sync_err_code_ptr_or_null))
396 * { on_done_func_or_empty is .empty() anyway. Anyway they don't care about emitting result. Done:
397 * It'll be async-sent when/if possible. }
398 * else if (!qd)
399 * { All flushed synchronously. We will emit it synchronously, if they're interested in that. } */
400 } // if (!m_snd_pending_err_code) (but it may have become truthy inside)
401
402 // Log the error, if any; report the result synchronously if applicable.
403
404 if (m_snd_pending_err_code)
405 {
406 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User wanted to end sending, but an error (not necessarily "
407 "new error) encountered on pipe synchronously when trying to send graceful-close. "
408 "Nevertheless locally sends-finished state is now active. Will report completion via "
409 "sync-args (if any). Error code details follow: "
410 "[" << m_snd_pending_err_code << "] [" << m_snd_pending_err_code.message() << "].");
411 assert(!qd);
412 }
413 else if (qd)
414 {
415 FLOW_LOG_INFO("Socket stream [" << *this << "]: User wanted to end sending. Success so far but out-queue "
416 "has payloads -- at least the graceful-close payload -- still pending while waiting for "
417 "writability. Locally sends-finished state is now active, and the other side will be informed "
418 "of this barring subsequent system errors. "
419 "We cannot report completion via sync-args (if any).");
420 }
421 else // if ((!m_snd_pending_err_code) && (!qd))
422 {
423 FLOW_LOG_INFO("Socket stream [" << *this << "]: User wanted to end sending. Immediate success: out-queue "
424 "flushed permanently. "
425 "Locally sends-finished state is now active, and the other side will be informed of this. "
426 "Locally will report completion via sync-args (if any).");
427 }
428
429 if (sync_err_code_ptr_or_null)
430 {
431 *sync_err_code_ptr_or_null = qd ? error::Code::S_SYNC_IO_WOULD_BLOCK
432 : m_snd_pending_err_code; // Could be falsy (probably is usually).
433 }
434 // else { Don't care about completion. }
435
436 return true;
437} // Native_socket_stream::Impl::async_end_sending_impl()
438
440{
441 using util::Blob_const;
443 using util::Task;
444 using boost::chrono::round;
445 using boost::chrono::milliseconds;
446
447 if ((!op_started<Op::S_SND>("auto_ping()")) || (!state_peer("auto_ping()")))
448 {
449 return false;
450 }
451 // else
452
453 assert(period.count() > 0);
454
455 if (m_snd_finished)
456 {
457 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User wants to start auto-pings, but we're in "
458 "sends-finished state already. Ignoring.");
459 return false;
460 }
461 // else
462
463 /* By concept requirements we are to do 2 things: send an auto-ping now as a baseline; and then make it so
464 * *some* message (auto-ping or otherwise) is sent at least every `period` until *end_sending() or error. */
465
466 /* Prepare the payload per class doc header strategy: no handle, and a 0xFFFF... integer.
467 * Keeping comments somewhat light, as this is essentially a much simplified version of send_native_handle()
468 * and is very similar to what async_end_sending() does in this spot. */
469
470 if (m_snd_auto_ping_period != Fine_duration::zero())
471 {
472 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User wants to start auto-pings, but this "
473 "has already been engaged earlier. Ignoring.");
474 return false;
475 }
476 // else
477 m_snd_auto_ping_period = period; // Remember this, both as flag (non-zero()) and to know how often to reschedule it.
478
479 FLOW_LOG_INFO("Socket stream [" << *this << "]: User wants to start auto-pings so that there are "
480 "outgoing messages at least as frequently as every "
481 "[" << round<milliseconds>(m_snd_auto_ping_period) << "]. Sending baseline auto-ping and scheduling "
482 "first subsequent auto-ping; it may be rescheduled if more user traffic occurs before then.");
483
484 if (m_snd_pending_err_code)
485 {
486 /* Concept does not require us to report any error via auto_ping() itself. It's for receiver's benefit anyway.
487 * The local user will discover it, assuming they have interest, via the next send_*() or *end_sending(). */
488 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User wanted to start auto-pings, but an error was "
489 "previously encountered on pipe; so will not auto-ping. "
490 "Error code details follow: [" << m_snd_pending_err_code << "] "
491 "[" << m_snd_pending_err_code.message() << "].");
492 return true;
493 }
494 // else
495
496 const Blob_const blob_with_ff(&S_META_BLOB_LENGTH_PING_SENTINEL, sizeof(S_META_BLOB_LENGTH_PING_SENTINEL));
497
498 /* Important: avoid_qing=true for reasons explained in its doc header. Namely:
499 * If blob_with_ff would-block entirely, then there are already data that would signal-non-idleness sitting
500 * in the kernel buffer, so the auto-ping can be safely dropped in that case. */
501 snd_sync_write_or_q_payload(Native_handle(), blob_with_ff, true);
502 if (m_snd_pending_err_code)
503 {
504 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Wanted to send initial auto-ping but detected error "
505 "synchronously. "
506 "Error code details follow: [" << m_snd_pending_err_code << "] "
507 "[" << m_snd_pending_err_code.message() << "]. "
508 "Saved error code to return in next user send attempt if any; otherwise ignoring; "
509 "will not schedule periodic auto-pings.");
510 return true;
511 }
512 // else
513
514 // Initial auto-ping partially- or fully-sent fine (anything unsent was queued). Now schedule next one per above.
515
516 /* Now we can schedule it, similarly to send_native_handle() and snd_on_ev_auto_ping_now_timer_fired() itself.
517 * Recall that we can't simply .async_wait(F) on some timer and have the desired code --
518 * snd_on_ev_auto_ping_now_timer_fired() which sends the ping -- be the handler F. We have to use the sync_io
519 * pattern, where the user waits on events for us... but there's no timer FD, so we do it in the separate thread and
520 * then ping via a pipe. (This is all discussed elsewhere, but yes, timerfd_*() is available; the boost.asio
521 * timer API is much nicer however -- and while it appears to use timerfd_*() at least optionally, this
522 * functionality is not exposed via .native_handle() or something. But I digress!!! Discussed elsewhere.) */
523
524 m_snd_ev_wait_func(&m_snd_ev_wait_hndl_auto_ping_timer_fired_peer,
525 false, // Wait for read.
526 // Once readable do this: pop pipe; send ping; schedule again.
527 boost::make_shared<Task>
528 ([this]() { snd_on_ev_auto_ping_now_timer_fired(); }));
529 /* Reminder: That has no effect (other than user recording stuff) until this method returns.
530 * So it cannot execute concurrently or anything. They'd need to do their poll()/epoll_wait() or do so
531 * indirectly by returning from the present boost.asio task (if they're running a boost.asio event loop). */
532
533 /* Set up the actual timer. The second call really does m_snd_auto_ping_timer.async_wait().
534 * Note that could fire immediately, even concurrently (if m_snd_auto_ping_period is somehow insanely short,
535 * and the timer resolution is amazing)... but it would only cause snd_on_ev_auto_ping_now_timer_fired()
536 * once they detect the pipe-readable event, which (again) can only happen after we return. */
537 m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
538 m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
539
540 return true;
541} // Native_socket_stream::Impl::auto_ping()
542
544{
545 using util::Blob_const;
546 using util::Task;
547
548 /* This is an event handler! Specifically for the *m_snd_auto_ping_timer_fired_peer pipe reader being
549 * readable. To avoid infinite-loopiness, we'd best pop the thing that was written there. */
550 m_timer_worker.consume_timer_firing_signal(m_snd_auto_ping_timer_fired_peer);
551
552 // Now do the auto-ping itself.
553
554 if (m_snd_pending_err_code)
555 {
556 /* Concept does not require us to report any error via auto_ping() itself. It's for receiver's benefit anyway.
557 * The local user will discover it, assuming they have interest, via the next send_*() or *end_sending(). */
558 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Auto-ping timer fired, but an error was "
559 "previously encountered in 2-way pipe; so will neither auto-ping nor schedule next auto-ping. "
560 "Error code details follow: [" << m_snd_pending_err_code << "] "
561 "[" << m_snd_pending_err_code.message() << "].");
562 return;
563 }
564 // else
565
566 if (m_snd_finished)
567 {
568 // This is liable to be quite common and not of much interest at the INFO level; though it's not that verbose.
569 FLOW_LOG_TRACE("Socket stream [" << *this << "]: "
570 "Auto-ping timer fired; but graceful-close API earlier instructed us to no-op. No-op.");
571 return;
572 }
573 // else
574
575 // This may be of some interest sufficient for INFO. @todo Reconsider due to non-trivial verbosity possibly.
576 FLOW_LOG_INFO("Socket stream [" << *this << "]: "
577 "Auto-ping timer fired; sending/queueing auto-ping; scheduling for next time; it may be "
578 "rescheduled if more user traffic occurs before then.");
579
580 // The next code is similar to the initial auto_ping(). Keeping comments light.
581
582 const Blob_const blob_with_ff{&S_META_BLOB_LENGTH_PING_SENTINEL, sizeof(S_META_BLOB_LENGTH_PING_SENTINEL)};
583
584 snd_sync_write_or_q_payload(Native_handle(), blob_with_ff, true);
585 if (m_snd_pending_err_code)
586 {
587 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Wanted to send non-initial auto-ping but detected error "
588 "synchronously. "
589 "Error code details follow: [" << m_snd_pending_err_code << "] "
590 "[" << m_snd_pending_err_code.message() << "]. "
591 "Saved error code to return in next user send attempt if any; otherwise ignoring; "
592 "will not continue scheduling periodic auto-pings.");
593 return;
594 }
595 // else
596
597 m_snd_ev_wait_func(&m_snd_ev_wait_hndl_auto_ping_timer_fired_peer,
598 false, // Wait for read.
599 boost::make_shared<Task>
600 ([this]() { snd_on_ev_auto_ping_now_timer_fired(); }));
601 m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
602 m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
603} // Native_socket_stream::Impl::snd_on_ev_auto_ping_now_timer_fired()
604
606                                                          const util::Blob_const& orig_blob, bool avoid_qing)
607{
  /* Contract recap (grounded in the code below; see also class doc header "impl" section):
   *   - Pre-condition: m_snd_pending_err_code is falsy (asserted below).
   *   - Returns true if the outgoing-direction pipe needs no further attention now: either orig_blob
   *     (plus hndl_or_null, if not null) was flushed fully and synchronously, or a pipe-hosing error was
   *     recorded into m_snd_pending_err_code (caller distinguishes the two via that member).
   *   - Returns false if 1+ bytes remain queued, with the user-driven "async"-wait chain running.
   *   - avoid_qing==true (auto-pings as of this writing): if *zero* bytes could be sent, the payload is
   *     dropped rather than queued; return value in that case mirrors queue-emptiness (see inside). */
608  using flow::util::Blob;
609  using util::Blob_const;
610
611  // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
612
613  assert((!m_snd_pending_err_code) && "Pipe must not be pre-hosed by contract.");
614
615  size_t n_sent_or_zero;
616  if (m_snd_pending_payloads_q.empty())
617  {
618    FLOW_LOG_TRACE("Socket stream [" << *this << "]: Want to send low-level payload: "
619                   "handle [" << hndl_or_null << "] with blob of size [" << orig_blob.size() << "] "
620                   "located @ [" << orig_blob.data() << "]; no write is pending so proceeding immediately.  "
621                   "Will drop if all of it would-block? = [" << avoid_qing << "].");
622
623    n_sent_or_zero = snd_nb_write_low_lvl_payload(hndl_or_null, orig_blob, &m_snd_pending_err_code);
624    if (m_snd_pending_err_code) // It will *not* emit would-block (will just return 0 but no error).
625    {
626      assert(n_sent_or_zero == 0);
627      return true; // Pipe-direction-ending error encountered; outgoing-direction pipe is finished forevermore.
628    }
629    // else
630
631    if (n_sent_or_zero == orig_blob.size())
632    {
633      // Awesome: Mainstream case: We wrote the whole thing synchronously.
634      return true; // Outgoing-direction pipe flushed.
635      // ^-- No error.  Logged about success in snd_nb_write_low_lvl_payload().
636    }
637    // else if (n_sent_or_zero < orig_blob.size()) { Fall through.  n_sent_or_zero is significant. }
638  } // if (m_snd_pending_payloads_q.empty())
639  else // if (!m_snd_pending_payloads_q.empty())
640  {
641    // Other stuff is currently being asynchronously sent, so we can only queue our payload behind all that.
642    n_sent_or_zero = 0;
643  }
644
645  /* At this point some or all of the payload could not be sent (either because another async write-op is in progress,
646   * or not but it would-block if we tried to send the rest now).
647   * Per algorithm, we shall now have to queue it up to be sent (and if nothing is currently pending begin
648   * asynchronously sending it ASAP).  The question now is what is "it" exactly: it needs to be exactly the payload
649   * except the parts that *were* just sent (if any).
650   *
651   * avoid_qing==true, as of this writing used for auto-pings only, affects
652   * the above as follows: If *all* of orig_blob would need to be queued (queue was already non-empty, or it
653   * was empty, and snd_nb_write_low_lvl_payload() yielded would-block for *all* of orig_blob.size()), then:
654   * simply pretend like it was sent fine; and continue like nothing happened.  (See our doc header for
655   * rationale.) */
656
657  if (avoid_qing)
658  {
659    assert(hndl_or_null.null()
660           && "Internal bug?  Do not ask to drop a payload with a native handle inside under any circumstances.");
661    if (n_sent_or_zero == 0)
662    {
663      /* This would happen at most every few sec (with auto-pings) and is definitely a rather interesting
664       * situation though not an error; INFO is suitable. */
665      const auto q_size = m_snd_pending_payloads_q.size();
666      FLOW_LOG_INFO("Socket stream [" << *this << "]: Wanted to send low-level payload: "
667                    "blob of size [" << orig_blob.size() << "] located @ [" << orig_blob.data() << "]; "
668                    "result was would-block for all of its bytes (either because blocked-queue was non-empty "
669                    "already, or it was empty, but all of payload's bytes would-block at this time).  "
670                    "Therefore dropping payload (done for auto-pings at least).  Out-queue size remains "
671                    "[" << q_size << "].");
672
673      /* We won't enqueue it, so there's nothing more to do, but careful in deciding what to return:
674       * If the queue is empty, we promised we would return true.  If the queue is not empty, we promised
675       * we would return false.  Whether that's what we should do is subtly questionable, but as of this
676       * writing what we return when avoid_qing==true is immaterial (is ignored). */
677      return q_size == 0;
678    }
679    // else if (n_sent_or_zero > 0) (but not == orig_blob.size())
680
681    // This is even more interesting; definitely INFO as well.
682    FLOW_LOG_INFO("Socket stream [" << *this << "]: Wanted to send low-level payload: "
683                  "blob of size [" << orig_blob.size() << "] located @ [" << orig_blob.data() << "]; "
684                  "result was would-block for all but [" << n_sent_or_zero << "] of its bytes (blocked-queue "
685                  "was empty, so nb-send was attmpted, and some -- but not all -- of payload's bytes "
686                  "would-block at this time).  We cannot \"get back\" the sent bytes and thus are forced "
687                  "to queue the remaining ones (would have dropped payload if all the bytes would-block).");
688    // Fall-through.
689  } // if (avoid_qing)
690  // else if (!avoid_qing) { Fall through. }
691
692  auto new_low_lvl_payload = boost::movelib::make_unique<Snd_low_lvl_payload>();
693  if (n_sent_or_zero == 0)
694  {
695    new_low_lvl_payload->m_hndl_or_null = hndl_or_null;
696  }
697  // else { Leave it as null.  Even if (!hndl_or_null.null()): 1+ bytes were sent OK => so was hndl_or_null. }
698
699  new_low_lvl_payload->m_blob = Blob(get_logger());
700
701  /* Allocate N bytes; copy N bytes from area referred to by new_blob.  Start at 1st unsent byte (possibly 1st byte).
702   * This is the first and only place we copy the source blob (not counting the transmission into kernel buffer); we
703   * have tried our best to synchronously send all of `new_blob`, which would've avoided getting here and this copy.
704   * Now we have no choice.  As discussed in the class doc header, probabilistically speaking we should rarely (if
705   * ever) get here (and do this annoying alloc, and copy, and later dealloc) under normal operation of both sides. */
  // boost.asio buffer arithmetic: const_buffer + size_t = same buffer with its front n_sent_or_zero bytes skipped.
706  const auto& new_blob = orig_blob + n_sent_or_zero;
707  new_low_lvl_payload->m_blob.assign_copy(new_blob);
708
709  FLOW_LOG_TRACE("Socket stream [" << *this << "]: Want to send pending-from-would-block low-level payload: "
710                 "handle [" << new_low_lvl_payload->m_hndl_or_null << "] with "
711                 "blob of size [" << new_low_lvl_payload->m_blob.size() << "] "
712                 "located @ [" << new_blob.data() << "]; "
713                 "created blob copy @ [" << new_low_lvl_payload->m_blob.const_buffer().data() << "]; "
714                 "enqueued to out-queue which is now of size [" << (m_snd_pending_payloads_q.size() + 1) << "].");
715
716  m_snd_pending_payloads_q.emplace(std::move(new_low_lvl_payload)); // Push a new Snd_low_lvl_payload::Ptr.
717
718  if (m_snd_pending_payloads_q.size() == 1)
719  {
720    /* Queue was empty; now it isn't; so start the chain of async send head=>dequeue=>async send head=>dequeue=>....
721     * (In our case "async send head" means asking (via m_snd_ev_wait_func) user to inform (via callback we pass
722     * to m_snd_ev_wait_func) us when would-block has cleared, call snd_nb_write_low_lvl_payload() again....
723     * If we were operating directly as a boost.asio async loop then the first part would just be
724     * m_peer_socket->async_wait(); but we cannot do that; user does it for us, controlling what gets called when
725     * synchronously.) */
726    snd_async_write_q_head_payload();
727
728    /* That immediately failed => m_snd_pending_err_code is truthy
729     * => Pipe-direction-ending error encountered; outgoing-direction pipe is finished forevermore. => return true;
730     * That did not fail ("async"-wait for writable begins) => m_snd_pending_err_code is falsy
731     * => Outgoing-direction pipe has pending queued stuff. => return false; */
732    return bool(m_snd_pending_err_code);
733  }
734  // else
735  assert(!m_snd_pending_err_code);
736  return false; // Outgoing-direction pipe has (even more) pending queued stuff; nothing to do about it for now.
737} // Native_socket_stream::Impl::snd_sync_write_or_q_payload()
738
740                                                             const util::Blob_const& blob, Error_code* err_code)
741{
  /* Summary (grounded in code below): best-effort *non-blocking* write of `blob` -- and, if not null,
   * hndl_or_null attached to its 1st byte -- over m_peer_socket; would-block is *not* an error here
   * (reported as return 0 + falsy *err_code).  On a true system error m_peer_socket is moved to
   * "death row" (m_peer_socket_hosed) under m_peer_socket_mutex; FD is closed later, in dtor. */
742  using flow::util::Lock_guard;
744
745  // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
746
747  /* Result semantics (reminder of what we promised in contract):
748   *   Partial or total success in non-blockingly sending blob+handle: >= 1; falsy *err_code.
749   *   No success in sending blob+handle, because it would-block (not fatal): == 0; falsy *err_code.
750   *   No success in sending blob+handle, because fatal error: == 0, truthy *err_code. */
751  size_t n_sent_or_zero = 0;
752
753  // m_*peer_socket = (the only) shared data between our- and opposite-direction code.  Must lock (see class docs).
754  {
755    Lock_guard<decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
756
757    if (m_peer_socket)
758    {
759      if (hndl_or_null.null())
760      {
761        /* hndl_or_null is in fact null, so we can just use boost.asio's normal non-blocking send (non_blocking(true),
762         * write_some()). */
763
764        /* First set non-blocking mode.  (Subtlety: We could use Linux extension per-call MSG_DONTWAIT flag instead;
765         * but this way is fully portable.  For posterity: to do that, use m_peer_socket->send(), which is identical to
766         * write_some() but has an overload accepting flags to OS send() call, where one could supply MSG_DONTWAIT.
767         * boost.asio lacks a constant for it, but we could just use actual MSG_DONTWAIT; of course stylistically that's
768         * not as nice and suggests lesser portability.) */
769        if (!m_peer_socket->non_blocking()) // This is fast (it doesn't run system calls but uses a cached value).
770        {
771          FLOW_LOG_TRACE("Socket stream [" << *this << "]: Setting boost.asio peer socket non-blocking mode.");
772          m_peer_socket->non_blocking(true, *err_code); // Sets *err_code to success or the triggering error.
773        }
774        else // if (already non-blocking)
775        {
776          err_code->clear();
777        }
778
779        if (!*err_code)
780        {
781          assert(m_peer_socket->non_blocking());
782
783          FLOW_LOG_TRACE("Writing low-level blob directly via boost.asio (blob details logged above hopefully).");
784
785          n_sent_or_zero = m_peer_socket->write_some(blob, *err_code);
786        }
787        // else if (*err_code) { *err_code is truthy; n_sent_or_zero == 0; cool. }
788      } // if (hndl_or_null.null())
789      else // if (!hndl_or_null.null())
790      {
791        /* Per contract, nb_write_some_with_native_handle() is identical to setting non_blocking(true) and attempting
792         * write_some(orig_blob) -- except if it's able to send even 1 byte it'll also have sent through hndl_or_null.
793         * When we say identical we mean identical result semantics along with everything else. */
794        n_sent_or_zero = nb_write_some_with_native_handle(get_logger(), m_peer_socket.get(),
795                                                          hndl_or_null, blob, err_code);
796        // That should have TRACE-logged stuff, so we won't (it's our function).
797      }
798
799      /* Almost home free; but our result semantics are a little different from the low-level-write functions'.
800       *
801       * Plus, if we just discovered the connection is hosed, do whatever's needed with that. */
802      assert(((!*err_code) && (n_sent_or_zero != 0))
803             || (*err_code && (n_sent_or_zero == 0)));
804      if (*err_code == boost::asio::error::would_block)
805      {
806        err_code->clear();
807        // *err_code is falsy; n_sent_or_zero == 0; cool.
808      }
809      else if (*err_code)
810      {
811        /* True-blue system error.  Kill off *m_peer_socket (connection hosed).  We could simply nullify it, which would
812         * give it back to the system (it's a resource), but see m_peer_socket_hosed doc header for explanation as
813         * to why we cannot, and why instead we transfer it to "death row" until dtor executes, at which
814         * point ::close(m_peer_socket_hosed->native_handle()) will actually happen. */
815
816        assert((!m_peer_socket_hosed) && "m_peer_socket_hosed must start as null and only become non-null once.  Bug?");
817        m_peer_socket_hosed = std::move(m_peer_socket);
818        assert((!m_peer_socket) && "Shocking unique_ptr misbehavior!");
819
820        // *err_code is truthy; n_sent_or_zero == 0; cool.
821      }
822      // else if (!*err_code) { *err_code is falsy; n_sent_or_zero >= 1; cool. }
823    } // if (m_peer_socket)
824    else // if (!m_peer_socket)
825    {
      /* NOTE(review): a statement on this line is elided in this extracted view of the file; presumably it sets
       * *err_code to the pipe-already-hosed Error_code (cf. error::Code::S_LOW_LVL_TRANSPORT_HOSED_CANNOT_SEND
       * referenced elsewhere in this translation unit) -- confirm against the full source. */
827    }
828  } // Lock_guard peer_socket_lock(m_peer_socket_mutex)
829
830  assert((!*err_code)
831         || (n_sent_or_zero == 0)); // && *err_code
832
833  if (*err_code)
834  {
835    FLOW_LOG_TRACE("Sent nothing due to error [" << *err_code << "] [" << err_code->message() << "].");
836  }
837  else
838  {
839    FLOW_LOG_TRACE("Send: no error.  Was able to send [" << n_sent_or_zero << "] of [" << blob.size() << "] bytes.");
840    if (!hndl_or_null.null())
841    {
842      FLOW_LOG_TRACE("Able to send the native handle? = [" << (n_sent_or_zero != 0) << "].");
843    }
844  }
845
846  return n_sent_or_zero;
847} // Native_socket_stream::Impl::snd_nb_write_low_lvl_payload()
848
850{
  /* Summary (grounded in code below): sync_io-pattern equivalent of "start an async-write of the head of
   * m_snd_pending_payloads_q": under m_peer_socket_mutex, if m_peer_socket is alive, hand the user
   * (via m_snd_ev_wait_func) its waitable FD plus a callback that runs
   * snd_on_ev_peer_socket_writable_or_error() once writable.  If m_peer_socket is already null (hosed by
   * the other-direction code), no wait is started (see note near end). */
852  using util::Task;
853  using flow::util::Lock_guard;
854  using boost::asio::async_write;
855
856  // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
857
858  assert((!m_snd_pending_payloads_q.empty()) && "Contract is stuff is queued to be async-sent.  Bug?");
859  assert((!m_snd_pending_err_code) && "Pipe must not be pre-hosed by contract.");
860
861  /* Conceptually we'd like to do m_peer_socket->async_wait(writable, F), where F() would perform
862   * snd_nb_write_low_lvl_payload() (nb-send over m_peer_socket).  However this is the sync_io pattern, so
863   * the user will be performing the conceptual async_wait() for us.  We must ask them to do so
864   * via m_snd_ev_wait_func(), giving them m_peer_socket's FD -- m_snd_ev_wait_hndl_peer_socket -- to wait-on. */
865
866  // m_*peer_socket = (the only) shared data between our- and opposite-direction code.  Must lock (see class docs).
867  {
868    Lock_guard<decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
869
870    if (m_peer_socket)
871    {
872      /* @todo Previously, when we ran our own boost.asio loop in a separate thread mandatorily (non-sync_io pattern),
873       * we did either boost::asio::async_write() (stubborn write, no handle to send) or
874       * asio_local_stream_socket::async_write_with_native_handle (same but with handle to send).
875       * Either one would do the async_wait()->nb-write->async_wait()->... chain for us.  Now with sync_io we
876       * manually split it up into our version of "async"-wait->nb-write->....  However other than the
877       * "async-wait" mechanism itself, the logic is the same.  The to-do would be to perhaps provide
878       * those functions in such a way as to work with the sync_io way of "async"-waiting, without getting rid
879       * of the publicly useful non-sync_io API (some layered design, confined to
880       * asio_local_stream_socket::*).  The logic would not need to be repeated and maintained in two places;
881       * as of this writing the non-sync_io version has become for direct public use (by user) only anyway;
882       * two birds one stone to have both use cases use the same core logic code. */
883
884      m_snd_ev_wait_func(&m_ev_wait_hndl_peer_socket,
885                         true, // Wait for write.
886                         // Once writable do this:
887                         boost::make_shared<Task>
888                           ([this]() { snd_on_ev_peer_socket_writable_or_error(); }));
889      return;
890    }
891    // else:
892  } // Lock_guard peer_socket_lock(m_peer_socket_mutex)
893
  /* NOTE(review): a statement on this line is elided in this extracted view of the file; given the
   * pre-condition assert above and callers' reliance on m_snd_pending_err_code becoming truthy exactly when
   * this function fails, presumably it records the pipe-hosed Error_code into m_snd_pending_err_code here
   * (m_peer_socket was null) -- confirm against the full source. */
895
896  /* Style note: I (ygoldfel) was tempted to make snd_on_ev_peer_socket_writable_or_error() a closure right in here,
897   * which saves boiler-plate lines, but subjectively the reading flow seemed too gnarly here.  Typically I would have
898   * done it that other way though; in fact originally that's how I had it, but it proved too gnarly over time.
899   * Among other things it's nice to have the on-sent handler appear right *below* the async-op that takes it as
900   * an arg; *above* is dicey to read. */
901} // Native_socket_stream::Impl::snd_async_write_q_head_payload()
902
904{
  /* Summary (grounded in code below): completion handler for the user-performed wait-for-writable started by
   * snd_async_write_q_head_payload().  Drains m_snd_pending_payloads_q via nb-sends until the queue empties,
   * would-block occurs, or an error is recorded in m_snd_pending_err_code; re-arms the wait if payloads remain;
   * lastly fires the async_end_sending() completion handler if one is pending and either (a) an error occurred,
   * or (b) the out-queue (which would include the graceful-close payload) fully flushed. */
905  FLOW_LOG_TRACE("Socket stream [" << *this << "]: User-performed wait-for-writable finished (writable or error, "
906                 "we do not know which yet).  We endeavour to send->pop->send->... as much of the queue as we "
907                 "can until would-block or total success.");
908
909  assert((!m_snd_pending_payloads_q.empty()) && "Send-queue should not be touched while async-write of head is going.");
910  assert((!m_snd_pending_err_code) && "Send error would only be detected by us.  Bug?");
911
912  // Let's do as much as we can.
913  bool would_block = false;
914  do
915  {
916    auto& low_lvl_payload = *m_snd_pending_payloads_q.front();
917    auto& hndl_or_null = low_lvl_payload.m_hndl_or_null;
918    auto& low_lvl_blob = low_lvl_payload.m_blob;
919    auto low_lvl_blob_view = low_lvl_blob.const_buffer();
920
921    FLOW_LOG_TRACE("Socket stream [" << *this << "]: "
922                   "Out-queue size is [" << m_snd_pending_payloads_q.size() << "]; "
923                   "want to send handle [" << hndl_or_null << "] with "
924                   "low-level blob of size [" << low_lvl_blob_view.size() << "] "
925                   "located @ [" << low_lvl_blob_view.data() << "].");
926
927    const auto n_sent_or_zero
928      = snd_nb_write_low_lvl_payload(low_lvl_payload.m_hndl_or_null, low_lvl_blob_view, &m_snd_pending_err_code);
929    if (m_snd_pending_err_code)
930    {
931      continue; // Get out of the loop.
932    }
933    // else
934
935    if (n_sent_or_zero == low_lvl_blob_view.size())
936    {
937      // Everything was sent nicely!
938      m_snd_pending_payloads_q.pop(); // This should dealloc low_lvl_payload.m_blob in particular.
939    }
940    else // if (n_sent_or_zero != low_lvl_payload.m_blob.size())
941    {
942      /* Some or all of the payload could not be sent (it would-block if we tried to send the rest now).
943       * This is similar to snd_sync_write_or_q_payload(), but we needn't enqueue it, as it's already enqueued;
944       * just "edit" it in-place as needed. */
945      if (n_sent_or_zero != 0)
946      {
947        // Even if (!m_hndl_or_null.null()): 1 bytes were sent => so was the native handle.
948        low_lvl_payload.m_hndl_or_null = Native_handle();
949        /* Slide its .begin() to the right by n_sent_or_zero (might be no-op if was not writable after all).
950         * Note internally it's just a size_t +=; no realloc or anything. */
951        low_lvl_payload.m_blob.start_past_prefix_inc(n_sent_or_zero);
952      }
953      // else if (n_sent_or_zero == 0) { Nothing was sent, so no edits needed to low_lvl_payload. }
954      would_block = true; // Stop; would-block if we tried more.
955    }
956  }
957  while ((!m_snd_pending_payloads_q.empty()) && (!would_block) && (!m_snd_pending_err_code));
958
959  // Careful!  This must be done before the `if` sequence below, as it can make m_snd_pending_err_code truthy after all.
960  if ((!m_snd_pending_err_code) && (!m_snd_pending_payloads_q.empty()))
961  {
962    FLOW_LOG_TRACE("Out-queue has not been emptied.  Must keep async-send chain going.");
963
964    // Continue the chain (this guy "asynchronously" brought us here in the first place).
965    snd_async_write_q_head_payload();
966    /* To be clear: queue can now only become empty in "async" handler, not synchronously here.
967     *
968     * Reasoning sanity check: How could m_snd_pending_err_code become truthy here yet falsy (success)
969     * through the do/while() loop above?  Answer: The writes could work; then after the last such write,
970     * but before snd_async_write_q_head_payload() -- which needs an m_*peer_socket member to indicate
971     * connection is still healthy, to initiate the user-executed wait-for-writable -- incoming-direction processing
972     * could have hosed m_*peer_socket.  E.g., it tried to nb-receive and exposed newly-arrived error condition. */
973  }
974  /* ^-- if ((!m_snd_pending_err_code) && (!m_snd_pending_payloads_q.empty()))
975   * (but m_snd_pending_err_code may have become truthy inside). */
976
977  // Lastly deal with possibly having to fire async_end_sending() completion handler.
978
979  bool invoke_on_done = false;
980  if (m_snd_pending_err_code)
981  {
982    invoke_on_done = !m_snd_pending_on_last_send_done_func_or_empty.empty();
983    FLOW_LOG_WARNING("Socket stream [" << *this << "]: User-performed wait-for-writable reported completion; "
984                     "wanted to nb-send any queued data and possibly initiated another wait-for-writable; "
985                     "got error during an nb-send or when initiating wait; TRACE details above.  "
986                     "Error code details follow: "
987                     "[" << m_snd_pending_err_code << "] [" << m_snd_pending_err_code.message() << "].  "
988                     "Saved error code to return in next user send attempt if any.  "
989                     "Will run graceful-sends-close completion handler? = [" << invoke_on_done << "].");
990
991    assert((!m_snd_pending_payloads_q.empty()) && "Opportunistic sanity check.");
992  }
993  else if (m_snd_pending_payloads_q.empty()) // && (!m_snd_pending_err_code)
994  {
995    FLOW_LOG_TRACE("Out-queue has been emptied.");
996
997    if (!m_snd_pending_on_last_send_done_func_or_empty.empty())
998    {
999      // INFO-log is okay, as this occurs at most once per *this.
1000      FLOW_LOG_INFO("Socket stream [" << *this << "]: "
1001                    "We sent graceful-close and any preceding user messages with success.  Will now inform user via "
1002                    "graceful-sends-close completion handler.");
1003      invoke_on_done = true;
1004    }
1005  } // else if (m_snd_pending_payloads_q.empty() && (!m_snd_pending_err_code))
1006  // else if ((!m_snd_pending_payloads_q.empty()) && (!m_snd_pending_err_code)) { Async-wait started. }
1007
1008  if (invoke_on_done)
1009  {
1010    FLOW_LOG_TRACE("Socket stream [" << *this << "]: Executing end-sending completion handler now.");
1011    auto on_done_func = std::move(m_snd_pending_on_last_send_done_func_or_empty);
1012    m_snd_pending_on_last_send_done_func_or_empty.clear(); // For cleanliness, in case move() didn't do it.
1013
1014    on_done_func(m_snd_pending_err_code);
1015    FLOW_LOG_TRACE("Handler completed.");
1016  }
1017} // Native_socket_stream::Impl::snd_on_ev_peer_socket_writable_or_error()
1018
1020{
  // Max send_native_handle() meta-blob length if state_peer() check passes; 0 otherwise (state_peer() logs context).
1021  return state_peer("send_meta_blob_max_size()") ? S_MAX_META_BLOB_LENGTH : 0;
1022}
1023
1025{
  // Identical limit by delegation: blob-only sends share the meta-blob maximum.
1026  return send_meta_blob_max_size();
1027}
1028
1029} // namespace ipc::transport::sync_io
proto_ver_t local_max_proto_ver_for_sending()
To be called at most once, this returns local_max_proto_ver from ctor the first time and S_VER_UNKNOW...
static constexpr proto_ver_t S_VER_UNKNOWN
A proto_ver_t value, namely a negative one, which is a reserved value indicating "unknown version"; i...
Error_code m_snd_pending_err_code
The first and only connection-hosing error condition detected when attempting to low-level-write on m...
bool async_end_sending(Error_code *sync_err_code, flow::async::Task_asio_err &&on_done_func)
See Native_socket_stream counterpart.
void snd_on_ev_peer_socket_writable_or_error()
Completion handler, from outside event loop via sync_io pattern, for the async-wait initiated by snd_...
Protocol_negotiator m_protocol_negotiator
Handles the protocol negotiation at the start of the pipe.
bool snd_sync_write_or_q_payload(Native_handle hndl_or_null, const util::Blob_const &orig_blob, bool avoid_qing)
Either synchronously sends hndl_or_null handle (if any) and orig_blob low-level blob over m_peer_sock...
void snd_on_ev_auto_ping_now_timer_fired()
Handler for the async-wait, via util::sync_io::Timer_event_emitter, of the auto-ping timer firing; if...
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code)
See Native_socket_stream counterpart.
void snd_async_write_q_head_payload()
Initiates async-write over m_peer_socket of the low-level payload at the head of out-queue m_snd_pend...
size_t send_meta_blob_max_size() const
See Native_socket_stream counterpart.
bool async_end_sending_impl(Error_code *sync_err_code_ptr_or_null, flow::async::Task_asio_err &&on_done_func_or_empty)
*end_sending() body.
bool start_send_native_handle_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
bool start_send_blob_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
uint16_t low_lvl_payload_blob_length_t
The type used to encode the meta-blob length; this puts a cap on how long the meta-blobs can be.
bool send_blob(const util::Blob_const &blob, Error_code *err_code)
See Native_socket_stream counterpart.
bool auto_ping(util::Fine_duration period)
See Native_socket_stream counterpart.
size_t snd_nb_write_low_lvl_payload(Native_handle hndl_or_null, const util::Blob_const &blob, Error_code *err_code)
Utility that sends non-empty blob, and (unless null) hndl_or_null associated with its 1st byte,...
size_t send_blob_max_size() const
See Native_socket_stream counterpart.
static const size_t & S_MAX_META_BLOB_LENGTH
The maximum length of a blob that can be sent by this protocol.
bool start_send_native_handle_ops(Event_wait_func_t &&ev_wait_func)
Implements Native_handle_sender API per contract.
size_t send_meta_blob_max_size() const
Implements Native_handle_sender API per contract.
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code=0)
Implements Native_handle_sender API per contract.
flow::log::Logger * get_logger() const
Returns logger (possibly null).
size_t nb_write_some_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Error_code *err_code)
boost.asio extension similar to peer_socket->non_blocking(true); auto n = peer_socket->write_some(pay...
void async_write_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Task_err &&on_sent_or_error)
boost.asio extension similar to boost::asio::async_write(Peer_socket&, Blob_const,...
@ S_SENDS_FINISHED_CANNOT_SEND
Will not send message: local user already ended sending via API marking this.
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_INVALID_ARGUMENT
User called an API with 1 or more arguments against the API spec.
@ S_LOW_LVL_TRANSPORT_HOSED_CANNOT_SEND
Unable to send outgoing traffic: an earlier-reported, or at least logged, system error had hosed the ...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
Function< void(Asio_waitable_native_handle *hndl_of_interest, bool ev_of_interest_snd_else_rcv, Task_ptr &&on_active_ev_func)> Event_wait_func
In sync_io pattern, concrete type storing user-supplied function invoked by pattern-implementing ipc:...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
Definition: util_fwd.hpp:122
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:117
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:134
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:298
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.