23#include <flow/error/error.hpp> 
   30template<
typename Task_err>
 
   36                               Task_err&& on_sent_or_error)
 
   39  using flow::util::buffers_dump_string;
 
   40  using boost::asio::async_write;
 
   41  using boost::asio::bind_executor;
 
   42  using boost::asio::get_associated_executor;
 
   43  using boost::asio::post;
 
   45  assert(peer_socket_ptr);
 
   46  auto& peer_socket = *peer_socket_ptr;
 
   49  FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
 
   50  FLOW_LOG_TRACE(
"Connected local peer socket was waiting to write from location @ [" << payload_blob.data() << 
"] " 
   51                 "plus native handle [" << payload_hndl << 
"]; " 
   52                 "handler called, either ready or error; will try to send if appropriate.");
 
   56    FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
   57    FLOW_LOG_WARNING(
"Connected local peer socket was waiting to write from " 
   58                     "location @ [" << payload_blob.data() << 
"] plus native handle [" << payload_hndl << 
"]; " 
   59                     "but an unrecoverable error occurred; will not retry; posting handler.");
 
   61    const auto executor = get_associated_executor(on_sent_or_error);
 
   62    post(peer_socket.get_executor(),
 
   63         bind_executor(executor,
 
   64                       [sys_err_code, on_sent_or_error = std::move(on_sent_or_error)]()
 
   66      on_sent_or_error(sys_err_code);
 
   74  assert (!sys_err_code);
 
   78  const size_t n_sent_or_zero
 
   81  if (n_sent_or_zero == 0)
 
   85    if (nb_err_code == boost::asio::error::would_block)
 
   87      FLOW_LOG_TRACE(
"Async wait indicates writability, yet write attempt indicated would-block; unusual but not " 
   88                     "an error condition; we will try again.");
 
   91      peer_socket.async_wait
 
   92        (Peer_socket::wait_write,
 
   93         [logger_ptr, payload_hndl, payload_blob, peer_socket_ptr,
 
   94          on_sent_or_error = std::move(on_sent_or_error)]
 
   98                                  std::move(on_sent_or_error));
 
  107    FLOW_LOG_WARNING(
"Connected local peer socket tried to write from " 
  108                     "location @ [" << payload_blob.data() << 
"] plus native handle [" << payload_hndl << 
"]; " 
  109                     "but an unrecoverable error occurred; will not retry; posting handler with error.");
 
  111    const auto executor = get_associated_executor(on_sent_or_error);
 
  112    post(peer_socket.get_executor(),
 
  113         bind_executor(executor,
 
  114                       [get_logger, get_log_component, nb_err_code, on_sent_or_error = std::move(on_sent_or_error)]()
 
  116      FLOW_LOG_TRACE(
"Handler started.");
 
  117      on_sent_or_error(nb_err_code);
 
  118      FLOW_LOG_TRACE(
"Handler finished.");
 
  125  assert (n_sent_or_zero > 0);
 
  127  const size_t orig_blob_size = payload_blob.size();
 
  128  payload_blob += n_sent_or_zero; 
 
  129  if (payload_blob.size() == 0)
 
  131    FLOW_LOG_TRACE(
"Blob fully sent; hence posting handler with success code.");
 
  133    const auto executor = get_associated_executor(on_sent_or_error);
 
  134    post(peer_socket.get_executor(),
 
  135         bind_executor(executor,
 
  136                       [get_logger, get_log_component, on_sent_or_error = std::move(on_sent_or_error)]()
 
  138      FLOW_LOG_TRACE(
"Handler started.");
 
  140      FLOW_LOG_TRACE(
"Handler finished.");
 
  148  FLOW_LOG_TRACE(
"Continuing: Via connected local peer socket, will *only* send remaining " 
  149                 "blob of size [" << payload_blob.size() << 
"] located @ [" << payload_blob.data() << 
"].");
 
  152  FLOW_LOG_DATA(
"Continuing: Blob contents are " 
  153                "[\n" << buffers_dump_string(payload_blob, 
"  ") << 
"].");
 
  173  async_write(peer_socket, payload_blob,
 
  174              [get_logger, get_log_component, 
 
  175               payload_blob, orig_blob_size, peer_socket_ptr,
 
  176               on_sent_or_error = std::move(on_sent_or_error)]
 
  179    FLOW_LOG_TRACE(
"Connected local peer socket tried to write from location @ [" << payload_blob.data() << 
"] " 
  180                   "plus NO native handle; handler called, either with success or error.");
 
  184      FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
  185      FLOW_LOG_WARNING(
"Connected local peer socket tried to write *only* blob " 
  186                       "located @ [" << payload_blob.data() << 
"]; " 
  187                       "but an unrecoverable error occurred; will not retry; posting handler with error.");
 
  198    const auto executor = get_associated_executor(on_sent_or_error);
 
  199    post(peer_socket_ptr->get_executor(),
 
  200         bind_executor(executor,
 
  201                       [get_logger, get_log_component,
 
  202                        sys_err_code, on_sent_or_error = std::move(on_sent_or_error)]()
 
  204      FLOW_LOG_TRACE(
"Handler started.");
 
  205      on_sent_or_error(sys_err_code);
 
  206      FLOW_LOG_TRACE(
"Handler finished.");
 
  212template<
bool TARGET_TBD,
 
  213         typename Task_err_blob, 
typename Target_payload_blob_func, 
typename Should_interrupt_func>
 
  216                               Should_interrupt_func&& should_interrupt_func,
 
  217                               Task_err_blob&& on_rcvd_or_error,
 
  218                               Target_payload_blob_func&& target_payload_blob_func,
 
  220                               size_t n_rcvd_so_far)
 
  223  using boost::asio::bind_executor;
 
  224  using boost::asio::get_associated_executor;
 
  225  using boost::asio::post;
 
  227  FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
 
  228  if constexpr(TARGET_TBD)
 
  230    FLOW_LOG_TRACE(
"Connected local peer socket was waiting to read (1st time -- target buffer undetermined); " 
  231                   "handler called, either ready or error; will try to obtain target blob/receive if appropriate.");
 
  232    assert(n_rcvd_so_far == 0);
 
  236    FLOW_LOG_TRACE(
"Connected local peer socket was waiting to read (target buffer known); " 
  237                   "handler called, either ready or error; will try to receive if appropriate.  " 
  238                   "Buffer location: @ [" << target_payload_blob.data() << 
"], " 
  239                   "size [" << target_payload_blob.size() << 
"] (already received [" << n_rcvd_so_far << 
"]).");
 
  242  auto& peer_socket = *peer_socket_ptr;
 
  253  if (should_interrupt_func())
 
  255    FLOW_LOG_TRACE(
"Interrupted locally.  Not proceeding further; not invoking completion handler.");
 
  263    const auto executor = get_associated_executor(on_rcvd_or_error);
 
  264    post(peer_socket.get_executor(),
 
  265         bind_executor(executor,
 
  266                       [err_code, blob, on_rcvd_or_error = std::move(on_rcvd_or_error)]()
 
  268      on_rcvd_or_error(err_code, blob);
 
  272  auto sys_err_code = async_err_code; 
 
  275    FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
  281  if constexpr(TARGET_TBD)
 
  284    assert(target_payload_blob.size() == 0);
 
  285    target_payload_blob = target_payload_blob_func();
 
  288    if (target_payload_blob.size() == 0)
 
  290      FLOW_LOG_TRACE(
"Target blob has been determined: no read should proceed after all; degenerate case.  " 
  298    FLOW_LOG_TRACE(
"Target blob has been determined: location @ [" << target_payload_blob.data() << 
"], " 
  299                   "size [" << target_payload_blob.size() << 
"].");
 
  304    if (!peer_socket.non_blocking()) 
 
  306      peer_socket.non_blocking(
true, sys_err_code);
 
  309        FLOW_LOG_WARNING(
"Wanted to nb-read to location @ [" << target_payload_blob.data() << 
"], " 
  310                         "size [" << target_payload_blob.size() << 
"], and 1+ bytes are reportedly available, but " 
  311                         "had to set non-blocking mode, and that op failed.");
 
  312        FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
  314        finish(sys_err_code, target_payload_blob);
 
  318      assert(peer_socket.non_blocking());
 
  324  assert(target_payload_blob.size() != 0);
 
  327  const auto remaining_target_payload_blob = target_payload_blob + n_rcvd_so_far; 
 
  328  assert(remaining_target_payload_blob.size() != 0);
 
  330  const size_t n_rcvd = peer_socket.read_some(remaining_target_payload_blob, sys_err_code);
 
  331  if (sys_err_code && (sys_err_code != boost::asio::error::would_block))
 
  334    FLOW_LOG_WARNING(
"Wanted to nb-read to location @ [" << remaining_target_payload_blob.data() << 
"], " 
  335                     "size [" << remaining_target_payload_blob.size() << 
"], and 1+ bytes are reportedly " 
  336                     "available, but the nb-read failed (could be graceful disconnect; details below).");
 
  337    FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
  339    finish(sys_err_code, target_payload_blob);
 
  344  if (n_rcvd == remaining_target_payload_blob.size()) 
 
  346    assert(!sys_err_code);
 
  347    FLOW_LOG_TRACE(
"Successfully nb-read all expected data to location " 
  348                   "@ [" << remaining_target_payload_blob.data() << 
"], " 
  349                   "size [" << remaining_target_payload_blob.size() << 
"].  Posting handler.");
 
  356  FLOW_LOG_TRACE(
"Successfully nb-read some (not all) expected data to " 
  357                 "location @ [" << remaining_target_payload_blob.data() << 
"], " 
  358                 "size [" << remaining_target_payload_blob.size() << 
"].  " 
  359                 "Remaining: [" << (remaining_target_payload_blob.size() - n_rcvd) << 
"] bytes.  " 
  360                 "Will now async-read this (async-wait being first part of that).");
 
  362  n_rcvd_so_far += n_rcvd; 
 
  363  peer_socket.async_wait
 
  364    (Peer_socket::wait_read, [logger_ptr, peer_socket_ptr, target_payload_blob, n_rcvd_so_far,
 
  365                              on_rcvd_or_error = std::move(on_rcvd_or_error),
 
  366                              should_interrupt_func = std::move(should_interrupt_func)]
 
  369    on_wait_readable_or_error<false> 
 
  370      (logger_ptr, async_err_code, peer_socket_ptr, std::move(should_interrupt_func), std::move(on_rcvd_or_error),
 
Additional (versus boost.asio) APIs for advanced work with local stream (Unix domain) sockets includi...
size_t nb_write_some_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Error_code *err_code)
boost.asio extension similar to peer_socket->non_blocking(true); auto n = peer_socket->write_some(pay...
Protocol::socket Peer_socket
Short-hand for boost.asio Unix domain peer stream-socket (usually-connected-or-empty guy).
void on_wait_readable_or_error(flow::log::Logger *logger_ptr, const Error_code &async_err_code, Peer_socket *peer_socket_ptr, Should_interrupt_func &&should_interrupt_func, Task_err_blob &&on_rcvd_or_error, Target_payload_blob_func &&target_payload_blob_func, util::Blob_mutable target_payload_blob, size_t n_rcvd_so_far)
Helper of async_read_with_target_func() containing its core (asynchronously) recursive implementation...
void on_wait_writable_or_error(flow::log::Logger *logger_ptr, const Error_code &sys_err_code, Native_handle payload_hndl, const util::Blob_const &payload_blob_ref, Peer_socket *peer_socket_ptr, Task_err &&on_sent_or_error)
Helper of async_write_with_native_handle() used as the callback executed when waiting for writability...
const uint8_t * blob_data(const Blob_const &blob)
Syntactic-sugary helper that returns pointer to first byte in an immutable buffer,...
boost::asio::mutable_buffer Blob_mutable
Short-hand for an mutable blob somewhere in memory, stored as exactly a void* and a size_t.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.