21#include "flow/error/error.hpp" 
   22#include <boost/move/make_unique.hpp> 
   34  flow::log::Log_context(sync_io_core_moved.get_logger(), 
Log_component::S_TRANSPORT),
 
   36  m_worker(boost::movelib::make_unique<flow::async::Single_thread_task_loop>
 
   37             (sync_io_core_moved.get_logger(), sync_io_core_moved.nickname())),
 
   40  m_sync_io(std::move(sync_io_core_moved))
 
   52                                         { 
return Asio_waitable_native_handle(*(
m_worker->task_engine())); });
 
   53  assert(ok && 
"Did you break contract by passing-in a non-fresh sync_io core object to ctor?");
 
   58  FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Created (NULL state).");
 
   73  using flow::util::ostream_op_string;
 
   77  const auto log_pfx = ostream_op_string(
"Socket stream [", *
this, 
']');
 
   81  FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Created (PEER state) directly from pre-opened native handle.");
 
   86  Impl(std::move(sync_io_core_in_peer_state_moved), nullptr)
 
   88  using flow::util::ostream_op_string;
 
   92  const auto log_pfx = ostream_op_string(
"Socket stream [", *
this, 
']');
 
   96  FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: " 
   97                 "Created (PEER state) by adopting fresh sync_io::Native_socket_stream core.");
 
  102  using flow::async::Single_thread_task_loop;
 
  103  using flow::util::ostream_op_string;
 
  107  FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: Shutting down.  All our " 
  108                "internal async handlers will be canceled; and worker thread will be joined.");
 
  156  FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: Continuing shutdown.  Next we will run pending handlers from some " 
  157                "other thread.  In this user thread we will await those handlers' completion and then return.");
 
  159  Single_thread_task_loop one_thread(get_logger(), ostream_op_string(
nickname(), 
"-temp_deinit"));
 
  160  one_thread.start([&]()
 
  162    FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: " 
  163                  "In transient finisher thread: Shall run all pending internal handlers (typically none).");
 
  165    const auto task_engine = m_worker->task_engine();
 
  166    task_engine->restart();
 
  167    const auto count = task_engine->poll();
 
  170      FLOW_LOG_INFO(
"Socket stream [" << *
this << 
"]: " 
  171                    "In transient finisher thread: Ran [" << count << 
"] internal handlers after all.");
 
  177    FLOW_LOG_INFO(
"Transient finisher exiting.  (Send-ops and receive-ops de-init may follow.)");
 
  184  using flow::util::ostream_op_string;
 
  186  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(
bool, 
sync_connect, absolute_name, _1);
 
  189  if (!m_sync_io.sync_connect(absolute_name, err_code))
 
  198    const auto log_pfx = ostream_op_string(
"Socket stream [", *
this, 
']');
 
  199    m_snd_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
 
  200    m_rcv_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
 
  210  return m_sync_io.remote_peer_process_credentials(err_code);
 
  215  return m_sync_io.nickname();
 
  220  return os << 
'[' << val.
nickname() << 
"]@" << 
static_cast<const void*
>(&val);
 
  227  return m_snd_sync_io_adapter
 
  228           ? (m_snd_sync_io_adapter->send_blob(blob, err_code), 
true) 
 
  235  return m_snd_sync_io_adapter
 
  236           ? (m_snd_sync_io_adapter->send_native_handle(hndl, meta_blob, err_code), 
true) 
 
  242  using flow::async::Task_asio_err;
 
  249  return m_snd_sync_io_adapter
 
  250           ? m_snd_sync_io_adapter->async_end_sending(std::move(on_done_func))
 
  256  return m_snd_sync_io_adapter
 
  257           ? m_snd_sync_io_adapter->auto_ping(period)
 
  268  return m_sync_io.send_blob_max_size();
 
  275                                                             flow::async::Task_asio_err_sz&& on_done_func)
 
  277  return m_rcv_sync_io_adapter
 
  278           ? (m_rcv_sync_io_adapter->async_receive_native_handle
 
  279                (target_hndl, target_meta_blob, std::move(on_done_func)),
 
  285                                                    flow::async::Task_asio_err_sz&& on_done_func)
 
  287  return m_rcv_sync_io_adapter
 
  288           ? (m_rcv_sync_io_adapter->async_receive_blob(target_blob, std::move(on_done_func)), 
true) 
 
  294  return m_rcv_sync_io_adapter
 
  295           ? m_rcv_sync_io_adapter->idle_timer_run(timeout)
 
  306  return m_sync_io.receive_blob_max_size();
 
  314  using flow::util::ostream_op_string;
 
  316  FLOW_LOG_TRACE(
"Socket stream [" << *
this << 
"]: Releasing idle-state object to new socket-stream core object.");
 
  320  assert(m_snd_sync_io_adapter && m_rcv_sync_io_adapter && 
"By contract must be in PEER state.");
 
  327  auto core = m_sync_io.release();
 
  334  auto prev_worker = std::move(m_worker);
 
  338    = boost::movelib::make_unique<flow::async::Single_thread_task_loop>
 
  339        (core.get_logger(), core.nickname());
 
  346  m_sync_io.replace_event_wait_handles([
this]() -> Asio_waitable_native_handle
 
  347                                         { 
return Asio_waitable_native_handle(*(m_worker->task_engine())); });
 
  348  assert(ok && 
"Should work if m_sync_io.release() worked as advertised.");
 
  357  m_rcv_sync_io_adapter.reset();
 
  358  m_snd_sync_io_adapter.reset();
 
Internal, non-movable pImpl implementation of Native_socket_stream class.
size_t send_blob_max_size() const
See Native_socket_stream counterpart.
~Impl()
See Native_socket_stream counterpart.
bool send_blob(const util::Blob_const &blob, Error_code *err_code)
See Native_socket_stream counterpart.
bool idle_timer_run(util::Fine_duration timeout)
See Native_socket_stream counterpart.
size_t send_meta_blob_max_size() const
See Native_socket_stream counterpart.
bool end_sending()
See Native_socket_stream counterpart.
bool sync_connect(const Shared_name &absolute_name, Error_code *err_code)
See Native_socket_stream counterpart.
bool async_receive_native_handle(Native_handle *target_hndl, const util::Blob_mutable &target_meta_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Native_socket_stream counterpart.
util::Process_credentials remote_peer_process_credentials(Error_code *err_code) const
See Native_socket_stream counterpart.
boost::movelib::unique_ptr< flow::async::Single_thread_task_loop > m_worker
Single-thread worker pool for all internal async work in both directions.
size_t receive_blob_max_size() const
See Native_socket_stream counterpart.
bool async_receive_blob(const util::Blob_mutable &target_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Native_socket_stream counterpart.
sync_io::Native_socket_stream m_sync_io
The core Native_socket_stream engine, implementing the sync_io pattern (see util::sync_io doc header)...
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code)
See Native_socket_stream counterpart.
std::optional< sync_io::Async_adapter_receiver< decltype(m_sync_io)> > m_rcv_sync_io_adapter
Null until PEER state; this handles ~all receive-ops logic in that state.
const std::string & nickname() const
See Native_socket_stream counterpart.
bool async_end_sending(flow::async::Task_asio_err &&on_done_func)
See Native_socket_stream counterpart.
std::optional< sync_io::Async_adapter_sender< decltype(m_sync_io)> > m_snd_sync_io_adapter
Null until PEER state; this handles ~all send-ops logic in that state.
Impl(flow::log::Logger *logger_ptr, util::String_view nickname_str)
See Native_socket_stream counterpart.
bool auto_ping(util::Fine_duration period)
See Native_socket_stream counterpart.
size_t receive_meta_blob_max_size() const
See Native_socket_stream counterpart.
Implements both Native_handle_sender and Native_handle_receiver concepts by using a stream-oriented U...
size_t send_blob_max_size() const
Implements Blob_sender API per contract.
const std::string & nickname() const
Returns nickname, a brief string suitable for logging.
bool sync_connect(const Shared_name &absolute_name, Error_code *err_code=0)
To be invoked in NULL state only, it synchronously and non-blockingly attempts to connect to an oppos...
bool async_end_sending(Task_err &&on_done_func)
Implements Native_handle_sender, Blob_sender API per contract.
friend std::ostream & operator<<(std::ostream &os, const Native_socket_stream &val)
Friend of Native_socket_stream.
size_t receive_blob_max_size() const
Implements Blob_receiver API per contract.
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
bool replace_event_wait_handles(const Create_ev_wait_hndl_func &create_ev_wait_hndl_func)
Implements Native_handle_sender and Native_handle_receiver APIs at the same time, per their concept c...
A process's credentials (PID, UID, GID as of this writing).
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
boost::shared_ptr< Task > Task_ptr
Short-hand for ref-counted pointer to a Function<> that takes no arguments and returns nothing; in pa...
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.