21#include "flow/error/error.hpp"
22#include <boost/move/make_unique.hpp>
34 flow::log::Log_context(sync_io_core_moved.get_logger(),
Log_component::S_TRANSPORT),
36 m_worker(boost::movelib::make_unique<flow::async::Single_thread_task_loop>
37 (sync_io_core_moved.get_logger(), sync_io_core_moved.nickname())),
40 m_sync_io(std::move(sync_io_core_moved))
52 {
return Asio_waitable_native_handle(*(
m_worker->task_engine())); });
53 assert(ok &&
"Did you break contract by passing-in a non-fresh sync_io core object to ctor?");
58 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Created (NULL state).");
73 using flow::util::ostream_op_string;
77 const auto log_pfx = ostream_op_string(
"Socket stream [", *
this,
']');
81 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Created (PEER state) directly from pre-opened native handle.");
86 Impl(std::move(sync_io_core_in_peer_state_moved), nullptr)
88 using flow::util::ostream_op_string;
92 const auto log_pfx = ostream_op_string(
"Socket stream [", *
this,
']');
96 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: "
97 "Created (PEER state) by adopting fresh sync_io::Native_socket_stream core.");
102 using flow::async::Single_thread_task_loop;
103 using flow::util::ostream_op_string;
107 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: Shutting down. All our "
108 "internal async handlers will be canceled; and worker thread will be joined.");
156 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: Continuing shutdown. Next we will run pending handlers from some "
157 "other thread. In this user thread we will await those handlers' completion and then return.");
159 Single_thread_task_loop one_thread(get_logger(), ostream_op_string(
nickname(),
"-temp_deinit"));
160 one_thread.start([&]()
162 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: "
163 "In transient finisher thread: Shall run all pending internal handlers (typically none).");
165 const auto task_engine = m_worker->task_engine();
166 task_engine->restart();
167 const auto count = task_engine->poll();
170 FLOW_LOG_INFO(
"Socket stream [" << *
this <<
"]: "
171 "In transient finisher thread: Ran [" << count <<
"] internal handlers after all.");
177 FLOW_LOG_INFO(
"Transient finisher exiting. (Send-ops and receive-ops de-init may follow.)");
184 using flow::util::ostream_op_string;
187 flow::util::bind_ns::cref(absolute_name), _1);
190 if (!m_sync_io.sync_connect(absolute_name, err_code))
199 const auto log_pfx = ostream_op_string(
"Socket stream [", *
this,
']');
200 m_snd_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
201 m_rcv_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
211 return m_sync_io.remote_peer_process_credentials(err_code);
216 return m_sync_io.nickname();
221 return os <<
'[' << val.
nickname() <<
"]@" <<
static_cast<const void*
>(&val);
228 return m_snd_sync_io_adapter
229 ? (m_snd_sync_io_adapter->send_blob(blob, err_code),
true)
236 return m_snd_sync_io_adapter
237 ? (m_snd_sync_io_adapter->send_native_handle(hndl, meta_blob, err_code),
true)
243 using flow::async::Task_asio_err;
250 return m_snd_sync_io_adapter
251 ? m_snd_sync_io_adapter->async_end_sending(std::move(on_done_func))
257 return m_snd_sync_io_adapter
258 ? m_snd_sync_io_adapter->auto_ping(period)
269 return m_sync_io.send_blob_max_size();
276 flow::async::Task_asio_err_sz&& on_done_func)
278 return m_rcv_sync_io_adapter
279 ? (m_rcv_sync_io_adapter->async_receive_native_handle
280 (target_hndl, target_meta_blob, std::move(on_done_func)),
286 flow::async::Task_asio_err_sz&& on_done_func)
288 return m_rcv_sync_io_adapter
289 ? (m_rcv_sync_io_adapter->async_receive_blob(target_blob, std::move(on_done_func)),
true)
295 return m_rcv_sync_io_adapter
296 ? m_rcv_sync_io_adapter->idle_timer_run(timeout)
307 return m_sync_io.receive_blob_max_size();
315 using flow::util::ostream_op_string;
317 FLOW_LOG_TRACE(
"Socket stream [" << *
this <<
"]: Releasing idle-state object to new socket-stream core object.");
321 assert(m_snd_sync_io_adapter && m_rcv_sync_io_adapter &&
"By contract must be in PEER state.");
328 auto core = m_sync_io.release();
335 auto prev_worker = std::move(m_worker);
339 = boost::movelib::make_unique<flow::async::Single_thread_task_loop>
340 (core.get_logger(), core.nickname());
347 m_sync_io.replace_event_wait_handles([
this]() -> Asio_waitable_native_handle
348 {
return Asio_waitable_native_handle(*(m_worker->task_engine())); });
349 assert(ok &&
"Should work if m_sync_io.release() worked as advertised.");
358 m_rcv_sync_io_adapter.reset();
359 m_snd_sync_io_adapter.reset();
Internal, non-movable pImpl implementation of Native_socket_stream class.
size_t send_blob_max_size() const
See Native_socket_stream counterpart.
~Impl()
See Native_socket_stream counterpart.
bool send_blob(const util::Blob_const &blob, Error_code *err_code)
See Native_socket_stream counterpart.
bool idle_timer_run(util::Fine_duration timeout)
See Native_socket_stream counterpart.
size_t send_meta_blob_max_size() const
See Native_socket_stream counterpart.
bool end_sending()
See Native_socket_stream counterpart.
bool sync_connect(const Shared_name &absolute_name, Error_code *err_code)
See Native_socket_stream counterpart.
bool async_receive_native_handle(Native_handle *target_hndl, const util::Blob_mutable &target_meta_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Native_socket_stream counterpart.
util::Process_credentials remote_peer_process_credentials(Error_code *err_code) const
See Native_socket_stream counterpart.
boost::movelib::unique_ptr< flow::async::Single_thread_task_loop > m_worker
Single-thread worker pool for all internal async work in both directions.
size_t receive_blob_max_size() const
See Native_socket_stream counterpart.
bool async_receive_blob(const util::Blob_mutable &target_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Native_socket_stream counterpart.
sync_io::Native_socket_stream m_sync_io
The core Native_socket_stream engine, implementing the sync_io pattern (see util::sync_io doc header)...
bool send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code)
See Native_socket_stream counterpart.
std::optional< sync_io::Async_adapter_receiver< decltype(m_sync_io)> > m_rcv_sync_io_adapter
Null until PEER state, this handles ~all receive-ops logic in that state.
const std::string & nickname() const
See Native_socket_stream counterpart.
bool async_end_sending(flow::async::Task_asio_err &&on_done_func)
See Native_socket_stream counterpart.
std::optional< sync_io::Async_adapter_sender< decltype(m_sync_io)> > m_snd_sync_io_adapter
Null until PEER state, this handles ~all send-ops logic in that state.
Impl(flow::log::Logger *logger_ptr, util::String_view nickname_str)
See Native_socket_stream counterpart.
bool auto_ping(util::Fine_duration period)
See Native_socket_stream counterpart.
size_t receive_meta_blob_max_size() const
See Native_socket_stream counterpart.
Implements both Native_handle_sender and Native_handle_receiver concepts by using a stream-oriented U...
size_t send_blob_max_size() const
Implements Blob_sender API per contract.
const std::string & nickname() const
Returns nickname, a brief string suitable for logging.
bool async_end_sending(Task_err &&on_done_func)
Implements Native_handle_sender, Blob_sender API per contract.
friend std::ostream & operator<<(std::ostream &os, const Native_socket_stream &val)
Friend of Native_socket_stream.
size_t receive_blob_max_size() const
Implements Blob_receiver API per contract.
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
bool replace_event_wait_handles(const Create_ev_wait_hndl_func &create_ev_wait_hndl_func)
Implements Native_handle_sender and Native_handle_receiver APIs at the same time, per their concept c...
A process's credentials (PID, UID, GID as of this writing).
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
boost::shared_ptr< Task > Task_ptr
Short-hand for ref-counted pointer to a Function<> that takes no arguments and returns nothing; in pa...
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.