Flow-IPC 1.0.2
Flow-IPC project: Full implementation reference.
native_socket_stream_impl_rcv.cpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
21
23{
24
25// Native_socket_stream::Impl implementations (::rcv_*() and receive-API methods only).
26
28{
29 return start_ops<Op::S_RCV>(std::move(ev_wait_func));
30}
31
33{
34 return start_receive_native_handle_ops(std::move(ev_wait_func));
35}
36
38 const util::Blob_mutable& target_meta_blob,
39 Error_code* sync_err_code, size_t* sync_sz,
40 flow::async::Task_asio_err_sz&& on_done_func)
41{
42 /* Subtlety: async_receive_blob() will pass target_hndl==nullptr to _impl(); but we do not allow this.
43 * When using us as a Native_handle_receiver they must be ready to receive a Native_handle, even if the other
44 * side chooses to not send one (then *target_hndl shall be set to equal Native_handle(), a null handle).
45 *
46 * So now internally our code can tell whether this is from a Blob_receiver or a Native_handle_receiver role
47 * and emit error::Code::S_BLOB_RECEIVER_GOT_NON_BLOB if target_hndl==0, yet the other side sent an actual handle. */
48 assert(target_hndl && "Native_socket_stream::async_receive_native_handle() must take non-null Native_handle ptr.");
49
50 return async_receive_native_handle_impl(target_hndl, target_meta_blob, sync_err_code, sync_sz,
51 std::move(on_done_func));
52}
53
55 Error_code* sync_err_code, size_t* sync_sz,
56 flow::async::Task_asio_err_sz&& on_done_func)
57{
58 return async_receive_native_handle_impl(nullptr, target_blob, sync_err_code, sync_sz, std::move(on_done_func));
59}
60
62 const util::Blob_mutable& target_meta_blob,
63 Error_code* sync_err_code_ptr, size_t* sync_sz,
64 flow::async::Task_asio_err_sz&& on_done_func)
65{
67
68 if ((!op_started<Op::S_RCV>("async_receive_native_handle()")) || (!state_peer("async_receive_native_handle()")))
69 {
70 return false;
71 }
72 // else
73
74 /* We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
75 *
76 * Briefly though: If you understand send_native_handle() impl, then this will be easy in comparison:
77 * no more than one async_receive_native_handle_impl() can be outstanding at a time; they are not allowed to
78 * invoke it until we invoke their completion handler. (Rationale is shown in detail elsewhere.)
79 * So there is no queuing of requests (deficit); nor reading more messages beyond what has been requested
80 * (surplus).
81 *
82 * That said, we must inform them of completion via on_done_func(), whereas send_native_handle() has no
83 * such requirement; easy enough -- we just save it as needed. In that sense it is much like
84 * async_end_sending().
85 *
86 * ...No, it's not easy in comparison actually. Sure, there is no queuing of requests, but the outgoing-direction
87 * algorithm makes the low-level payloads and then just sends them out -- what's inside doesn't matter.
88 * Incoming-direction is harder, because we need to read payload 1, interpret it, possibly read payload 2.
89 * So there is a bunch of tactical nonsense about async-waiting and then resuming from the same spot and so on. */
90
91 if (m_rcv_user_request)
92 {
93 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Async-receive requested, but the preceding such request "
94 "is still in progress; the message has not arrived yet. "
95 "Likely a user error, but who are we to judge? Ignoring.");
96 return false;
97 }
98 // else
99
100 Error_code sync_err_code;
101
102 FLOW_LOG_TRACE("Socket stream [" << *this << "]: User async-receive request for "
103 "possible native handle and meta-blob (located @ [" << target_meta_blob.data() << "] of "
104 "max size [" << target_meta_blob.size() << "]).");
105
106 if (m_rcv_pending_err_code)
107 {
108 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User async-receive request for "
109 "possible native handle and meta-blob (located @ [" << target_meta_blob.data() << "] of "
110 "max size [" << target_meta_blob.size() << "]): Error already encountered earlier. Emitting "
111 "via sync-args.");
112
113 sync_err_code = m_rcv_pending_err_code;
114 *sync_sz = 0;
115 }
116 else // if (!m_rcv_pending_err_code)
117 {
118 // Background can be found by following the comment on this concept constant.
120 "Socket streams allow trying to receive into a buffer that could underflow "
121 "with the largest *possible* message -- as long as the actual message "
122 "(if any) happens to be small enough to fit. Otherwise a pipe-hosing "
123 "error shall occur at receipt time.");
124
125 m_rcv_user_request.emplace();
126 m_rcv_user_request->m_target_hndl_ptr = target_hndl_or_null;
127 m_rcv_user_request->m_target_meta_blob = target_meta_blob;
128 m_rcv_user_request->m_on_done_func = std::move(on_done_func);
129
130 rcv_read_msg(&sync_err_code, sync_sz);
131 } // else if (!m_rcv_pending_err_code)
132
133 if ((!sync_err_code) || (sync_err_code != error::Code::S_SYNC_IO_WOULD_BLOCK))
134 {
135 FLOW_LOG_TRACE("Async-request completed synchronously (result "
136 "[" << sync_err_code << "] [" << sync_err_code.message() << "]); emitting synchronously and "
137 "disregarding handler.");
138 m_rcv_user_request.reset(); // No-op if we didn't set it up due to m_rcv_pending_err_code being truthy already.
139 }
140 // else { Other stuff logged enough. }
141
142 // Standard error-reporting semantics.
143 if ((!sync_err_code_ptr) && sync_err_code)
144 {
145 throw flow::error::Runtime_error(sync_err_code, "Native_socket_stream::Impl::async_receive_native_handle_impl()");
146 }
147 // else
148 sync_err_code_ptr && (*sync_err_code_ptr = sync_err_code);
149 // And if (!sync_err_code_ptr) + no error => no throw.
150
151 return true;
152} // Native_socket_stream::Impl::async_receive_native_handle_impl()
153
155{
157 using util::Task;
158 using boost::chrono::round;
159 using boost::chrono::milliseconds;
160
161 // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
162
163 assert(timeout.count() > 0);
164
165 if ((!op_started<Op::S_RCV>("idle_timer_run()")) || (!state_peer("idle_timer_run()")))
166 {
167 return false;
168 }
169 // else
170
171 // According to concept requirements we shall no-op/return false if duplicately called.
172 if (m_rcv_idle_timeout != Fine_duration::zero())
173 {
174 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User wants to start idle timer, but they have already "
175 "started it before. Therefore ignoring.");
176 return false;
177 }
178 // else
179
180 m_rcv_idle_timeout = timeout; // Remember this, both as flag (non-zero()) and to know to when to schedule it.
181 // Now we will definitely return true (even if an error is already pending). This matches concept requirements.
182
183 if (m_rcv_pending_err_code)
184 {
185 FLOW_LOG_INFO("Socket stream [" << *this << "]: User wants to start idle timer, but an error has already "
186 "been found and emitted earlier. It's moot; ignoring.");
187 return true;
188 }
189 // else
190
191 FLOW_LOG_INFO("Socket stream [" << *this << "]: User wants to start idle-timer with timeout "
192 "[" << round<milliseconds>(m_rcv_idle_timeout) << "]. Scheduling (will be rescheduled as needed).");
193
194 /* Per requirements in concept, start it now; reschedule similarly each time there is activity.
195 *
196 * The mechanics of timer-scheduling are identical to those in auto_ping() and are explained there;
197 * keeping comments light here. */
198
199 m_rcv_ev_wait_func(&m_rcv_ev_wait_hndl_idle_timer_fired_peer,
200 false, // Wait for read.
201 boost::make_shared<Task>
202 ([this]() { rcv_on_ev_idle_timer_fired(); }));
203
204 m_rcv_idle_timer.expires_after(m_rcv_idle_timeout);
205 m_timer_worker.timer_async_wait(&m_rcv_idle_timer, m_rcv_idle_timer_fired_peer);
206
207 return true;
208} // Native_socket_stream::Impl::idle_timer_run()
209
211{
212 /* This is an event handler! Specifically for the *m_rcv_idle_timer_fired_peer pipe reader being
213 * readable. To avoid infinite-loopiness, we'd best pop the thing that was written there. */
214 m_timer_worker.consume_timer_firing_signal(m_rcv_idle_timer_fired_peer);
215
216 if (m_rcv_pending_err_code)
217 {
218 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Idle timer fired: There's been 0 traffic past idle timeout. "
219 "However an error has already been found and emitted earlier. Therefore ignoring.");
220 return;
221 }
222 // else
223
224 m_rcv_pending_err_code = error::Code::S_RECEIVER_IDLE_TIMEOUT;
225
226 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Idle timer fired: There's been 0 traffic past idle timeout. "
227 "Will not proceed with any further low-level receiving. If a user async-receive request is "
228 "pending (is it? = [" << bool(m_rcv_user_request) << "]) will emit to completion handler.");
229
230 if (m_rcv_user_request)
231 {
232 // Prevent stepping on our own toes: move/clear it first / invoke handler second.
233 const auto on_done_func = std::move(m_rcv_user_request->m_on_done_func);
234 m_rcv_user_request.reset();
235 on_done_func(m_rcv_pending_err_code, 0);
236 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Handler completed.");
237 }
238
239 /* That's it. If m_rcv_user_request (we've just nullified it) then async read chain will be finished forever as
240 * soon as the user informs us of readability (if it ever does) -- we will detect there's an error
241 * in m_rcv_pending_err_code already (and hence no m_rcv_user_request). */
242} // Native_socket_stream::Impl::rcv_on_ev_idle_timer_fired()
243
245{
247
248 if (m_rcv_idle_timeout == Fine_duration::zero())
249 {
250 return;
251 }
252 // else
253
254 /* idle_timer_run() has enabled the idle timer feature, and we've been called indicating we just read something,
255 * and therefore it is time to reschedule the idle timer. */
256
257 const auto n_canceled = m_rcv_idle_timer.expires_after(m_rcv_idle_timeout);
258
259 if (n_canceled == 0)
260 {
261 // This is a fun, rare coincidence that is worth an INFO message.
262 FLOW_LOG_INFO("Socket stream [" << *this << "]: Finished reading a message, which means "
263 "we just received traffic, which means the idle timer should be rescheduled. However "
264 "when trying to reschedule it, we found we were too late: it was very recently queued to "
265 "be invoked in the near future. An idle timeout error shall be emitted very soon.");
266 }
267 else // if (n_canceled >= 1)
268 {
269 assert((n_canceled == 1) && "We only issue 1 timer async_wait() at a time.");
270
271 /* m_timer_worker will m_rcv_idle_timer.async_wait(F), where F() will signal through pipe,
272 * making *m_rcv_idle_timer_fired_peer readable. We've already used m_rcv_ev_wait_func() to start
273 * wait on it being readable and invoke rcv_on_ev_idle_timer_fired() in that case; but we've
274 * canceled the previous .async_wait() that would make it readable; so just redo that part. */
275 m_timer_worker.timer_async_wait(&m_rcv_idle_timer, m_rcv_idle_timer_fired_peer);
276 }
277} // Native_socket_stream::Impl::rcv_not_idle()
278
// Starts (or restarts) reading one in-message from its first byte, completing as much as possible synchronously.
//
// Steps, as implemented below:
//   1. Non-blockingly read payload 1 (native handle if any + the meta-blob length field
//      m_rcv_target_meta_length) via rcv_nb_read_low_lvl_payload(); an error, if any, is stored
//      in m_rcv_pending_err_code.
//   2. If at least 1 byte arrived, hand off to rcv_on_handle_finalized(), passing it the out-args.
//   3. If nothing arrived and there is no error (would-block): provided m_peer_socket still exists, register a
//      read-wait through m_rcv_ev_wait_func() whose task resumes in rcv_on_ev_peer_socket_readable_or_error()
//      with Rcv_msg_state::S_MSG_START; report error::Code::S_SYNC_IO_WOULD_BLOCK and size 0.
//   4. Otherwise report m_rcv_pending_err_code and size 0.
//
// Precondition (asserted): m_rcv_pending_err_code is falsy on entry.
//
// @param sync_err_code  Out-arg, dereferenced without a null check: receives would-block or the pending error
//                       on the paths handled here.
// @param sync_sz        Out-arg, dereferenced without a null check: set to 0 on the would-block and error paths here.
279void Native_socket_stream::Impl::rcv_read_msg(Error_code* sync_err_code, size_t* sync_sz)
280{
281 using util::Task;
282 using util::Blob_mutable;
283 using flow::util::Lock_guard;
284
285 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Async-receive: Start of payload 1: "
286 "Trying nb-read of payload 1 (handle if any, meta-blob-length) synchronously.");
287 assert(!m_rcv_pending_err_code);
288
289 Native_handle target_hndl; // Always read into this local, even if the user request has no handle target: rcv_on_handle_finalized() inspects it to detect an unexpected handle.
290 const auto n_rcvd_or_zero
291 = rcv_nb_read_low_lvl_payload(&target_hndl,
292 Blob_mutable(&m_rcv_target_meta_length, sizeof(m_rcv_target_meta_length)),
293 &m_rcv_pending_err_code);
294 if (!m_rcv_pending_err_code)
295 {
296 if (n_rcvd_or_zero != 0)
297 {
298 FLOW_LOG_TRACE("Got some or all of payload 1.");
299 rcv_on_handle_finalized(target_hndl, n_rcvd_or_zero, sync_err_code, sync_sz);
300 return;
301 }
302 // else: no bytes and no error, meaning the read would block.
303
304 FLOW_LOG_TRACE("Got nothing but would-block. Awaiting readability.");
305
306 /* Conceptually we'd like to do m_peer_socket->async_wait(readable, F), where F() would perform
307 * rcv_nb_read_low_lvl_payload() (nb-receive over m_peer_socket). However this is the sync_io pattern, so
308 * the user will be performing the conceptual async_wait() for us. We must ask them to do so
309 * via m_rcv_ev_wait_func(), giving them m_peer_socket's FD -- m_ev_wait_hndl_peer_socket -- to wait-on. */
310
311 // m_*peer_socket = (the only) shared data between our- and opposite-direction code. Must lock (see their docs).
312 {
313 Lock_guard<decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
314
315 if (m_peer_socket)
316 {
317 m_rcv_ev_wait_func(&m_ev_wait_hndl_peer_socket,
318 false, // Wait for read.
319 // Once readable (or in error) do this:
320 boost::make_shared<Task>([this]()
321 {
322 rcv_on_ev_peer_socket_readable_or_error(Rcv_msg_state::S_MSG_START, 0 /* ignored for S_MSG_START */);
323 }));
324
325 *sync_err_code = error::Code::S_SYNC_IO_WOULD_BLOCK;
326 *sync_sz = 0;
327 return;
328 }
329 // else: m_peer_socket is null; fall through to the error path below.
330 } // Lock_guard peer_socket_lock(m_peer_socket_mutex)
331
332 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User async-receive request: "
333 "was about to await readability but discovered opposite-direction socket-hosing error; "
334 "emitting error via completion handler (or via sync-args).");
335
// NOTE(review): this listing skips original line 336 here; presumably it assigns m_rcv_pending_err_code
// (the assert below requires it to be truthy) -- confirm against the real source file.
337 } // if (!m_rcv_pending_err_code) (but may have become truthy inside, and has unless we `return`ed inside)
338
339 assert(m_rcv_pending_err_code);
340
341 FLOW_LOG_TRACE("Nb-read, or async-read following nb-read encountering would-block, detected error (details "
342 "above); will emit via completion handler (or via sync-args).");
343
344 *sync_err_code = m_rcv_pending_err_code;
345 *sync_sz = 0;
346} // Native_socket_stream::Impl::rcv_read_msg()
347
349 Error_code* sync_err_code, size_t* sync_sz)
350{
351 using util::Blob_mutable;
352 using util::Task;
353 using flow::util::Lock_guard;
354
355 assert(m_rcv_user_request);
356 assert(!m_rcv_pending_err_code);
357 assert(n_rcvd != 0);
358
359 const bool proto_negotiating
360 = m_protocol_negotiator.negotiated_proto_ver() == Protocol_negotiator::S_VER_UNKNOWN;
361
362 if (proto_negotiating && (!hndl_or_null.null()))
363 {
364 FLOW_LOG_WARNING("Socket stream [" << *this << "]: Expecting protocol-negotiation (first) in-message "
365 "to contain *only* a meta-blob: but received Native_handle is non-null which is "
366 "unexpected; emitting error via completion handler (or via sync-args).");
367#ifndef NDEBUG
368 const bool ok =
369#endif
370 m_protocol_negotiator.compute_negotiated_proto_ver(Protocol_negotiator::S_VER_UNKNOWN, &m_rcv_pending_err_code);
371 assert(ok && "Protocol_negotiator breaking contract? Bug?");
372 assert(m_rcv_pending_err_code
373 && "Protocol_negotiator should have emitted error given intentionally bad version.");
374 }
375 else if ((!hndl_or_null.null()) && (!m_rcv_user_request->m_target_hndl_ptr))
376 // && (!proto_negotiating)
377 {
378 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User async-receive request for "
379 "*only* a meta-blob: but received Native_handle is non-null which is "
380 "unexpected; emitting error via completion handler (or via sync-args).");
381 m_rcv_pending_err_code = error::Code::S_BLOB_RECEIVER_GOT_NON_BLOB;
382 } // if (hndl_or_null && (!m_rcv_user_request->m_target_hndl_ptr))
383 else // if (no prob with hndl_or_null or m_target_hndl_ptr)
384 {
385 // Finalize the user's Native_handle target variable if applicable.
386 if (m_rcv_user_request->m_target_hndl_ptr
387 // If proto_negotiating, hndl_or_null is null; and anyway m_target_hndl_ptr is not yet in play.
388 && (!proto_negotiating))
389 {
390 *m_rcv_user_request->m_target_hndl_ptr = hndl_or_null;
391 }
392
393 if (n_rcvd == sizeof(m_rcv_target_meta_length))
394 {
395 // Got the entire payload 1, not just some of it including handle-if-any.
396 rcv_on_head_payload(sync_err_code, sync_sz);
397 return;
398 }
399 // else
400
401 /* Still have to finish reading into m_rcv_target_meta_length. We've already hit would-block so,
402 * much like in rcv_read_msg() (keeping comments light): */
403
404 {
405 Lock_guard<decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
406
407 if (m_peer_socket)
408 {
409 m_rcv_ev_wait_func(&m_ev_wait_hndl_peer_socket,
410 false, // Wait for read.
411 // Once readable (or in error) do this:
412 boost::make_shared<Task>([this, n_rcvd]()
413 {
414 rcv_on_ev_peer_socket_readable_or_error(Rcv_msg_state::S_HEAD_PAYLOAD,
415 sizeof(m_rcv_target_meta_length) - n_rcvd);
416 }));
417
418 *sync_err_code = error::Code::S_SYNC_IO_WOULD_BLOCK;
419 *sync_sz = 0;
420 return;
421 }
422 // else:
423 } // Lock_guard peer_socket_lock(m_peer_socket_mutex)
424
425 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User async-receive request: "
426 "was about to await readability but discovered opposite-direction socket-hosing error; "
427 "emitting error via completion handler (or via sync-args).");
428
430 } // if (no prob with hndl_or_null or m_target_hndl_ptr) (but another problem may have occurred inside)
431
432 assert(m_rcv_pending_err_code);
433
434 // WARNINGs above are enough; no TRACE here.
435
436 *sync_err_code = m_rcv_pending_err_code;
437 *sync_sz = 0;
438} // Native_socket_stream::Impl::rcv_on_handle_finalized()
439
441{
442 using util::Blob_mutable;
443
444 assert(m_rcv_user_request);
445 assert(!m_rcv_pending_err_code);
446
447 bool proto_negotiating
448 = m_protocol_negotiator.negotiated_proto_ver() == Protocol_negotiator::S_VER_UNKNOWN;
449
450 if (proto_negotiating)
451 {
452 /* Protocol_negotiator handles everything (invalid value, incompatible range...); we just know
453 * the encoding is to shove the version number into what is normally the length field. */
454#ifndef NDEBUG
455 const bool ok =
456#endif
457 m_protocol_negotiator.compute_negotiated_proto_ver
458 (static_cast<Protocol_negotiator::proto_ver_t>(m_rcv_target_meta_length),
459 &m_rcv_pending_err_code);
460 assert(ok && "Protocol_negotiator breaking contract? Bug?");
461 proto_negotiating = false; // Just in case (maintainability).
462
463 if (m_rcv_pending_err_code)
464 {
465 // Protocol negotiation failed. Do what we'd do due to, say, graceful-close below.
466 *sync_err_code = m_rcv_pending_err_code;
467 *sync_sz = 0;
468 return;
469 }
470 // else: Succeeded; do what we'd do due to, say, receiving auto-ping below.
471
472 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Received all of negotiation payload; passed. "
473 "Ignoring other than registering non-idle activity. Proceeding with the next message read.");
474
475 rcv_not_idle(); // Register activity <= end of complete message, no error.
476 rcv_read_msg(sync_err_code, sync_sz);
477 return;
478 }
479 // else if (!proto_negotiating): Normal payload 1 handling.
480
481 if (m_rcv_target_meta_length == S_META_BLOB_LENGTH_PING_SENTINEL)
482 {
483 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Received all of payload 1; length prefix "
484 "contains special value indicating a ping. Ignoring other than registering non-idle "
485 "activity. Proceeding with the next message read.");
486
487 rcv_not_idle(); // Register activity <= end of complete message, no error.
488 rcv_read_msg(sync_err_code, sync_sz);
489 return;
490 }
491 // else
492
493 const auto user_target_size = m_rcv_user_request->m_target_meta_blob.size();
494 if (m_rcv_target_meta_length != 0) // && (not ping)
495 {
496 if (m_rcv_target_meta_length <= user_target_size)
497 {
498 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Received all of payload 1; length prefix "
499 "[" << m_rcv_target_meta_length <<"] is positive (and not indicative of ping). "
500 "Reading payload 2.");
501 rcv_read_blob(Rcv_msg_state::S_META_BLOB_PAYLOAD,
502 Blob_mutable(m_rcv_user_request->m_target_meta_blob.data(),
503 size_t(m_rcv_target_meta_length)),
504 sync_err_code, sync_sz);
505 return;
506 }
507 // else if (m_rcv_target_meta_length > user_target_size):
508
509 FLOW_LOG_WARNING("Received all of payload 1; length prefix "
510 "[" << m_rcv_target_meta_length <<"] is positive (and not indicative of ping); "
511 "however it exceeds user target blob size [" << user_target_size << "] and would "
512 "overflow. Treating similarly to a graceful-close but with a bad error code and "
513 "this warning. Will not proceed with any further low-level receiving; will invoke "
514 "handler (failure).");
516 }
517 else // if (m_rcv_target_meta_length == 0)
518 {
519 if (m_rcv_user_request->m_target_hndl_ptr && (!m_rcv_user_request->m_target_hndl_ptr->null()))
520 {
521 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Received all of payload 1; length 0 + non-null handle => "
522 "handle received with no meta-blob. Will register non-idle activity; "
523 "and invoke handler (success).");
524 // m_rcv_pending_err_code remains falsy.
525
526 rcv_not_idle(); // Register activity <= end of complete message, no error.
527 }
528 else
529 {
530 // Once per connection at most, so INFO log level is OK.
531 FLOW_LOG_INFO("Socket stream [" << *this << "]: User message received: Graceful-close-of-incoming-pipe "
532 "message. Will not proceed with any further low-level receiving. "
533 "Will invoke handler (graceful-close error).");
535 }
536 } // else if (m_rcv_target_meta_length == 0)
537
538 *sync_err_code = m_rcv_pending_err_code; // Truthy (graceful-close) or falsy (got handle + no meta-blob).
539 *sync_sz = 0;
540} // Native_socket_stream::Impl::rcv_on_head_payload()
541
543{
544 using util::Blob_mutable;
545
546 if (m_rcv_pending_err_code)
547 {
548 FLOW_LOG_WARNING("Socket stream [" << *this << "]: User's wait-for-readable finished (readable or error, "
549 "we do not know which yet); would resume processing depending on what we were doing before; "
550 "however an error was detected in the meantime (as of this writing: idle timeout). "
551 "Stopping read chain.");
552 assert((!m_rcv_user_request)
553 && "If rcv-error emitted during low-level async-wait, we should have fed it to any pending async-receive.");
554 return;
555 }
556 // else
557
558 assert(m_rcv_user_request);
559
560 // Will potentially emit these (if and only if message-read completes due to this successful async-wait).
561 Error_code sync_err_code;
562 size_t sync_sz;
563
564 FLOW_LOG_TRACE("Socket stream [" << *this << "]: User-performed wait-for-readable finished (readable or error, "
565 "we do not know which yet). Resuming processing depending on what we were doing before.");
566
567 switch (msg_state)
568 {
569 case Rcv_msg_state::S_MSG_START:
570 FLOW_LOG_TRACE("Socket stream [" << *this << "]: In state MSG_START: Reading from byte 0/handle if any.");
571 rcv_read_msg(&sync_err_code, &sync_sz);
572 break;
573
574 case Rcv_msg_state::S_HEAD_PAYLOAD:
575 FLOW_LOG_TRACE("Socket stream [" << *this << "]: In state HEAD_PAYLOAD: "
576 "Reading meta-length/ping/graceful-close specifier: [" << n_left << "] bytes left.");
577 rcv_read_blob(Rcv_msg_state::S_HEAD_PAYLOAD,
578 Blob_mutable(static_cast<uint8_t*>(static_cast<void*>(&m_rcv_target_meta_length))
579 + sizeof(m_rcv_target_meta_length) - n_left,
580 n_left),
581 &sync_err_code, &sync_sz);
582 break;
583
584 case Rcv_msg_state::S_META_BLOB_PAYLOAD:
585 FLOW_LOG_TRACE("Socket stream [" << *this << "]: In state META_BLOB_PAYLOAD: "
586 "Reading meta-blob: [" << n_left << "] bytes left.");
587 rcv_read_blob(Rcv_msg_state::S_META_BLOB_PAYLOAD,
588 Blob_mutable(static_cast<uint8_t*>(m_rcv_user_request->m_target_meta_blob.data())
589 + size_t(m_rcv_target_meta_length) - n_left,
590 n_left),
591 &sync_err_code, &sync_sz);
592 } // switch (msg_state) (Compiler should catch any missed enum value.)
593
594 if (sync_err_code == error::Code::S_SYNC_IO_WOULD_BLOCK)
595 {
596 // Another async-wait is pending now. We've logged enough. Live to fight another day.
597 return;
598 }
599 // else
600
601 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Async-op result ready after successful async-wait. "
602 "Executing handler now.");
603
604 // Prevent stepping on our own toes: move/clear it first / invoke handler second.
605 const auto on_done_func = std::move(m_rcv_user_request->m_on_done_func);
606 m_rcv_user_request.reset();
607 on_done_func(sync_err_code, sync_sz);
608 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Handler completed.");
609} // Native_socket_stream::Impl::rcv_on_ev_peer_socket_readable_or_error()
610
612 Error_code* sync_err_code, size_t* sync_sz)
613{
614 using util::Task;
615 using flow::util::Lock_guard;
616
617 assert(!m_rcv_pending_err_code);
618 assert(m_rcv_user_request);
619
620 const auto n_rcvd_or_zero = rcv_nb_read_low_lvl_payload(nullptr, target_blob, &m_rcv_pending_err_code);
621 if (!m_rcv_pending_err_code)
622 {
623 if (n_rcvd_or_zero == target_blob.size())
624 {
625 switch (msg_state)
626 {
627 case Rcv_msg_state::S_HEAD_PAYLOAD:
628 rcv_on_head_payload(sync_err_code, sync_sz);
629 return;
630 case Rcv_msg_state::S_META_BLOB_PAYLOAD:
631 {
632 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Received all of payload 2 (meta-blob of length "
633 "[" << m_rcv_target_meta_length << "]). Will register non-idle activity; "
634 "and invoke handler (or report via sync-args).");
635
636 rcv_not_idle(); // Register activity <= end of complete message, no error.
637
638 assert(!*sync_err_code);
639 *sync_sz = size_t(m_rcv_target_meta_length);
640
641 return;
642 }
643 case Rcv_msg_state::S_MSG_START:
644 assert(false && "rcv_read_blob() shall be used only for S_*_PAYLOAD phases.");
645 }
646 assert(false && "Should not get here.");
647 } // if (n_rcvd_or_zero == target_blob.size())
648 // else if (n_rcvd_or_zero != target_blob.size())
649
650 FLOW_LOG_TRACE("Do not have all of requested payload; got would-block. Awaiting readability.");
651
652 // So, much like in rcv_read_msg() (keeping comments light):
653 {
654 Lock_guard<decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
655
656 if (m_peer_socket)
657 {
658 m_rcv_ev_wait_func(&m_ev_wait_hndl_peer_socket,
659 false, // Wait for read.
660 // Once readable (or in error) do this:
661 boost::make_shared<Task>([this, msg_state,
662 n_left = target_blob.size() - n_rcvd_or_zero]()
663 {
664 rcv_on_ev_peer_socket_readable_or_error(msg_state, n_left);
665 }));
666
667 *sync_err_code = error::Code::S_SYNC_IO_WOULD_BLOCK;
668 *sync_sz = 0;
669 return;
670 }
671 // else:
672 } // Lock_guard peer_socket_lock(m_peer_socket_mutex)
673
675 } // if (!m_rcv_pending_err_code) (but may have become truthy inside, and has unless we `return`ed inside)
676
677 assert(m_rcv_pending_err_code);
678
679 *sync_err_code = m_rcv_pending_err_code;
680 *sync_sz = 0;
681} // Native_socket_stream::Impl::rcv_read_blob()
682
684 const util::Blob_mutable& target_payload_blob,
685 Error_code* err_code)
686{
687 using flow::util::Lock_guard;
689
690 assert(err_code);
691
692 // We comment liberally, but tactically, inline; but please read the strategy in the class doc header's impl section.
693
694 /* Result semantics (reminder of what we promised in contract):
695 * Partial or total success in non-blockingly receiving blob and handle-or-none: > 0; falsy *err_code.
696 * No success in receiving blob+handle, because it would-block (not fatal): == 0; falsy *err_code.
697 * No success in receiving blob+handle, because fatal error: == 0, truthy *err_code. */
698 size_t n_rcvd_or_zero = 0;
699
700 // m_*peer_socket = (the only) shared data between our- and opposite-direction code. Must lock (see class docs).
701 {
702 Lock_guard<decltype(m_peer_socket_mutex)> peer_socket_lock(m_peer_socket_mutex);
703
704 if (m_peer_socket)
705 {
706 if (target_payload_hndl_or_null)
707 {
708 /* Per contract, nb_read_some_with_native_handle() is identical to setting non_blocking(true) and attempting
709 * read_some(orig_blob) -- except if it's able to receive even 1 byte it'll also have set the target handle
710 * to either null (none present) or non-null.
711 * When we say identical we mean identical result semantics along with everything else. */
712 n_rcvd_or_zero = nb_read_some_with_native_handle(get_logger(), m_peer_socket.get(),
713 target_payload_hndl_or_null, target_payload_blob, err_code);
714 // That should have TRACE-logged stuff, so we won't (it's our function).
715 } // if (target_payload_hndl_or_null)
716 else // if (!target_payload_hndl_or_null)
717 {
718 /* No interest in receiving a handle, so we can just use boost.asio's normal non-blocking read
719 * (non_blocking(true), read_some()). */
720
721 // First set non-blocking mode... same deal as in snd_nb_write_low_lvl_payload(); keeping comments light.
722 if (!m_peer_socket->non_blocking())
723 {
724 FLOW_LOG_TRACE("Socket stream [" << *this << "]: Setting boost.asio peer socket non-blocking mode.");
725 m_peer_socket->non_blocking(true, *err_code); // Sets *err_code to success or the triggering error.
726 }
727 else // if (already non-blocking)
728 {
729 err_code->clear();
730 }
731
732 if (!*err_code)
733 {
734 assert(m_peer_socket->non_blocking());
735
736 FLOW_LOG_TRACE("Reading low-level blob directly via boost.asio (blob details logged above hopefully).");
737 n_rcvd_or_zero = m_peer_socket->read_some(target_payload_blob, *err_code);
738 }
739 // else if (*err_code) { *err_code is truthy; n_rcvd_or_zero == 0; cool. }
740 }
741
742 /* Almost home free; but our result semantics are a little different from the low-level-read functions'.
743 *
744 * Plus, if we just discovered the connection is hosed, do whatever's needed with that. */
745 assert(((!*err_code) && (n_rcvd_or_zero != 0))
746 || (*err_code && (n_rcvd_or_zero == 0)));
747 if (*err_code == boost::asio::error::would_block)
748 {
749 err_code->clear();
750 // *err_code is falsy; n_rcvd_or_zero == 0; cool.
751 }
752 else if (*err_code)
753 {
754 // True-blue system error. Kill off *m_peer_socket: might as well give it back to the system (it's a resource).
755
756 // Closes peer socket to the (hosed anyway) connection; including ::close(m_peer_socket->native_handle()).
757 m_peer_socket.reset();
758
759 // *err_code is truthy; n_rcvd_or_zero == 0; cool.
760 }
761 // else if (!*err_code) { *err_code is falsy; n_rcvd_or_zero >= 1; cool. }
762 } // if (m_peer_socket)
763 else // if (!m_peer_socket)
764 {
766 }
767 } // Lock_guard peer_socket_lock(m_peer_socket_mutex)
768
769 assert((!*err_code)
770 || (n_rcvd_or_zero == 0)); // && *err_code
771
772 if (*err_code)
773 {
774 FLOW_LOG_TRACE("Received nothing due to error [" << *err_code << "] [" << err_code->message() << "].");
775 }
776 else
777 {
778 FLOW_LOG_TRACE("Receive: no error. Was able to receive [" << n_rcvd_or_zero << "] of "
779 "[" << target_payload_blob.size() << "] bytes.");
780 if (target_payload_hndl_or_null)
781 {
782 if (n_rcvd_or_zero != 0)
783 {
784 FLOW_LOG_TRACE("Interest in native handle; was able to establish its presence or absence; "
785 "present? = [" << (!target_payload_hndl_or_null->null()) << "].");
786 }
787 } // if (target_payload_hndl_or_null)
788 else
789 {
790 FLOW_LOG_TRACE("No interest in native handle.");
791 }
792 } // else if (!*err_code)
793
794 return n_rcvd_or_zero;
795} // Native_socket_stream::Impl::rcv_nb_read_low_lvl_payload()
796
798{
799 return state_peer("receive_meta_blob_max_size()") ? S_MAX_META_BLOB_LENGTH : 0;
800}
801
803{
805}
806
807} // namespace ipc::transport::sync_io
int16_t proto_ver_t
Type sufficient to store a protocol version; positive values identify newer versions of a protocol; w...
static constexpr proto_ver_t S_VER_UNKNOWN
A proto_ver_t value, namely a negative one, which is a reserved value indicating "unknown version"; i...
void rcv_read_blob(Rcv_msg_state msg_state, const util::Blob_mutable &target_blob, Error_code *sync_err_code, size_t *sync_sz)
A somewhat-general utility that continues read chain with the aim to complete the present in-message,...
Rcv_msg_state
Used to organize the incoming-direction state machine tactically, this indicates what part of payload...
bool start_receive_blob_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
size_t rcv_nb_read_low_lvl_payload(Native_handle *target_payload_hndl_or_null, const util::Blob_mutable &target_payload_blob, Error_code *err_code)
Utility that synchronously, non-blockingly attempts to read over m_peer_socket into the target blob a...
void rcv_on_head_payload(Error_code *sync_err_code, size_t *sync_sz)
Reacts to payload 1 having been completely received.
bool start_receive_native_handle_ops(util::sync_io::Event_wait_func &&ev_wait_func)
See Native_socket_stream counterpart.
bool async_receive_blob(const util::Blob_mutable &target_blob, Error_code *sync_err_code, size_t *sync_sz, flow::async::Task_asio_err_sz &&on_done_func)
See Native_socket_stream counterpart.
void rcv_on_ev_idle_timer_fired()
Handler for the async-wait, via util::sync_io::Timer_event_emitter, of the idle timer firing; if stil...
void rcv_on_handle_finalized(Native_handle hndl_or_null, size_t n_rcvd, Error_code *sync_err_code, size_t *sync_sz)
Helper of rcv_read_msg() – it could have been inlined instead of a method but for readability concern...
bool async_receive_native_handle(Native_handle *target_hndl, const util::Blob_mutable &target_meta_blob, Error_code *sync_err_code, size_t *sync_sz, flow::async::Task_asio_err_sz &&on_done_func)
See Native_socket_stream counterpart.
void rcv_read_msg(Error_code *sync_err_code, size_t *sync_sz)
Begins read chain (completing it as synchronously as possible, async-completing the rest) for the nex...
bool idle_timer_run(util::Fine_duration timeout)
See Native_socket_stream counterpart.
size_t receive_blob_max_size() const
See Native_socket_stream counterpart.
void rcv_not_idle()
No-ops if idle_timer_run() is not engaged; otherwise reacts to non-idleness of the in-pipe by resched...
bool async_receive_native_handle_impl(Native_handle *target_hndl_or_null, const util::Blob_mutable &target_meta_blob, Error_code *sync_err_code, size_t *sync_sz, flow::async::Task_asio_err_sz &&on_done_func)
Body of both async_receive_native_handle() and async_receive_blob().
void rcv_on_ev_peer_socket_readable_or_error(Rcv_msg_state msg_state, size_t n_left)
Completion handler, from outside event loop via sync_io pattern, for the async-wait initiated by vari...
size_t receive_meta_blob_max_size() const
See Native_socket_stream counterpart.
static const size_t & S_MAX_META_BLOB_LENGTH
The maximum length of a blob that can be sent by this protocol.
static constexpr bool S_BLOB_UNDERFLOW_ALLOWED
Implements concept API; namely it is true.
flow::log::Logger * get_logger() const
Returns logger (possibly null).
bool start_receive_native_handle_ops(Event_wait_func_t &&ev_wait_func)
Implements Native_handle_receiver API per contract.
size_t receive_meta_blob_max_size() const
Implements Native_handle_receiver API per contract.
size_t nb_read_some_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle *target_payload_hndl_ptr, const util::Blob_mutable &target_payload_blob, Error_code *err_code, int message_flags)
boost.asio extension similar to peer_socket->non_blocking(true); auto n = peer_socket->read_some(targ...
@ S_LOW_LVL_TRANSPORT_HOSED_CANNOT_RECEIVE
Unable to receive incoming traffic: an earlier-reported, or at least logged, system error had hosed t...
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_RECEIVES_FINISHED_CANNOT_RECEIVE
Will not receive message: the opposing user sent graceful-close via API.
@ S_RECEIVER_IDLE_TIMEOUT
No messages (optional auto-pings or otherwise) have been received; optionally configured timeout exce...
@ S_MESSAGE_SIZE_EXCEEDS_USER_STORAGE
User protocol-code mismatch: local user-provided storage cannot fit entire message received from oppo...
@ S_BLOB_RECEIVER_GOT_NON_BLOB
User protocol-code mismatch: local user expected blob only and no native handle; received at least th...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
Function< void(Asio_waitable_native_handle *hndl_of_interest, bool ev_of_interest_snd_else_rcv, Task_ptr &&on_active_ev_func)> Event_wait_func
In sync_io pattern, concrete type storing user-supplied function invoked by pattern-implementing ipc:...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
Definition: util_fwd.hpp:122
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
Definition: util_fwd.hpp:140
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
Definition: util_fwd.hpp:117
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:298
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.