Flow-IPC 1.0.2
Flow-IPC project: Full implementation reference.
native_socket_stream_impl_snd.cpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
21#include <boost/move/make_unique.hpp>
22
24{
25
26// Native_socket_stream::Impl implementations (::snd_*() and send-API methods only).
27
29{
30 using util::Blob_const;
31
32 if (!start_ops<Op::S_SND>(std::move(ev_wait_func)))
33 {
34 return false;
35 }
36 // else
37
38 const auto protocol_ver_to_send_if_needed = m_protocol_negotiator.local_max_proto_ver_for_sending();
39 if (protocol_ver_to_send_if_needed != Protocol_negotiator::S_VER_UNKNOWN)
40 {
42 && "Protocol_negotiator not properly marking the once-only sending-out of protocol version?");
43
44 assert((!m_snd_pending_err_code) && "We should be the first send-related transmission code possible.");
45
46 /* As discussed in m_protocol_negotiator doc header and class doc header "Protocol negotiation" section:
47 * send a special as-if-send_blob()-user-message: no Native_handle; meta-blob = sized
48 * to sizeof(protocol_ver_to_send_if_needed), containing protocol_ver_to_send_if_needed.
49 * By the way m_protocol_negotiator logged about the fact we're about to send it, so we can be pretty quiet.
50 *
51 * The mechanics here are very similar to how send_native_handle() invokes snd_sync_write_or_q_payload().
52 * Keeping comments light, except where something different applies (as of this writing that's just: the
53 * meaning of snd_sync_write_or_q_payload() return value). */
54
55 const auto fake_meta_length_raw = low_lvl_payload_blob_length_t(protocol_ver_to_send_if_needed);
56 const Blob_const payload_blob(&fake_meta_length_raw, sizeof(fake_meta_length_raw));
57
58 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Want to send protocol-negotiation info. "
59 "About to send payload 1 of 1; "
60 "contains low-level blob of size [" << payload_blob.size() << "] "
61 "located @ [" << payload_blob.data() << "].");
62
63 snd_sync_write_or_q_payload({}, payload_blob, false);
64 /* m_send_pending_err_code may have become truthy; just means next send_*()/whatever will emit that error.
65 *
66 * Otherwise: Either it inline-sent it (very likely), or it got queued.
67 * Either way: no error; let's get on with queuing-or-sending real stuff like send_*() payloads.
68 * P.S. There's only 1 protocol version as of this writing, so there's no ambiguity, and we can just get on with
69 * sending stuff right away. This could change in the future. See m_protocol_negotiator doc header for more. */
70 }
71 else
72 {
73 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Wanted to send protocol-negotiation info; "
74 "but we've marked it as already-sent, even though we are in start_*_ops() in PEER state. "
75 "Probably we come from a .release()d Native_socket_stream which has already done it; cool.");
76
77 }
78 // else { Corner case... we come from a .release()d guy. See Impl::reset_sync_io_setup(). Already sent it. }
79
80 return true;
81} // Native_socket_stream::Impl::start_send_native_handle_ops()
82
84{
85 return start_send_native_handle_ops(std::move(ev_wait_func));
86}
87
89{
90 // Just use this in degraded fashion.
91 return send_native_handle(Native_handle(), blob, err_code);
92}
93
95 Error_code* err_code)
96{
98 using util::Blob_const;
99 using flow::util::buffers_dump_string;
100 using boost::chrono::round;
101 using boost::chrono::milliseconds;
102
103 FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Native_socket_stream::Impl::send_native_handle,
104 hndl_or_null, flow::util::bind_ns::cref(meta_blob), _1);
105 // ^-- Call ourselves and return if err_code is null. If got to present line, err_code is not null.
106
107 // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
108
109 if ((!op_started<Op::S_SND>("send_native_handle()")) || (!state_peer("send_native_handle()")))
110 {
111 err_code->clear();
112 return false;
113 }
114 // else
115
116 const size_t meta_size = meta_blob.size();
117 assert(((!hndl_or_null.null()) || (meta_size != 0))
118 && "Native_socket_stream::send_blob() blob must have length 1+; "
119 "Native_socket_stream::send_native_handle() must have same or non-null hndl_or_null or both.");
120
121 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Will send handle [" << hndl_or_null << "] with "
122 "meta-blob of size [" << meta_blob.size() << "].");
123 if (meta_size != 0)
124 {
125 // Verbose and slow (100% skipped unless log filter passes).
126 FLOW_LOG_DATA("Socket stream [" << *this << "]: Meta-blob contents are "
127 "[\n" << buffers_dump_string(meta_blob, " ") << "].");
128 }
129
130 if (m_snd_finished)
131 {
132 /* If they called *end_sending() before, then by definition (see doc header impl discussion)
133 * any future send attempt is to be ignored with this error. Even though previously queued stuff can and should
134 * keep being sent, once that's done this clause will prevent any more from being initiated.
135 *
136 * Corner case: If those queued sends indeed exist (are ongoing) then user should have a way of knowing when it's
137 * done. That isn't the case for regular send_native_handle() calls which are silently queued up as-needed,
138 * which is why I mention it here. So that's why in *end_sending() there's a way for
139 * user to be informed (via sync callback) when everything has been sent through, or if an error stops it
140 * from happening. None of our business here though: we just refuse to do anything and emit this error. */
142 // Note that this clause will always be reached subsequently also.
143 }
144 else if (meta_blob.size() > S_MAX_META_BLOB_LENGTH)
145 {
147 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Send: User argument length [" << meta_blob.size() << "] "
148 "exceeds limit [" << S_MAX_META_BLOB_LENGTH << "].");
149 // More WARNING logging below; just wanted to show those particular details also.
150 }
151 else if (m_snd_pending_err_code) // && (!m_snd_finished) && (meta_blob.size() OK)
152 {
153 /* This --^ holds either the last inline-completed send_native_handle() call's emitted Error_code, or (rarely) one
154 * that was found while attempting to dequeue previously-would-blocked queued-up (due to incomplete s_n_h())
155 * payload(s). This may seem odd, but that's part of the design of the send interface which we wanted to be
156 * as close to looking like a series of synchronous always-inline-completed send_*() calls as humanly possible,
157 * especially given that 99.9999% of the time that will indeed occur given proper behavior by the opposing
158 * receiver. */
159
160 FLOW_LOG_INFO("Socket stream [" << *this << "]: An error was detected earlier and saved for any subsequent "
161 "send attempts like this. Will not proceed with send. More info in WARNING below.");
162 *err_code = m_snd_pending_err_code;
163 }
164 else // if (!m_snd_finished) && (meta_blob.size() OK) && (!m_snd_pending_err_code)
165 {
166 /* As seen in protocol definition in class doc header, payload 1 contains handle, if any, and the length of
167 * the blob in payload 2 (or 0 if no payload 2). Set up the meta-length thing on the stack before entering
168 * critical section. Endianness stays constant on the machine, so don't worry about that.
169 * @todo Actually it would be (1) more forward-compatible and (2) consistent to do the same as how
170 * the structured layer encodes UUIDs -- mandating a pre-send conversion native->little-endian, post-send
171 * conversion backwards. The forward-compatibility is for when this mechanism is expanded to inter-machine IPC;
172 * while noting that the conversion is actually a no-op given our known hardware, so no real perf penalty. */
173 const auto meta_length_raw = low_lvl_payload_blob_length_t(meta_size);
174 // ^-- must stay on stack while snd_sync_write_or_q_payload() executes, and it will. Will be copied if must queue.
175 const Blob_const meta_length_blob(&meta_length_raw, sizeof(meta_length_raw));
176
177 // Payload 2, if any, consists of stuff we already have ready, namely simply meta_blob itself. Nothing to do now.
178
179 // Little helper for below; handles each `(handle or none, blob)` payload. Sets m_snd_pending_err_code.
180 const auto send_low_lvl_payload
181 = [&](unsigned int idx, Native_handle payload_hndl, const Blob_const& payload_blob)
182 {
183 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Wanted to send handle [" << hndl_or_null << "] with "
184 "meta-blob of size [" << meta_blob.size() << "]. "
185 "About to send payload index [" << idx << "] of "
186 "[" << ((meta_length_raw == 0) ? 1 : 2) << "] new low-level payloads; "
187 "includes handle [" << payload_hndl << "] and "
188 "low-level blob of size [" << payload_blob.size() << "] "
189 "located @ [" << payload_blob.data() << "].");
190
191 /* As the name indicates this may synchronously finish it or queue up any parts instead to be done once
192 * a would-block clears, when user informs of this past the present function's return. */
193 snd_sync_write_or_q_payload(payload_hndl, payload_blob, false);
194 /* That may have returned `true` indicating everything (up to and including our payload) was synchronously
195 * given to kernel successfuly; or this will never occur, because outgoing-pipe-ending error was encountered.
196 * Since this is send_native_handle(), we do not care: there is no on-done
197 * callback to invoke, as m_snd_finished is false, as *end_sending() has not been called yet. */
198 }; // const auto send_low_lvl_payload =
199
200 /* Send-or-queue each payload of which we spoke above. There's a very high chance all of this is done inline;
201 * but there's a small chance either there's either stuff queued already (we've delegated waiting for would-block
202 * to clear to user), or not but this can't fully complete (encountered would-block). We don't care here per
203 * se; I am just saying for context, to clarify what "send-or-queue" means. */
204 send_low_lvl_payload(1, hndl_or_null, meta_length_blob); // It sets m_snd_pending_err_code.
205 if ((meta_length_raw != 0) && (!m_snd_pending_err_code))
206 {
207 send_low_lvl_payload(2, Native_handle(), meta_blob); // It sets m_snd_pending_err_code.
208 }
209
210 *err_code = m_snd_pending_err_code; // Emit the new error.
211
212 // Did either thing generate a new error?
213 if (*err_code)
214 {
215 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Wanted to send user message but detected error "
216 "synchronously. "
217 "Error code details follow: [" << *err_code << "] [" << err_code->message() << "]. "
218 "Saved error code to return in next user send attempt if any, after this attempt also "
219 "returns that error code synchronously first.");
220 }
221 else if (m_snd_auto_ping_period != Fine_duration::zero()) // && (!*err_code)
222 {
223 /* Send requested, and there was no error; that represents non-idleness. If auto_ping() has been called
224 * (the feature is engaged), idleness shall occur at worst in m_snd_auto_ping_period; hence reschedule
225 * snd_on_ev_auto_ping_now_timer_fired(). */
226
227 const size_t n_canceled = m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
228
229 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Send request from user; hence rescheduled "
230 "auto-ping to occur in "
231 "[" << round<milliseconds>(m_snd_auto_ping_period) << "] (will re-reschedule "
232 "again upon any other outgoing traffic that might be requested before then). As a result "
233 "[" << n_canceled << "] previously scheduled auto-pings have been canceled; 1 is most likely; "
234 "0 means an auto-ping is *just* about to fire (we lost the race -- which is fine).");
235 if (n_canceled == 1)
236 {
237 /* m_timer_worker will m_snd_auto_ping_timer.async_wait(F), where F() will signal through pipe,
238 * making *m_snd_auto_ping_timer_fired_peer readable. We've already used m_snd_ev_wait_func() to start
239 * wait on it being readable and invoke snd_on_ev_auto_ping_now_timer_fired() in that case; but we've
240 * canceled the previous .async_wait() that would make it readable; so just redo that part. */
241 m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
242 }
243 else
244 {
245 assert((n_canceled == 0) && "We only invoke one timer async_wait() at a time.");
246
247 /* Too late to cancel snd_on_ev_auto_ping_now_timer_fired(), so it'll just schedule next one itself.
248 * Note that in practice the effect is about the same. */
249 }
250 } // else if (m_snd_auto_ping_period != zero) && (!*err_code)
251 // else if (m_snd_auto_ping_period == zero) && (!*err_code) { Auto-ping feature not engaged. }
252 } /* else if (!m_snd_finished) && (meta_blob.size() OK) && (!m_snd_pending_err_code)
253 * (but m_snd_pending_err_code may have become truthy inside) */
254
255 if (*err_code)
256 {
257 // At the end try to categorize nature of error.
258 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Wanted to send handle [" << hndl_or_null << "] with "
259 "meta-blob of size [" << meta_blob.size() << "], but an error (not necessarily new error) "
260 "encountered on pipe or in user API args. Error code details follow: "
261 "[" << *err_code << "] [" << err_code->message() << "]; "
262 "pipe hosed (sys/protocol error)? = "
263 "[" << ((*err_code != error::Code::S_INVALID_ARGUMENT)
264 && (*err_code != error::Code::S_SENDS_FINISHED_CANNOT_SEND)) << "]; "
265 "sending disabled by user? = "
266 "[" << (*err_code == error::Code::S_SENDS_FINISHED_CANNOT_SEND) << "].");
267 }
268
269 return true;
270} // Native_socket_stream::Impl::send_native_handle()
271
273{
274 return async_end_sending_impl(nullptr, flow::async::Task_asio_err());
275}
276
278 flow::async::Task_asio_err&& on_done_func)
279{
280 Error_code sync_err_code;
281 // This guy either takes null/.empty(), or non-null/non-.empty(): it doesn't do the Flow-style error emission itself.
282 const bool ok = async_end_sending_impl(&sync_err_code, std::move(on_done_func));
283
284 if (!ok)
285 {
286 return false; // False start.
287 }
288 // else
289
290 // Standard error-reporting semantics.
291 if ((!sync_err_code_ptr) && sync_err_code)
292 {
293 throw flow::error::Runtime_error(sync_err_code, "Native_socket_stream::Impl::async_end_sending()");
294 }
295 // else
296 sync_err_code_ptr && (*sync_err_code_ptr = sync_err_code);
297 // And if (!sync_err_code_ptr) + no error => no throw.
298
299 return true;
300} // Native_socket_stream::Impl::async_end_sending()
301
303 flow::async::Task_asio_err&& on_done_func_or_empty)
304{
305 using util::Blob_const;
306 using flow::async::Task_asio_err;
307
308 assert(bool(sync_err_code_ptr_or_null) == (!on_done_func_or_empty.empty()));
309
310 // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
311
312 if ((!op_started<Op::S_SND>("async_end_sending()")) || (!state_peer("async_end_sending()")))
313 {
314 return false;
315 }
316 // else
317
318 /* Went back and forth on this semantic. Choices, if m_snd_finished==true already, were:
319 * -1- Just return. Never call on_done_func_or_empty() either.
320 * -2- on_done_func_or_empty().
321 * -a- Could pass success Error_code.
322 * -b- Could pass some new special Error_code for doubly-ending sends.
323 * -3- Just assert(false) -- undefined behavior.
324 * -4- Just return... but return false or something.
325 *
326 * 1 is out, because then they might be expecting it to be called, and it's never called; and since they can't solve
327 * the halting problem, they could never know that it won't be called; of course they shouldn't be calling us in the
328 * first place... but if they DID call us then presumably it's because it was a mistake, or they want to find out.
329 * In the latter case, they're messed over; so 1 is out.
330 *
331 * (Note there is NO other way m_snd_finished becomes true.)
332 *
333 * 2b seems annoying -- an entirely new error code for something that's most likely an easily avoidable mistake
334 * (though could reuse S_SENDS_FINISHED_CANNOT_SEND or S_INVALID_ARGUMENT...); and if they're trying to determine
335 * *if* they'd already called it, then doing it via async handler is annoying from user's PoV and much better done
336 * with an accessor synchronously. Most importantly it breaks the normal pattern, wherein asynchronously reported
337 * errors are asynchronously encountered (system) conditions, which this isn't; it's just meh. Not awful but meh.
338 *
339 * 2a is pretty good for the user. Though it won't indicate there was a problem... but on the other hand who cares?
340 * However, internally it creates another async flow which would require some reasoning to ensure it doesn't interact
341 * in some way with the rest of the outgoing direction (and incoming for that matter). It'd be annoying to have
342 * to think hard for such a dinky scenario.
343 *
344 * 3 is pretty good. Implementation-wise it's by far the easiest. Usage-wise, the user just needs to not make the
345 * obvious error of calling it twice. This can be done with their own flag if desired. This seems sufficient, though
346 * through experience we may determine otherwise ultimately which would require an API change, and that'd suck a bit.
347 *
348 * 4 is pretty good. The interface is a little less clean, but if the user wouldn't cause the assert() in 3, then
349 * they can exactly equally ignore the return value in 4 or assert() on it themselves. It also allows the user to
350 * detect the mistake easily.
351 *
352 * So 3 and 4 seem the best, and 4 is more robust at the cost of a small diff in interface complexity. */
353 if (m_snd_finished)
354 {
355 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User wants to end sending, but we're in sends-finished "
356 "state already. Ignoring.");
357 return false;
358 }
359 // else
360 m_snd_finished = true; // Cause future send_native_handle() to emit S_SENDS_FINISHED_CANNOT_SEND and return.
361
362 bool qd; // Set to false to report results *now*: basically true <=> stuff is still queued to send.
363 if (m_snd_pending_err_code)
364 {
365 qd = false; // There was outgoing-pipe-ending error detected before us; so should immediately report.
366 }
367 else
368 {
369 // Save this to emit once everything (including the thing we just made) has been sent off. Or not if empty.
370 assert(m_snd_pending_on_last_send_done_func_or_empty.empty());
371 m_snd_pending_on_last_send_done_func_or_empty = std::move(on_done_func_or_empty);
372 // on_done_func_or_empty is potentially hosed now.
373
374 /* Prepare/send the payload per aforementioned (class doc header) strategy: no handle, and a 0x0000 integer.
375 * Keeping comments light, as this is essentially a much simplified version of send_native_handle(). */
376
377 const auto ZERO_SIZE_RAW = low_lvl_payload_blob_length_t(0);
378 const Blob_const blob_with_0(&ZERO_SIZE_RAW, sizeof(ZERO_SIZE_RAW));
379
380 /* snd_sync_write_or_q_payload():
381 * Returns true => out-queue flushed successfully; or error detected.
382 * => report synchronously now. (Unless they don't care about any such report.)
383 * Returns false => out-queue has stuff in it and will continue to, until transport is writable.
384 * => cannot report completion yet. */
385
386 qd = !snd_sync_write_or_q_payload(Native_handle(), blob_with_0, false);
387 if (qd && sync_err_code_ptr_or_null)
388 {
389 /* It has not been flushed (we will return would-block).
390 * Save this to emit once everything (including the thing we just made) has been sent off, since
391 * they care about completion (sync_err_code_ptr_or_null not null). */
392 assert(m_snd_pending_on_last_send_done_func_or_empty.empty());
393 m_snd_pending_on_last_send_done_func_or_empty = std::move(on_done_func_or_empty);
394 // on_done_func_or_empty is potentially hosed now.
395 }
396 /* else if (qd && (!sync_err_code_ptr_or_null))
397 * { on_done_func_or_empty is .empty() anyway. Anyway they don't care about emitting result. Done:
398 * It'll be async-sent when/if possible. }
399 * else if (!qd)
400 * { All flushed synchronously. We will emit it synchronously, if they're interested in that. } */
401 } // if (!m_snd_pending_err_code) (but it may have become truthy inside)
402
403 // Log the error, if any; report the result synchronously if applicable.
404
405 if (m_snd_pending_err_code)
406 {
407 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User wanted to end sending, but an error (not necessarily "
408 "new error) encountered on pipe synchronously when trying to send graceful-close. "
409 "Nevertheless locally sends-finished state is now active. Will report completion via "
410 "sync-args (if any). Error code details follow: "
411 "[" << m_snd_pending_err_code << "] [" << m_snd_pending_err_code.message() << "].");
412 assert(!qd);
413 }
414 else if (qd)
415 {
416 FLOW_LOG_INFO("Socket stream [" << *this << "]: User wanted to end sending. Success so far but out-queue "
417 "has payloads -- at least the graceful-close payload -- still pending while waiting for "
418 "writability. Locally sends-finished state is now active, and the other side will be informed "
419 "of this barring subsequent system errors. "
420 "We cannot report completion via sync-args (if any).");
421 }
422 else // if ((!m_snd_pending_err_code) && (!qd))
423 {
424 FLOW_LOG_INFO("Socket stream [" << *this << "]: User wanted to end sending. Immediate success: out-queue "
425 "flushed permanently. "
426 "Locally sends-finished state is now active, and the other side will be informed of this. "
427 "Locally will report completion via sync-args (if any).");
428 }
429
430 if (sync_err_code_ptr_or_null)
431 {
432 *sync_err_code_ptr_or_null = qd ? error::Code::S_SYNC_IO_WOULD_BLOCK
433 : m_snd_pending_err_code; // Could be falsy (probably is usually).
434 }
435 // else { Don't care about completion. }
436
437 return true;
438} // Native_socket_stream::Impl::async_end_sending_impl()
439
441{
442 using util::Blob_const;
444 using util::Task;
445 using boost::chrono::round;
446 using boost::chrono::milliseconds;
447
448 if ((!op_started<Op::S_SND>("auto_ping()")) || (!state_peer("auto_ping()")))
449 {
450 return false;
451 }
452 // else
453
454 assert(period.count() > 0);
455
456 if (m_snd_finished)
457 {
458 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User wants to start auto-pings, but we're in "
459 "sends-finished state already. Ignoring.");
460 return false;
461 }
462 // else
463
464 /* By concept requirements we are to do 2 things: send an auto-ping now as a baseline; and then make it so
465 * *some* message (auto-ping or otherwise) is sent at least every `period` until *end_sending() or error. */
466
467 /* Prepare the payload per class doc header strategy: no handle, and a 0xFFFF... integer.
468 * Keeping comments somewhat light, as this is essentially a much simplified version of send_native_handle()
469 * and is very similar to what async_end_sending() does in this spot. */
470
471 if (m_snd_auto_ping_period != Fine_duration::zero())
472 {
473 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User wants to start auto-pings, but this "
474 "has already been engaged earlier. Ignoring.");
475 return false;
476 }
477 // else
478 m_snd_auto_ping_period = period; // Remember this, both as flag (non-zero()) and to know how often to reschedule it.
479
480 FLOW_LOG_INFO("Socket stream [" << *this << "]: User wants to start auto-pings so that there are "
481 "outgoing messages at least as frequently as every "
482 "[" << round<milliseconds>(m_snd_auto_ping_period) << "]. Sending baseline auto-ping and scheduling "
483 "first subsequent auto-ping; it may be rescheduled if more user traffic occurs before then.");
484
485 if (m_snd_pending_err_code)
486 {
487 /* Concept does not require us to report any error via auto_ping() itself. It's for receiver's benefit anyway.
488 * The local user will discover it, assuming they have interest, via the next send_*() or *end_sending(). */
489 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User wanted to start auto-pings, but an error was "
490 "previously encountered on pipe; so will not auto-ping. "
491 "Error code details follow: [" << m_snd_pending_err_code << "] "
492 "[" << m_snd_pending_err_code.message() << "].");
493 return true;
494 }
495 // else
496
497 const Blob_const blob_with_ff(&S_META_BLOB_LENGTH_PING_SENTINEL, sizeof(S_META_BLOB_LENGTH_PING_SENTINEL));
498
499 /* Important: avoid_qing=true for reasons explained in its doc header. Namely:
500 * If blob_with_ff would-block entirely, then there are already data that would signal-non-idleness sitting
501 * in the kernel buffer, so the auto-ping can be safely dropped in that case. */
502 snd_sync_write_or_q_payload(Native_handle(), blob_with_ff, true);
503 if (m_snd_pending_err_code)
504 {
505 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Wanted to send initial auto-ping but detected error "
506 "synchronously. "
507 "Error code details follow: [" << m_snd_pending_err_code << "] "
508 "[" << m_snd_pending_err_code.message() << "]. "
509 "Saved error code to return in next user send attempt if any; otherwise ignoring; "
510 "will not schedule periodic auto-pings.");
511 return true;
512 }
513 // else
514
515 // Initial auto-ping partially- or fully-sent fine (anything unsent was queued). Now schedule next one per above.
516
517 /* Now we can schedule it, similarly to send_native_handle() and snd_on_ev_auto_ping_now_timer_fired() itself.
518 * Recall that we can't simply .async_wait(F) on some timer and have the desired code --
519 * snd_on_ev_auto_ping_now_timer_fired() which sends the ping -- be the handler F. We have to use the sync_io
520 * pattern, where the user waits on events for us... but there's no timer FD, so we do it in the separate thread and
521 * then ping via a pipe. (This is all discussed elsewhere, but yes, timerfd_*() is available; the boost.asio
522 * timer API is much nicer however -- and while it appears to use timerfd_*() at least optionally, this
523 * functionality is not exposed via .native_handle() or something. But I digress!!! Discussed elsewhere.) */
524
525 m_snd_ev_wait_func(&m_snd_ev_wait_hndl_auto_ping_timer_fired_peer,
526 false, // Wait for read.
527 // Once readable do this: pop pipe; send ping; schedule again.
528 boost::make_shared<Task>
529 ([this]() { snd_on_ev_auto_ping_now_timer_fired(); }));
530 /* Reminder: That has no effect (other than user recording stuff) until this method returns.
531 * So it cannot execute concurrently or anything. They'd need to do their poll()/epoll_wait() or do so
532 * indirectly by returning from the present boost.asio task (if they're running a boost.asio event loop). */
533
534 /* Set up the actual timer. The second call really does m_snd_auto_ping_timer.async_wait().
535 * Note that could fire immediately, even concurrently (if m_snd_auto_ping_period is somehow insanely short,
536 * and the timer resolution is amazing)... but it would only cause snd_on_ev_auto_ping_now_timer_fired()
537 * once they detect the pipe-readable event, which (again) can only happen after we return. */
538 m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
539 m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
540
541 return true;
542} // Native_socket_stream::Impl::auto_ping()
543
545{
546 using util::Blob_const;
547 using util::Task;
548
549 /* This is an event handler! Specifically for the *m_snd_auto_ping_timer_fired_peer pipe reader being
550 * readable. To avoid infinite-loopiness, we'd best pop the thing that was written there. */
551 m_timer_worker.consume_timer_firing_signal(m_snd_auto_ping_timer_fired_peer);
552
553 // Now do the auto-ping itself.
554
555 if (m_snd_pending_err_code)
556 {
557 /* Concept does not require us to report any error via auto_ping() itself. It's for receiver's benefit anyway.
558 * The local user will discover it, assuming they have interest, via the next send_*() or *end_sending(). */
559 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Auto-ping timer fired, but an error was "
560 "previously encountered in 2-way pipe; so will neither auto-ping nor schedule next auto-ping. "
561 "Error code details follow: [" << m_snd_pending_err_code << "] "
562 "[" << m_snd_pending_err_code.message() << "].");
563 return;
564 }
565 // else
566
567 if (m_snd_finished)
568 {
569 // This is liable to be quite common and not of much interest at the INFO level; though it's not that verbose.
570 FLOW_LOG_TRACE("Socket stream [" << *this << "]: "
571 "Auto-ping timer fired; but graceful-close API earlier instructed us to no-op. No-op.");
572 return;
573 }
574 // else
575
576 // This may be of some interest sufficient for INFO. @todo Reconsider due to non-trivial verbosity possibly.
577 FLOW_LOG_INFO("Socket stream [" << *this << "]: "
578 "Auto-ping timer fired; sending/queueing auto-ping; scheduling for next time; it may be "
579 "rescheduled if more user traffic occurs before then.");
580
581 // The next code is similar to the initial auto_ping(). Keeping comments light.
582
583 const Blob_const blob_with_ff{&S_META_BLOB_LENGTH_PING_SENTINEL, sizeof(S_META_BLOB_LENGTH_PING_SENTINEL)};
584
585 snd_sync_write_or_q_payload(Native_handle(), blob_with_ff, true);
586 if (m_snd_pending_err_code)
587 {
588 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Wanted to send non-initial auto-ping but detected error "
589 "synchronously. "
590 "Error code details follow: [" << m_snd_pending_err_code << "] "
591 "[" << m_snd_pending_err_code.message() << "]. "
592 "Saved error code to return in next user send attempt if any; otherwise ignoring; "
593 "will not continue scheduling periodic auto-pings.");
594 return;
595 }
596 // else
597
598 m_snd_ev_wait_func(&m_snd_ev_wait_hndl_auto_ping_timer_fired_peer,
599 false, // Wait for read.
600 boost::make_shared<Task>
601 ([this]() { snd_on_ev_auto_ping_now_timer_fired(); }));
602 m_snd_auto_ping_timer.expires_after(m_snd_auto_ping_period);
603 m_timer_worker.timer_async_wait(&m_snd_auto_ping_timer, m_snd_auto_ping_timer_fired_peer);
604} // Native_socket_stream::Impl::snd_on_ev_auto_ping_now_timer_fired()
605
607 const util::Blob_const& orig_blob, bool avoid_qing)
608{
609 using flow::util::Blob;
610 using util::Blob_const;
611
612 // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
613
614 assert((!m_snd_pending_err_code) && "Pipe must not be pre-hosed by contract.");
615
616 size_t n_sent_or_zero;
617 if (m_snd_pending_payloads_q.empty())
618 {
619 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Want to send low-level payload: "
620 "handle [" << hndl_or_null << "] with blob of size [" << orig_blob.size() << "] "
621 "located @ [" << orig_blob.data() << "]; no write is pending so proceeding immediately. "
622 "Will drop if all of it would-block? = [" << avoid_qing << "].");
623
624 n_sent_or_zero = snd_nb_write_low_lvl_payload(hndl_or_null, orig_blob, &m_snd_pending_err_code);
625 if (m_snd_pending_err_code) // It will *not* emit would-block (will just return 0 but no error).
626 {
627 assert(n_sent_or_zero == 0);
628 return true; // Pipe-direction-ending error encountered; outgoing-direction pipe is finished forevermore.
629 }
630 // else
631
632 if (n_sent_or_zero == orig_blob.size())
633 {
634 // Awesome: Mainstream case: We wrote the whole thing synchronously.
635 return true; // Outgoing-direction pipe flushed.
636 // ^-- No error. Logged about success in snd_nb_write_low_lvl_payload().
637 }
638 // else if (n_sent_or_zero < orig_blob.size()) { Fall through. n_sent_or_zero is significant. }
639 } // if (m_snd_pending_payloads_q.empty())
640 else // if (!m_snd_pending_payloads_q.empty())
641 {
642 // Other stuff is currently being asynchronously sent, so we can only queue our payload behind all that.
643 n_sent_or_zero = 0;
644 }
645
646 /* At this point some or all of the payload could not be sent (either because another async write-op is in progress,
647 * or not but it would-block if we tried to send the rest now).
648 * Per algorithm, we shall now have to queue it up to be sent (and if nothing is currently pending begin
649 * asynchronously sending it ASAP). The question now is what is "it" exactly: it needs to be exactly the payload
650 * except the parts that *were* just sent (if any).
651 *
652 * avoid_qing==true, as of this writing used for auto-pings only, affects
653 * the above as follows: If *all* of orig_blob would need to be queued (queue was already non-empty, or it
654 * was empty, and snd_nb_write_low_lvl_payload() yielded would-block for *all* of orig_blob.size()), then:
655 * simply pretend like it was sent fine; and continue like nothing happened. (See our doc header for
656 * rationale.) */
657
658 if (avoid_qing)
659 {
660 assert(hndl_or_null.null()
661 && "Internal bug? Do not ask to drop a payload with a native handle inside under any circumstances.");
662 if (n_sent_or_zero == 0)
663 {
664 /* This would happen at most every few sec (with auto-pings) and is definitely a rather interesting
665 * situation though not an error; INFO is suitable. */
666 const auto q_size = m_snd_pending_payloads_q.size();
667 FLOW_LOG_INFO("Socket stream [" << *this << "]: Wanted to send low-level payload: "
668 "blob of size [" << orig_blob.size() << "] located @ [" << orig_blob.data() << "]; "
669 "result was would-block for all of its bytes (either because blocked-queue was non-empty "
670 "already, or it was empty, but all of payload's bytes would-block at this time). "
671 "Therefore dropping payload (done for auto-pings at least). Out-queue size remains "
672 "[" << q_size << "].");
673
674 /* We won't enqueue it, so there's nothing more to do, but careful in deciding what to return:
675 * If the queue is empty, we promised we would return true. If the queue is not empty, we promised
676 * we would return false. Whether that's what we should do is subtly questionable, but as of this
677 * writing what we return when avoid_qing==true is immaterial (is ignored). */
678 return q_size == 0;
679 }
680 // else if (n_sent_or_zero > 0) (but not == orig_blob.size())
681
682 // This is even more interesting; definitely INFO as well.
683 FLOW_LOG_INFO("Socket stream [" << *this << "]: Wanted to send low-level payload: "
684 "blob of size [" << orig_blob.size() << "] located @ [" << orig_blob.data() << "]; "
685 "result was would-block for all but [" << n_sent_or_zero << "] of its bytes (blocked-queue "
686 "was empty, so nb-send was attmpted, and some -- but not all -- of payload's bytes "
687 "would-block at this time). We cannot \"get back\" the sent bytes and thus are forced "
688 "to queue the remaining ones (would have dropped payload if all the bytes would-block).");
689 // Fall-through.
690 } // if (avoid_qing)
691 // else if (!avoid_qing) { Fall through. }
692
693 auto new_low_lvl_payload = boost::movelib::make_unique<Snd_low_lvl_payload>();
694 if (n_sent_or_zero == 0)
695 {
696 new_low_lvl_payload->m_hndl_or_null = hndl_or_null;
697 }
698 // else { Leave it as null. Even if (!hndl_or_null.null()): 1+ bytes were sent OK => so was hndl_or_null. }
699
700 new_low_lvl_payload->m_blob = Blob(get_logger());
701
702 /* Allocate N bytes; copy N bytes from area referred to by new_blob. Start at 1st unsent byte (possibly 1st byte).
703 * This is the first and only place we copy the source blob (not counting the transmission into kernel buffer); we
704 * have tried our best to synchronously send all of `new_blob`, which would've avoided getting here and this copy.
705 * Now we have no choice. As discussed in the class doc header, probabilistically speaking we should rarely (if
706 * ever) get here (and do this annoying alloc, and copy, and later dealloc) under normal operation of both sides. */
707 const auto& new_blob = orig_blob + n_sent_or_zero;
708 new_low_lvl_payload->m_blob.assign_copy(new_blob);
709
710 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Want to send pending-from-would-block low-level payload: "
711 "handle [" << new_low_lvl_payload->m_hndl_or_null << "] with "
712 "blob of size [" << new_low_lvl_payload->m_blob.size() << "] "
713 "located @ [" << new_blob.data() << "]; "
714 "created blob copy @ [" << new_low_lvl_payload->m_blob.const_buffer().data() << "]; "
715 "enqueued to out-queue which is now of size [" << (m_snd_pending_payloads_q.size() + 1) << "].");
716
717 m_snd_pending_payloads_q.emplace(std::move(new_low_lvl_payload)); // Push a new Snd_low_lvl_payload::Ptr.
718
719 if (m_snd_pending_payloads_q.size() == 1)
720 {
721 /* Queue was empty; now it isn't; so start the chain of async send head=>dequeue=>async send head=>dequeue=>....
722 * (In our case "async send head" means asking (via m_snd_ev_wait_func) user to inform (via callback we pass
723 * to m_snd_ev_wait_func) us when would-block has cleared, call snd_nb_write_low_lvl_payload() again....
724 * If we were operating directly as a boost.asio async loop then the first part would just be
725 * m_peer_socket->async_wait(); but we cannot do that; user does it for us, controlling what gets called when
726 * synchronously.) */
727 snd_async_write_q_head_payload();
728
729 /* That immediately failed => m_snd_pending_err_code is truthy
730 * => Pipe-direction-ending error encountered; outgoing-direction pipe is finished forevermore. => return true;
731 * That did not fail ("async"-wait for writable begins) => m_snd_pending_err_code is falsy
732 * => Outgoing-direction pipe has pending queued stuff. => return false; */
733 return bool(m_snd_pending_err_code);
734 }
735 // else
736 assert(!m_snd_pending_err_code);
737 return false; // Outgoing-direction pipe has (even more) pending queued stuff; nothing to do about it for now.
738} // Native_socket_stream::Impl::snd_sync_write_or_q_payload()
739
741 const util::Blob_const& blob, Error_code* err_code)
742{
743 using flow::util::Lock_guard;
745
746 // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
747
748 /* Result semantics (reminder of what we promised in contract):
749 * Partial or total success in non-blockingly sending blob+handle: >= 1; falsy *err_code.
750 * No success in sending blob+handle, because it would-block (not fatal): == 0; falsy *err_code.
751 * No success in sending blob+handle, because fatal error: == 0, truthy *err_code. */
752 size_t n_sent_or_zero = 0;
753
754 // m_*peer_socket = (the only) shared data between our- and opposite-direction code. Must lock (see class docs).
755 {
756 Lock_guard<decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
757
758 if (m_peer_socket)
759 {
760 if (hndl_or_null.null())
761 {
762 /* hndl_or_null is in fact null, so we can just use boost.asio's normal non-blocking send (non_blocking(true),
763 * write_some()). */
764
765 /* First set non-blocking mode. (Subtlety: We could use Linux extension per-call MSG_DONTWAIT flag instead;
766 * but this way is fully portable. For posterity: to do that, use m_peer_socket->send(), which is identical to
767 * write_some() but has an overload accepting flags to OS send() call, where one could supply MSG_DONTWAIT.
768 * boost.asio lacks a constant for it, but we could just use actual MSG_DONTWAIT; of course stylistically that's
769 * not as nice and suggests lesser portability.) */
770 if (!m_peer_socket->non_blocking()) // This is fast (it doesn't run system calls but uses a cached value).
771 {
772 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Setting boost.asio peer socket non-blocking mode.");
773 m_peer_socket->non_blocking(true, *err_code); // Sets *err_code to success or the triggering error.
774 }
775 else // if (already non-blocking)
776 {
777 err_code->clear();
778 }
779
780 if (!*err_code)
781 {
782 assert(m_peer_socket->non_blocking());
783
784 FLOW_LOG_TRACE("Writing low-level blob directly via boost.asio (blob details logged above hopefully).");
785
786 n_sent_or_zero = m_peer_socket->write_some(blob, *err_code);
787 }
788 // else if (*err_code) { *err_code is truthy; n_sent_or_zero == 0; cool. }
789 } // if (hndl_or_null.null())
790 else // if (!hndl_or_null.null())
791 {
792 /* Per contract, nb_write_some_with_native_handle() is identical to setting non_blocking(true) and attempting
793 * write_some(orig_blob) -- except if it's able to send even 1 byte it'll also have sent through hndl_or_null.
794 * When we say identical we mean identical result semantics along with everything else. */
795 n_sent_or_zero = nb_write_some_with_native_handle(get_logger(), m_peer_socket.get(),
796 hndl_or_null, blob, err_code);
797 // That should have TRACE-logged stuff, so we won't (it's our function).
798 }
799
800 /* Almost home free; but our result semantics are a little different from the low-level-write functions'.
801 *
802 * Plus, if we just discovered the connection is hosed, do whatever's needed with that. */
803 assert(((!*err_code) && (n_sent_or_zero != 0))
804 || (*err_code && (n_sent_or_zero == 0)));
805 if (*err_code == boost::asio::error::would_block)
806 {
807 err_code->clear();
808 // *err_code is falsy; n_sent_or_zero == 0; cool.
809 }
810 else if (*err_code)
811 {
812 /* True-blue system error. Kill off *m_peer_socket (connection hosed). We could simply nullify it, which would
813 * give it back to the system (it's a resource), but see m_peer_socket_hosed doc header for explanation as
814 * to why we cannot, and why instead we transfer it to "death row" until dtor executes, at which
815 * point ::close(m_peer_socket_hosed->native_handle()) will actually happen. */
816
817 assert((!m_peer_socket_hosed) && "m_peer_socket_hosed must start as null and only become non-null once. Bug?");
818 m_peer_socket_hosed = std::move(m_peer_socket);
819 assert((!m_peer_socket) && "Shocking unique_ptr misbehavior!");
820
821 // *err_code is truthy; n_sent_or_zero == 0; cool.
822 }
823 // else if (!*err_code) { *err_code is falsy; n_sent_or_zero >= 1; cool. }
824 } // if (m_peer_socket)
825 else // if (!m_peer_socket)
826 {
828 }
829 } // Lock_guard peer_socket_lock(m_peer_socket_mutex)
830
831 assert((!*err_code)
832 || (n_sent_or_zero == 0)); // && *err_code
833
834 if (*err_code)
835 {
836 FLOW_LOG_TRACE("Sent nothing due to error [" << *err_code << "] [" << err_code->message() << "].");
837 }
838 else
839 {
840 FLOW_LOG_TRACE("Send: no error. Was able to send [" << n_sent_or_zero << "] of [" << blob.size() << "] bytes.");
841 if (!hndl_or_null.null())
842 {
843 FLOW_LOG_TRACE("Able to send the native handle? = [" << (n_sent_or_zero != 0) << "].");
844 }
845 }
846
847 return n_sent_or_zero;
848} // Native_socket_stream::Impl::snd_nb_write_low_lvl_payload()
849
851{
853 using util::Task;
854 using flow::util::Lock_guard;
855 using boost::asio::async_write;
856
857 // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
858
859 assert((!m_snd_pending_payloads_q.empty()) && "Contract is stuff is queued to be async-sent. Bug?");
860 assert((!m_snd_pending_err_code) && "Pipe must not be pre-hosed by contract.");
861
862 /* Conceptually we'd like to do m_peer_socket->async_wait(writable, F), where F() would perform
863 * snd_nb_write_low_lvl_payload() (nb-send over m_peer_socket). However this is the sync_io pattern, so
864 * the user will be performing the conceptual async_wait() for us. We must ask them to do so
865 * via m_snd_ev_wait_func(), giving them m_peer_socket's FD -- m_snd_ev_wait_hndl_peer_socket -- to wait-on. */
866
867 // m_*peer_socket = (the only) shared data between our- and opposite-direction code. Must lock (see class docs).
868 {
869 Lock_guard<decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
870
871 if (m_peer_socket)
872 {
873 /* @todo Previously, when we ran our own boost.asio loop in a separate thread mandatorily (non-sync_io pattern),
874 * we did either boost::asio::async_write() (stubborn write, no handle to send) or
875 * asio_local_stream_socket::async_write_with_native_handle (same but with handle to send).
876 * Either one would do the async_wait()->nb-write->async_wait()->... chain for us. Now with sync_io we
877 * manually split it up into our version of "async"-wait->nb-write->.... However other than the
878 * "async-wait" mechanism itself, the logic is the same. The to-do would be to perhaps provide
879 * those functions in such a way as to work with the sync_io way of "async"-waiting, without getting rid
880 * of the publicly useful non-sync_io API (some layered design, confined to
881 * asio_local_stream_socket::*). The logic would not need to be repeated and maintained in two places;
882 * as of this writing the non-sync_io version has become for direct public use (by user) only anyway;
883 * two birds one stone to have both use cases use the same core logic code. */
884
885 m_snd_ev_wait_func(&m_ev_wait_hndl_peer_socket,
886 true, // Wait for write.
887 // Once writable do this:
888 boost::make_shared<Task>
889 ([this]() { snd_on_ev_peer_socket_writable_or_error(); }));
890 return;
891 }
892 // else:
893 } // Lock_guard peer_socket_lock(m_peer_socket_mutex)
894
896
897 /* Style note: I (ygoldfel) was tempted to make snd_on_ev_peer_socket_writable_or_error() a closure right in here,
898 * which saves boiler-plate lines, but subjectively the reading flow seemed too gnarly here. Typically I would have
899 * done it that other way though; in fact originally that's how I had it, but it proved too gnarly over time.
900 * Among other things it's nice to have the on-sent handler appear right *below* the async-op that takes it as
901 * an arg; *above* is dicey to read. */
902} // Native_socket_stream::Impl::snd_async_write_q_head_payload()
903
905{
906 FLOW_LOG_TRACE("Socket stream [" << *this << "]: User-performed wait-for-writable finished (writable or error, "
907 "we do not know which yet). We endeavour to send->pop->send->... as much of the queue as we "
908 "can until would-block or total success.");
909
910 assert((!m_snd_pending_payloads_q.empty()) && "Send-queue should not be touched while async-write of head is going.");
911 assert((!m_snd_pending_err_code) && "Send error would only be detected by us. Bug?");
912
913 // Let's do as much as we can.
914 bool would_block = false;
915 do
916 {
917 auto& low_lvl_payload = *m_snd_pending_payloads_q.front();
918 auto& hndl_or_null = low_lvl_payload.m_hndl_or_null;
919 auto& low_lvl_blob = low_lvl_payload.m_blob;
920 auto low_lvl_blob_view = low_lvl_blob.const_buffer();
921
922 FLOW_LOG_TRACE("Socket stream [" << *this << "]: "
923 "Out-queue size is [" << m_snd_pending_payloads_q.size() << "]; "
924 "want to send handle [" << hndl_or_null << "] with "
925 "low-level blob of size [" << low_lvl_blob_view.size() << "] "
926 "located @ [" << low_lvl_blob_view.data() << "].");
927
928 const auto n_sent_or_zero
929 = snd_nb_write_low_lvl_payload(low_lvl_payload.m_hndl_or_null, low_lvl_blob_view, &m_snd_pending_err_code);
930 if (m_snd_pending_err_code)
931 {
932 continue; // Get out of the loop.
933 }
934 // else
935
936 if (n_sent_or_zero == low_lvl_blob_view.size())
937 {
938 // Everything was sent nicely!
939 m_snd_pending_payloads_q.pop(); // This should dealloc low_lvl_payload.m_blob in particular.
940 }
941 else // if (n_sent_or_zero != low_lvl_payload.m_blob.size())
942 {
943 /* Some or all of the payload could not be sent (it would-block if we tried to send the rest now).
944 * This is similar to snd_sync_write_or_q_payload(), but we needn't enqueue it, as it's already enqueued;
945 * just "edit" it in-place as needed. */
946 if (n_sent_or_zero != 0)
947 {
948 // Even if (!m_hndl_or_null.null()): 1 bytes were sent => so was the native handle.
949 low_lvl_payload.m_hndl_or_null = Native_handle();
950 /* Slide its .begin() to the right by n_sent_or_zero (might be no-op if was not writable after all).
951 * Note internally it's just a size_t +=; no realloc or anything. */
952 low_lvl_payload.m_blob.start_past_prefix_inc(n_sent_or_zero);
953 }
954 // else if (n_sent_or_zero == 0) { Nothing was sent, so no edits needed to low_lvl_payload. }
955 would_block = true; // Stop; would-block if we tried more.
956 }
957 }
958 while ((!m_snd_pending_payloads_q.empty()) && (!would_block) && (!m_snd_pending_err_code));
959
960 // Careful! This must be done before the `if` sequence below, as it can make m_snd_pending_err_code truthy after all.
961 if ((!m_snd_pending_err_code) && (!m_snd_pending_payloads_q.empty()))
962 {
963 FLOW_LOG_TRACE("Out-queue has not been emptied. Must keep async-send chain going.");
964
965 // Continue the chain (this guy "asynchronously" brought us here in the first place).
966 snd_async_write_q_head_payload();
967 /* To be clear: queue can now only become empty in "async" handler, not synchronously here.
968 *
969 * Reasoning sanity check: How could m_snd_pending_err_code become truthy here yet falsy (success)
970 * through the do/while() loop above? Answer: The writes could work; then after the last such write,
971 * but before snd_async_write_q_head_payload() -- which needs an m_*peer_socket member to indicate
972 * connection is still healthy, to initiate the user-executed wait-for-writable -- incoming-direction processing
973 * could have hosed m_*peer_socket. E.g., it tried to nb-receive and exposed newly-arrived error condition. */
974 }
975 /* ^-- if ((!m_snd_pending_err_code) && (!m_snd_pending_payloads_q.empty()))
976 * (but m_snd_pending_err_code may have become truthy inside). */
977
978 // Lastly deal with possibly having to fire async_end_sending() completion handler.
979
980 bool invoke_on_done = false;
981 if (m_snd_pending_err_code)
982 {
983 invoke_on_done = !m_snd_pending_on_last_send_done_func_or_empty.empty();
984 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User-performed wait-for-writable reported completion; "
985 "wanted to nb-send any queued data and possibly initiated another wait-for-writable; "
986 "got error during an nb-send or when initiating wait; TRACE details above. "
987 "Error code details follow: "
988 "[" << m_snd_pending_err_code << "] [" << m_snd_pending_err_code.message() << "]. "
989 "Saved error code to return in next user send attempt if any. "
990 "Will run graceful-sends-close completion handler? = [" << invoke_on_done << "].");
991
992 assert((!m_snd_pending_payloads_q.empty()) && "Opportunistic sanity check.");
993 }
994 else if (m_snd_pending_payloads_q.empty()) // && (!m_snd_pending_err_code)
995 {
996 FLOW_LOG_TRACE("Out-queue has been emptied.");
997
998 if (!m_snd_pending_on_last_send_done_func_or_empty.empty())
999 {
1000 // INFO-log is okay, as this occurs at most once per *this.
1001 FLOW_LOG_INFO("Socket stream [" << *this << "]: "
1002 "We sent graceful-close and any preceding user messages with success. Will now inform user via "
1003 "graceful-sends-close completion handler.");
1004 invoke_on_done = true;
1005 }
1006 } // else if (m_snd_pending_payloads_q.empty() && (!m_snd_pending_err_code))
1007 // else if ((!m_snd_pending_payloads_q.empty()) && (!m_snd_pending_err_code)) { Async-wait started. }
1008
1009 if (invoke_on_done)
1010 {
1011 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Executing end-sending completion handler now.");
1012 auto on_done_func = std::move(m_snd_pending_on_last_send_done_func_or_empty);
1013 m_snd_pending_on_last_send_done_func_or_empty.clear(); // For cleanliness, in case move() didn't do it.
1014
1015 on_done_func(m_snd_pending_err_code);
1016 FLOW_LOG_TRACE("Handler completed.");
1017 }
1018} // Native_socket_stream::Impl::snd_on_ev_peer_socket_writable_or_error()
1019
1021{
1022 return state_peer("send_meta_blob_max_size()") ? S_MAX_META_BLOB_LENGTH : 0;
1023}
1024
1026{
1027 return send_meta_blob_max_size();
1028}
1029
1030} // namespace ipc::transport::sync_io
proto_ver_t local_max_proto_ver_for_sending()
To be called at most once, this returns local_max_proto_ver from ctor the first time and S_VER_UNKNOW...
static constexpr proto_ver_t S_VER_UNKNOWN
A proto_ver_t value, namely a negative one, which is a reserved value indicating "unknown version"; i...
Error_code m_snd_pending_err_code
The first and only connection-hosing error condition detected when attempting to low-level-write on m...
bool async_end_sending(Error_code *sync_err_code, flow::async::Task_asio_err &&on_done_func)
See Native_socket_stream counterpart.
void snd_on_ev_peer_socket_writable_or_error()
Completion handler, from outside event loop via sync_io pattern, for the async-wait initiated by snd_...
Protocol_negotiator m_protocol_negotiator
Handles the protocol negotiation at the start of the pipe.
bool snd_sync_write_or_q_payload(Native_handle hndl_or_null, const util::Blob_const &orig_blob, bool avoid_qing)
Either synchronously sends hndl_or_null handle (if any) and orig_blob low-level blob over m_peer_sock...
void snd_on_ev_auto_ping_now_timer_fired()
Handler for the async-wait, via util::sync_io::Timer_event_emitter, of the auto-ping timer firing; if...
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code)
See Native_socket_stream counterpart.
void snd_async_write_q_head_payload()
Initiates async-write over m_peer_socket of the low-level payload at the head of out-queue m_snd_pend...
size_t send_meta_blob_max_size() const
See Native_socket_stream counterpart.
bool async_end_sending_impl(Error_code *sync_err_code_ptr_or_null, flow::async::Task_asio_err &&on_done_func_or_empty)
*end_sending() body.
bool start_send_native_handle_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
bool start_send_blob_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
uint16_t low_lvl_payload_blob_length_t
The type used to encode the meta-blob length; this puts a cap on how long the meta-blobs can be.
bool send_blob(const util::Blob_const &blob, Error_code *err_code)
See Native_socket_stream counterpart.
bool auto_ping(util::Fine_duration period)
See Native_socket_stream counterpart.
size_t snd_nb_write_low_lvl_payload(Native_handle hndl_or_null, const util::Blob_const &blob, Error_code *err_code)
Utility that sends non-empty blob, and (unless null) hndl_or_null associated with its 1st byte,...
size_t send_blob_max_size() const
See Native_socket_stream counterpart.
static const size_t & S_MAX_META_BLOB_LENGTH
The maximum length of a blob that can be sent by this protocol.
bool start_send_native_handle_ops(Event_wait_func_t &&ev_wait_func)
Implements Native_handle_sender API per contract.
size_t send_meta_blob_max_size() const
Implements Native_handle_sender API per contract.
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code=0)
Implements Native_handle_sender API per contract.
flow::log::Logger * get_logger() const
Returns logger (possibly null).
size_t nb_write_some_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Error_code *err_code)
boost.asio extension similar to peer_socket->non_blocking(true); auto n = peer_socket->write_some(pay...
void async_write_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Task_err &&on_sent_or_error)
boost.asio extension similar to boost::asio::async_write(Peer_socket&, Blob_const,...
@ S_SENDS_FINISHED_CANNOT_SEND
Will not send message: local user already ended sending via API marking this.
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_INVALID_ARGUMENT
User called an API with 1 or more arguments against the API spec.
@ S_LOW_LVL_TRANSPORT_HOSED_CANNOT_SEND
Unable to send outgoing traffic: an earlier-reported, or at least logged, system error had hosed the ...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
Function< void(Asio_waitable_native_handle *hndl_of_interest, bool ev_of_interest_snd_else_rcv, Task_ptr &&on_active_ev_func)> Event_wait_func
In sync_io pattern, concrete type storing user-supplied function invoked by pattern-implementing ipc:...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
Definition: util_fwd.hpp:122
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:117
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:134
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:298
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.