Flow-IPC 1.0.1
Flow-IPC project: Full implementation reference.
asio_local_stream_socket.hpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
22#include "ipc/util/util_fwd.hpp"
23#include <flow/error/error.hpp>
24
26{
27
28// Template implementations.
29
30template<typename Task_err>
31void on_wait_writable_or_error(flow::log::Logger* logger_ptr,
32 const Error_code& sys_err_code,
33 Native_handle payload_hndl,
34 const util::Blob_const& payload_blob_ref,
35 Peer_socket* peer_socket_ptr,
36 Task_err&& on_sent_or_error)
37{
38 using util::blob_data;
39 using flow::util::buffers_dump_string;
40 using boost::asio::async_write;
41 using boost::asio::bind_executor;
42 using boost::asio::get_associated_executor;
43 using boost::asio::post;
44
45 assert(peer_socket_ptr);
46 auto& peer_socket = *peer_socket_ptr;
47 util::Blob_const payload_blob(payload_blob_ref); // So we can modify it.
48
49 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
50 FLOW_LOG_TRACE("Connected local peer socket was waiting to write from location @ [" << payload_blob.data() << "] "
51 "plus native handle [" << payload_hndl << "]; "
52 "handler called, either ready or error; will try to send if appropriate.");
53
54 if (sys_err_code)
55 {
56 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
57 FLOW_LOG_WARNING("Connected local peer socket was waiting to write from "
58 "location @ [" << payload_blob.data() << "] plus native handle [" << payload_hndl << "]; "
59 "but an unrecoverable error occurred; will not retry; posting handler.");
60
61 const auto executor = get_associated_executor(on_sent_or_error);
62 post(peer_socket.get_executor(),
63 bind_executor(executor,
64 [sys_err_code, on_sent_or_error = std::move(on_sent_or_error)]()
65 {
66 on_sent_or_error(sys_err_code);
67 }));
68
69 // And that's it.
70 return;
71 }
72 // else
73
74 assert (!sys_err_code);
75 // No error. Let us send as much as possible of remaining payload_blob; and the native handle payload_hndl.
76
77 Error_code nb_err_code;
78 const size_t n_sent_or_zero
79 = nb_write_some_with_native_handle(logger_ptr, peer_socket_ptr, payload_hndl, payload_blob, &nb_err_code);
80
81 if (n_sent_or_zero == 0)
82 {
83 // Not even 1 byte of the blob was sent; and hence nor was payload_hndl (per contract of that function).
84
85 if (nb_err_code == boost::asio::error::would_block)
86 {
87 FLOW_LOG_TRACE("Async wait indicates writability, yet write attempt indicated would-block; unusual but not "
88 "an error condition; we will try again.");
89
90 // Just like the initial wait (omitting comments for brevity; see async_wait() in main function above).
91 peer_socket.async_wait
92 (Peer_socket::wait_write,
93 [logger_ptr, payload_hndl, payload_blob, peer_socket_ptr,
94 on_sent_or_error = std::move(on_sent_or_error)]
95 (const Error_code& sys_err_code) mutable
96 {
97 on_wait_writable_or_error(logger_ptr, sys_err_code, payload_hndl, payload_blob, peer_socket_ptr,
98 std::move(on_sent_or_error));
99 });
100 return;
101 }
102 // else
103
104 assert(nb_err_code);
105
106 // All other errors are fatal. Retrying makes no sense. Report the error and however much we'd sent of blob.
107 FLOW_LOG_WARNING("Connected local peer socket tried to write from "
108 "location @ [" << payload_blob.data() << "] plus native handle [" << payload_hndl << "]; "
109 "but an unrecoverable error occurred; will not retry; posting handler with error.");
110
111 const auto executor = get_associated_executor(on_sent_or_error);
112 post(peer_socket.get_executor(),
113 bind_executor(executor,
114 [get_logger, get_log_component, nb_err_code, on_sent_or_error = std::move(on_sent_or_error)]()
115 {
116 FLOW_LOG_TRACE("Handler started.");
117 on_sent_or_error(nb_err_code);
118 FLOW_LOG_TRACE("Handler finished.");
119 // And that's it. Async op finished.
120 }));
121 return;
122 } // if (n_sent_or_zero == 0)
123 // else if (n_sent_or_zero > 0)
124
125 assert (n_sent_or_zero > 0);
126
127 const size_t orig_blob_size = payload_blob.size();
128 payload_blob += n_sent_or_zero; // Shift the buffer (size goes down, start goes right).
129 if (payload_blob.size() == 0)
130 {
131 FLOW_LOG_TRACE("Blob fully sent; hence posting handler with success code.");
132
133 const auto executor = get_associated_executor(on_sent_or_error);
134 post(peer_socket.get_executor(),
135 bind_executor(executor,
136 [get_logger, get_log_component, on_sent_or_error = std::move(on_sent_or_error)]()
137 {
138 FLOW_LOG_TRACE("Handler started.");
139 on_sent_or_error(Error_code());
140 FLOW_LOG_TRACE("Handler finished.");
141 // And that's it. Async op finished.
142 }));
143 return;
144 }
145 // else
146
147 // Log much like when about to perform the original handle-and-blob wait; except handle is now sent off.
148 FLOW_LOG_TRACE("Continuing: Via connected local peer socket, will *only* send remaining "
149 "blob of size [" << payload_blob.size() << "] located @ [" << payload_blob.data() << "].");
150
151 // Verbose and slow (100% skipped unless log filter passes).
152 FLOW_LOG_DATA("Continuing: Blob contents are "
153 "[\n" << buffers_dump_string(payload_blob, " ") << "].");
154
155 /* Subtlety: Here we could easily retry the async_wait() again, then nb_write_some_with_native_handle(), etc.;
156 * as in the would-block case above. Why do we choose to use boost.asio
157 * instead? Answer: Well, A, we can, since the handle has been delivered.
158 * And on_wait_writable_or_error() formally expects that payload_hndl is not null, meaning it lacks a mode to
159 * nb_write_some_with_native_handle() only the blob. Well, why not add that ability?
160 * (It wouldn't be hard; sendmsg() -- the core of nb_write_some_with_native_handle() --
161 * works just fine without using the ancillary-data
162 * feature at all.) Answer: It's fine either way really. I (ygoldfel) feel a tiny
163 * bit better doing native/non-portable code only when necessary and reducing complexity of the native/non-portable
164 * code itself; in other words the extra branching could either go deeper into the native code flow, or a
165 * a bit higher up, and the latter appeals to me more. On the other hand this does increase the number of different
166 * types of transmission APIs we use; instead of sendmsg() used in 2 ways, we use sendmsg() in 1 way and boost.asio
167 * async_write() in 1 way. Potato, potahto....
168 *
169 * Side note: See how easy the following call is, not worrying about having to keep trying if it could only send
170 * a partial amount? async_write_with_native_handle() -- the thing we are implementing -- has those same semantics,
171 * which is why we're doing all that stuff in here, so our caller need not. */
172
173 async_write(peer_socket, payload_blob,
174 [get_logger, get_log_component, // Then we needn't do FLOW_LOG_SET_CONTEXT() inside before logging.
175 payload_blob, orig_blob_size, peer_socket_ptr,
176 on_sent_or_error = std::move(on_sent_or_error)]
177 (const Error_code& sys_err_code, size_t)
178 {
179 FLOW_LOG_TRACE("Connected local peer socket tried to write from location @ [" << payload_blob.data() << "] "
180 "plus NO native handle; handler called, either with success or error.");
181
182 if (sys_err_code)
183 {
184 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
185 FLOW_LOG_WARNING("Connected local peer socket tried to write *only* blob "
186 "located @ [" << payload_blob.data() << "]; "
187 "but an unrecoverable error occurred; will not retry; posting handler with error.");
188 }
189
190 /* We must pass N to them, where N is the number of bytes in total, out of the *original* payload_blob, the entire
191 * async op -- not just this async_write() -- has been able to send.
192 * - We got here because the one and only non-would-block sendmsg() already was able to send
193 * `orig_blob_size - payload_blob.size()` bytes. Recall that payload_blob now is the original payload_blob
194 * shifted (`+`ed) by that many bytes.
195 * - Then we tried to do the async_write(), and that sent a further `n_sent` bytes (possibly 0).
196 * - Hence sum those 2 quantities to yield N. */
197
198 const auto executor = get_associated_executor(on_sent_or_error);
199 post(peer_socket_ptr->get_executor(),
200 bind_executor(executor,
201 [get_logger, get_log_component,
202 sys_err_code, on_sent_or_error = std::move(on_sent_or_error)]()
203 {
204 FLOW_LOG_TRACE("Handler started.");
205 on_sent_or_error(sys_err_code);
206 FLOW_LOG_TRACE("Handler finished.");
207 // And that's it. Async op finished.
208 }));
209 }); // async_write()
210} // on_wait_writable_or_error()
211
212template<bool TARGET_TBD,
213 typename Task_err_blob, typename Target_payload_blob_func, typename Should_interrupt_func>
214void on_wait_readable_or_error(flow::log::Logger* logger_ptr, const Error_code& async_err_code,
215 Peer_socket* peer_socket_ptr,
216 Should_interrupt_func&& should_interrupt_func,
217 Task_err_blob&& on_rcvd_or_error,
218 Target_payload_blob_func&& target_payload_blob_func,
219 util::Blob_mutable target_payload_blob,
220 size_t n_rcvd_so_far)
221{
222 using util::Blob_mutable;
223 using boost::asio::bind_executor;
224 using boost::asio::get_associated_executor;
225 using boost::asio::post;
226
227 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
228 if constexpr(TARGET_TBD)
229 {
230 FLOW_LOG_TRACE("Connected local peer socket was waiting to read (1st time -- target buffer undetermined); "
231 "handler called, either ready or error; will try to obtain target blob/receive if appropriate.");
232 assert(n_rcvd_so_far == 0);
233 }
234 else
235 {
236 FLOW_LOG_TRACE("Connected local peer socket was waiting to read (target buffer known); "
237 "handler called, either ready or error; will try to receive if appropriate. "
238 "Buffer location: @ [" << target_payload_blob.data() << "], "
239 "size [" << target_payload_blob.size() << "] (already received [" << n_rcvd_so_far << "]).");
240 }
241
242 auto& peer_socket = *peer_socket_ptr;
243
244 /* Note: As promised we disregard any executor bound to the callbacks target_payload_blob_func()
245 * and should_interrupt_func(). This could be a feature added later, though the perf implications would
246 * be questionable: Unlike on_rcvd_or_error() neither is the completion handler, so we'd probably need
247 * to (1) post() a wrapper of either; then (2) within that wrapper at the end post() the subsequent
248 * work (the code that currently executes synchronously right after these calls at the moment). */
249
250 /* Always, before placing anything into target buffer or anything else that would lead to completion handler,
251 * we must check for the operation being interrupted/canceled by user. Hence it's too early to even check
252 * async_err_code which could lead to completion handler. */
253 if (should_interrupt_func())
254 {
255 FLOW_LOG_TRACE("Interrupted locally. Not proceeding further; not invoking completion handler.");
256 return;
257 }
258 // else OK, not interrupted during async-gap. The floor is ours.
259
260 // Little helper to invoke completion handler.
261 const auto finish = [&](const Error_code& err_code, util::Blob_mutable blob)
262 {
263 const auto executor = get_associated_executor(on_rcvd_or_error);
264 post(peer_socket.get_executor(),
265 bind_executor(executor,
266 [err_code, blob, on_rcvd_or_error = std::move(on_rcvd_or_error)]()
267 {
268 on_rcvd_or_error(err_code, blob);
269 }));
270 };
271
272 auto sys_err_code = async_err_code; // Just so we can tactically modify it.
273 if (sys_err_code)
274 {
275 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
276 finish(sys_err_code, Blob_mutable());
277 return;
278 }
279 // else wait results in no error; should be readable.
280
281 if constexpr(TARGET_TBD)
282 {
283 // First time we need to ask user's callback for the target buffer.
284 assert(target_payload_blob.size() == 0);
285 target_payload_blob = target_payload_blob_func();
286
287 // Handle corner case wherein they've decided not to read anything after all.
288 if (target_payload_blob.size() == 0)
289 {
290 FLOW_LOG_TRACE("Target blob has been determined: no read should proceed after all; degenerate case. "
291 "Posting handler.");
292
293 finish(Error_code(), target_payload_blob);
294 return;
295 }
296 // else
297
298 FLOW_LOG_TRACE("Target blob has been determined: location @ [" << target_payload_blob.data() << "], "
299 "size [" << target_payload_blob.size() << "].");
300
301 /* Finally can read non-blockingly -- since we know there are probably bytes to read, and where to read them,
302 * and we haven't been canceled. Though, we'll have to run non_blocking() first to set that mode if needed.
303 * (All of this is thread-unsafe in various ways, but we talked about all that in our doc header.) */
304 if (!peer_socket.non_blocking()) // This is an ultra-fast flag check (I verified): no sys call, no throwing.
305 {
306 peer_socket.non_blocking(true, sys_err_code);
307 if (sys_err_code)
308 {
309 FLOW_LOG_WARNING("Wanted to nb-read to location @ [" << target_payload_blob.data() << "], "
310 "size [" << target_payload_blob.size() << "], and 1+ bytes are reportedly available, but "
311 "had to set non-blocking mode, and that op failed.");
312 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
313
314 finish(sys_err_code, target_payload_blob);
315 return;
316 }
317 // else
318 assert(peer_socket.non_blocking());
319 // We will *not* "undo" for reasons explained in our doc header as of this writing.
320 }
321 } // if constexpr(TARGET_TBD)
322 // else if constexpr(!TARGET_TBD) { Don't touch target_payload_blob_func in this instance of the template. }
323
324 assert(target_payload_blob.size() != 0);
325 // Okay, can really read now.
326
327 const auto remaining_target_payload_blob = target_payload_blob + n_rcvd_so_far; // We still need the original.
328 assert(remaining_target_payload_blob.size() != 0);
329
330 const size_t n_rcvd = peer_socket.read_some(remaining_target_payload_blob, sys_err_code);
331 if (sys_err_code && (sys_err_code != boost::asio::error::would_block))
332 {
333 assert(n_rcvd == 0);
334 FLOW_LOG_WARNING("Wanted to nb-read to location @ [" << remaining_target_payload_blob.data() << "], "
335 "size [" << remaining_target_payload_blob.size() << "], and 1+ bytes are reportedly "
336 "available, but the nb-read failed (could be graceful disconnect; details below).");
337 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
338
339 finish(sys_err_code, target_payload_blob);
340 return;
341 }
342 // else no error, and we've read [0, N] bytes.
343
344 if (n_rcvd == remaining_target_payload_blob.size()) // Then done and done.
345 {
346 assert(!sys_err_code);
347 FLOW_LOG_TRACE("Successfully nb-read all expected data to location "
348 "@ [" << remaining_target_payload_blob.data() << "], "
349 "size [" << remaining_target_payload_blob.size() << "]. Posting handler.");
350
351 finish(Error_code(), target_payload_blob);
352 return;
353 }
354 // else we've read [0, N - 1] bytes; must async-read the rest, because else would likely get would-block right now.
355
356 FLOW_LOG_TRACE("Successfully nb-read some (not all) expected data to "
357 "location @ [" << remaining_target_payload_blob.data() << "], "
358 "size [" << remaining_target_payload_blob.size() << "]. "
359 "Remaining: [" << (remaining_target_payload_blob.size() - n_rcvd) << "] bytes. "
360 "Will now async-read this (async-wait being first part of that).");
361
362 n_rcvd_so_far += n_rcvd; // See below.
363 peer_socket.async_wait
364 (Peer_socket::wait_read, [logger_ptr, peer_socket_ptr, target_payload_blob, n_rcvd_so_far,
365 on_rcvd_or_error = std::move(on_rcvd_or_error),
366 should_interrupt_func = std::move(should_interrupt_func)]
367 (const Error_code& async_err_code) mutable
368 {
369 on_wait_readable_or_error<false> // false => See just below for meaning.
370 (logger_ptr, async_err_code, peer_socket_ptr, std::move(should_interrupt_func), std::move(on_rcvd_or_error),
371 0, // false => Ignore this.
372 target_payload_blob, // false => Read into this place (past the bytes in next arg).
373 n_rcvd_so_far); // Keep making progress -- this was incremented above.
374 });
375} // on_wait_readable_or_error()
376
377} // namespace ipc::transport::asio_local_stream_socket
Additional (versus boost.asio) APIs for advanced work with local stream (Unix domain) sockets includi...
size_t nb_write_some_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Error_code *err_code)
boost.asio extension similar to peer_socket->non_blocking(true); auto n = peer_socket->write_some(pay...
Protocol::socket Peer_socket
Short-hand for boost.asio Unix domain peer stream-socket (usually-connected-or-empty guy).
void on_wait_readable_or_error(flow::log::Logger *logger_ptr, const Error_code &async_err_code, Peer_socket *peer_socket_ptr, Should_interrupt_func &&should_interrupt_func, Task_err_blob &&on_rcvd_or_error, Target_payload_blob_func &&target_payload_blob_func, util::Blob_mutable target_payload_blob, size_t n_rcvd_so_far)
Helper of async_read_with_target_func() containing its core (asynchronously) recursive implementation...
void on_wait_writable_or_error(flow::log::Logger *logger_ptr, const Error_code &sys_err_code, Native_handle payload_hndl, const util::Blob_const &payload_blob_ref, Peer_socket *peer_socket_ptr, Task_err &&on_sent_or_error)
Helper of async_write_with_native_handle() used as the callback executed when waiting for writability...
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
Definition: util.cpp:156
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
Definition: util_fwd.hpp:134
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:128
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:297
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.