23#include <flow/error/error.hpp>
30template<
typename Task_err>
36 Task_err&& on_sent_or_error)
39 using flow::util::buffers_dump_string;
40 using boost::asio::async_write;
41 using boost::asio::bind_executor;
42 using boost::asio::get_associated_executor;
43 using boost::asio::post;
45 assert(peer_socket_ptr);
46 auto& peer_socket = *peer_socket_ptr;
49 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
50 FLOW_LOG_TRACE(
"Connected local peer socket was waiting to write from location @ [" << payload_blob.data() <<
"] "
51 "plus native handle [" << payload_hndl <<
"]; "
52 "handler called, either ready or error; will try to send if appropriate.");
56 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
57 FLOW_LOG_WARNING(
"Connected local peer socket was waiting to write from "
58 "location @ [" << payload_blob.data() <<
"] plus native handle [" << payload_hndl <<
"]; "
59 "but an unrecoverable error occurred; will not retry; posting handler.");
61 const auto executor = get_associated_executor(on_sent_or_error);
62 post(peer_socket.get_executor(),
63 bind_executor(executor,
64 [sys_err_code, on_sent_or_error = std::move(on_sent_or_error)]()
66 on_sent_or_error(sys_err_code);
74 assert (!sys_err_code);
78 const size_t n_sent_or_zero
81 if (n_sent_or_zero == 0)
85 if (nb_err_code == boost::asio::error::would_block)
87 FLOW_LOG_TRACE(
"Async wait indicates writability, yet write attempt indicated would-block; unusual but not "
88 "an error condition; we will try again.");
91 peer_socket.async_wait
92 (Peer_socket::wait_write,
93 [logger_ptr, payload_hndl, payload_blob, peer_socket_ptr,
94 on_sent_or_error = std::move(on_sent_or_error)]
98 std::move(on_sent_or_error));
107 FLOW_LOG_WARNING(
"Connected local peer socket tried to write from "
108 "location @ [" << payload_blob.data() <<
"] plus native handle [" << payload_hndl <<
"]; "
109 "but an unrecoverable error occurred; will not retry; posting handler with error.");
111 const auto executor = get_associated_executor(on_sent_or_error);
112 post(peer_socket.get_executor(),
113 bind_executor(executor,
114 [get_logger, get_log_component, nb_err_code, on_sent_or_error = std::move(on_sent_or_error)]()
116 FLOW_LOG_TRACE(
"Handler started.");
117 on_sent_or_error(nb_err_code);
118 FLOW_LOG_TRACE(
"Handler finished.");
125 assert (n_sent_or_zero > 0);
127 const size_t orig_blob_size = payload_blob.size();
128 payload_blob += n_sent_or_zero;
129 if (payload_blob.size() == 0)
131 FLOW_LOG_TRACE(
"Blob fully sent; hence posting handler with success code.");
133 const auto executor = get_associated_executor(on_sent_or_error);
134 post(peer_socket.get_executor(),
135 bind_executor(executor,
136 [get_logger, get_log_component, on_sent_or_error = std::move(on_sent_or_error)]()
138 FLOW_LOG_TRACE(
"Handler started.");
140 FLOW_LOG_TRACE(
"Handler finished.");
148 FLOW_LOG_TRACE(
"Continuing: Via connected local peer socket, will *only* send remaining "
149 "blob of size [" << payload_blob.size() <<
"] located @ [" << payload_blob.data() <<
"].");
152 FLOW_LOG_DATA(
"Continuing: Blob contents are "
153 "[\n" << buffers_dump_string(payload_blob,
" ") <<
"].");
173 async_write(peer_socket, payload_blob,
174 [get_logger, get_log_component,
175 payload_blob, orig_blob_size, peer_socket_ptr,
176 on_sent_or_error = std::move(on_sent_or_error)]
179 FLOW_LOG_TRACE(
"Connected local peer socket tried to write from location @ [" << payload_blob.data() <<
"] "
180 "plus NO native handle; handler called, either with success or error.");
184 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
185 FLOW_LOG_WARNING(
"Connected local peer socket tried to write *only* blob "
186 "located @ [" << payload_blob.data() <<
"]; "
187 "but an unrecoverable error occurred; will not retry; posting handler with error.");
198 const auto executor = get_associated_executor(on_sent_or_error);
199 post(peer_socket_ptr->get_executor(),
200 bind_executor(executor,
201 [get_logger, get_log_component,
202 sys_err_code, on_sent_or_error = std::move(on_sent_or_error)]()
204 FLOW_LOG_TRACE(
"Handler started.");
205 on_sent_or_error(sys_err_code);
206 FLOW_LOG_TRACE(
"Handler finished.");
212template<
bool TARGET_TBD,
213 typename Task_err_blob,
typename Target_payload_blob_func,
typename Should_interrupt_func>
216 Should_interrupt_func&& should_interrupt_func,
217 Task_err_blob&& on_rcvd_or_error,
218 Target_payload_blob_func&& target_payload_blob_func,
220 size_t n_rcvd_so_far)
223 using boost::asio::bind_executor;
224 using boost::asio::get_associated_executor;
225 using boost::asio::post;
227 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
228 if constexpr(TARGET_TBD)
230 FLOW_LOG_TRACE(
"Connected local peer socket was waiting to read (1st time -- target buffer undetermined); "
231 "handler called, either ready or error; will try to obtain target blob/receive if appropriate.");
232 assert(n_rcvd_so_far == 0);
236 FLOW_LOG_TRACE(
"Connected local peer socket was waiting to read (target buffer known); "
237 "handler called, either ready or error; will try to receive if appropriate. "
238 "Buffer location: @ [" << target_payload_blob.data() <<
"], "
239 "size [" << target_payload_blob.size() <<
"] (already received [" << n_rcvd_so_far <<
"]).");
242 auto& peer_socket = *peer_socket_ptr;
253 if (should_interrupt_func())
255 FLOW_LOG_TRACE(
"Interrupted locally. Not proceeding further; not invoking completion handler.");
263 const auto executor = get_associated_executor(on_rcvd_or_error);
264 post(peer_socket.get_executor(),
265 bind_executor(executor,
266 [err_code, blob, on_rcvd_or_error = std::move(on_rcvd_or_error)]()
268 on_rcvd_or_error(err_code, blob);
272 auto sys_err_code = async_err_code;
275 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
281 if constexpr(TARGET_TBD)
284 assert(target_payload_blob.size() == 0);
285 target_payload_blob = target_payload_blob_func();
288 if (target_payload_blob.size() == 0)
290 FLOW_LOG_TRACE(
"Target blob has been determined: no read should proceed after all; degenerate case. "
298 FLOW_LOG_TRACE(
"Target blob has been determined: location @ [" << target_payload_blob.data() <<
"], "
299 "size [" << target_payload_blob.size() <<
"].");
304 if (!peer_socket.non_blocking())
306 peer_socket.non_blocking(
true, sys_err_code);
309 FLOW_LOG_WARNING(
"Wanted to nb-read to location @ [" << target_payload_blob.data() <<
"], "
310 "size [" << target_payload_blob.size() <<
"], and 1+ bytes are reportedly available, but "
311 "had to set non-blocking mode, and that op failed.");
312 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
314 finish(sys_err_code, target_payload_blob);
318 assert(peer_socket.non_blocking());
324 assert(target_payload_blob.size() != 0);
327 const auto remaining_target_payload_blob = target_payload_blob + n_rcvd_so_far;
328 assert(remaining_target_payload_blob.size() != 0);
330 const size_t n_rcvd = peer_socket.read_some(remaining_target_payload_blob, sys_err_code);
331 if (sys_err_code && (sys_err_code != boost::asio::error::would_block))
334 FLOW_LOG_WARNING(
"Wanted to nb-read to location @ [" << remaining_target_payload_blob.data() <<
"], "
335 "size [" << remaining_target_payload_blob.size() <<
"], and 1+ bytes are reportedly "
336 "available, but the nb-read failed (could be graceful disconnect; details below).");
337 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
339 finish(sys_err_code, target_payload_blob);
344 if (n_rcvd == remaining_target_payload_blob.size())
346 assert(!sys_err_code);
347 FLOW_LOG_TRACE(
"Successfully nb-read all expected data to location "
348 "@ [" << remaining_target_payload_blob.data() <<
"], "
349 "size [" << remaining_target_payload_blob.size() <<
"]. Posting handler.");
356 FLOW_LOG_TRACE(
"Successfully nb-read some (not all) expected data to "
357 "location @ [" << remaining_target_payload_blob.data() <<
"], "
358 "size [" << remaining_target_payload_blob.size() <<
"]. "
359 "Remaining: [" << (remaining_target_payload_blob.size() - n_rcvd) <<
"] bytes. "
360 "Will now async-read this (async-wait being first part of that).");
362 n_rcvd_so_far += n_rcvd;
363 peer_socket.async_wait
364 (Peer_socket::wait_read, [logger_ptr, peer_socket_ptr, target_payload_blob, n_rcvd_so_far,
365 on_rcvd_or_error = std::move(on_rcvd_or_error),
366 should_interrupt_func = std::move(should_interrupt_func)]
369 on_wait_readable_or_error<false>
370 (logger_ptr, async_err_code, peer_socket_ptr, std::move(should_interrupt_func), std::move(on_rcvd_or_error),
Additional (versus boost.asio) APIs for advanced work with local stream (Unix domain) sockets includi...
size_t nb_write_some_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Error_code *err_code)
boost.asio extension similar to peer_socket->non_blocking(true); auto n = peer_socket->write_some(pay...
Protocol::socket Peer_socket
Short-hand for boost.asio Unix domain peer stream-socket (usually-connected-or-empty guy).
void on_wait_readable_or_error(flow::log::Logger *logger_ptr, const Error_code &async_err_code, Peer_socket *peer_socket_ptr, Should_interrupt_func &&should_interrupt_func, Task_err_blob &&on_rcvd_or_error, Target_payload_blob_func &&target_payload_blob_func, util::Blob_mutable target_payload_blob, size_t n_rcvd_so_far)
Helper of async_read_with_target_func() containing its core (asynchronously) recursive implementation...
void on_wait_writable_or_error(flow::log::Logger *logger_ptr, const Error_code &sys_err_code, Native_handle payload_hndl, const util::Blob_const &payload_blob_ref, Peer_socket *peer_socket_ptr, Task_err &&on_sent_or_error)
Helper of async_write_with_native_handle() used as the callback executed when waiting for writability...
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
boost::asio::mutable_buffer Blob_mutable
Short-hand for an mutable blob somewhere in memory, stored as exactly a void* and a size_t.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.