24#include <flow/log/log.hpp>
25#include <flow/async/single_thread_task_loop.hpp>
72template<
typename Core_t>
74 public flow::log::Log_context,
75 private boost::noncopyable
101 flow::async::Single_thread_task_loop* worker,
Core* sync_io);
198 mutable flow::util::Mutex_non_recursive
m_mutex;
218template<
typename Core_t>
221 flow::async::Single_thread_task_loop* worker,
223 flow::log::Log_context(logger_ptr,
Log_component::S_TRANSPORT),
230 using flow::util::Lock_guard;
246 m_sync_io.start_send_blob_ops([
this](Asio_waitable_native_handle* hndl_of_interest,
247 bool ev_of_interest_snd_else_rcv,
253 FLOW_LOG_TRACE(
m_log_pfx <<
": Sync-IO send-ops event-wait request: "
254 "descriptor [" << hndl_of_interest->native_handle() <<
"], "
255 "writable-else-readable [" << ev_of_interest_snd_else_rcv <<
"].");
258 assert(hndl_of_interest);
259 hndl_of_interest->async_wait(ev_of_interest_snd_else_rcv
260 ? Asio_waitable_native_handle::Base::wait_write
261 : Asio_waitable_native_handle::Base::wait_read,
262 [
this, on_active_ev_func = std::move(on_active_ev_func)]
267 if (err_code == boost::asio::error::operation_aborted)
282 (*on_active_ev_func)();
291template<
typename Core_t>
294 using flow::async::Single_thread_task_loop;
295 using flow::util::ostream_op_string;
305 if (!m_end_sending_on_done_func_or_empty.empty())
307 Single_thread_task_loop one_thread(get_logger(), ostream_op_string(m_log_pfx,
"-snd-temp_deinit"));
308 one_thread.start([&]()
310 FLOW_LOG_INFO(m_log_pfx <<
": In transient snd-finisher thread: "
311 "Shall run pending graceful-sends-close completion handler.");
313 FLOW_LOG_INFO(
"Transient finisher exiting.");
318template<
typename Core_t>
321 using flow::util::Lock_guard;
328 Lock_guard<
decltype(m_mutex)> lock(m_mutex);
332 m_sync_io.send_blob(blob, err_code);
333 assert(ok &&
"We are by definition in PEER state, and ctor starts send-ops; so that should never "
334 "return false. Bug somewhere?");
337template<
typename Core_t>
341 using flow::util::Lock_guard;
357 Lock_guard<
decltype(m_mutex)> lock(m_mutex);
361 m_sync_io.send_native_handle(hndl, meta_blob, err_code);
362 assert(ok &&
"We are by definition in PEER state, and ctor starts send-ops; so that should never "
363 "return false. Bug somewhere?");
366template<
typename Core_t>
369 using flow::util::Lock_guard;
376 Lock_guard<
decltype(m_mutex)> lock(m_mutex);
378 if (on_done_func_or_empty.empty())
380 return m_sync_io.end_sending();
384 if (!m_end_sending_on_done_func_or_empty.empty())
386 FLOW_LOG_WARNING(m_log_pfx <<
": async_end_sending(F) invoked; but we have a saved "
387 "completion handler -- which has not yet fired -- for it already, so it must be a dupe-call. "
396 m_end_sending_on_done_func_or_empty = std::move(on_done_func_or_empty);
399 const bool ok = m_sync_io.async_end_sending(&sync_err_code, [
this](
const Error_code& err_code)
403 on_sync_io_end_sending_done(err_code);
409 m_end_sending_on_done_func_or_empty.clear();
422 FLOW_LOG_INFO(m_log_pfx <<
": Sync-IO async-end-sending completed immediately. "
423 "Posting handler onto async-worker thread.");
425 on_sync_io_end_sending_done(sync_err_code);
431template<
typename Core_t>
434 using flow::util::Lock_guard;
439 assert(err_code != boost::asio::error::operation_aborted);
441 FLOW_LOG_TRACE(m_log_pfx <<
":: Earlier async-wait => event active => snd-mutex lock => "
442 "on-active-event-func => sync_io module => here (on-end-sending-done handler). Or else "
443 "snd-mutex lock => no async-wait needed => here... (ditto).");
450 m_worker.post([
this, err_code, on_done_func = std::move(m_end_sending_on_done_func_or_empty)]()
453 FLOW_LOG_TRACE(m_log_pfx <<
": Invoking on-end-sending-done handler.");
454 on_done_func(err_code);
455 FLOW_LOG_TRACE(
"Handler completed.");
458 m_end_sending_on_done_func_or_empty.clear();
461template<
typename Core_t>
464 using flow::util::Lock_guard;
468 Lock_guard<
decltype(m_mutex)> lock(m_mutex);
469 return m_sync_io.auto_ping(period);
Internal-use type that adapts a given PEER-state sync_io::Native_handle_sender or sync_io::Blob_sende...
~Async_adapter_sender()
To be invoked after ->stop()ping *worker (from ctor), as well as flushing any still-queued tasks in i...
void send_native_handle(Native_handle hndl_or_null, const util::Blob_const &meta_blob, Error_code *err_code)
See Native_handle_sender counterpart.
flow::async::Task_asio_err m_end_sending_on_done_func_or_empty
The on_done_func argument to async_end_sending(), possibly .empty() if originally user invoked end_se...
Core_t Core
The sync_io::X type being adapted into async-I/O-pattern X.
void on_sync_io_end_sending_done(const Error_code &err_code)
Handles the completion of m_sync_io.async_end_sending() operation whether synchronously or asynchrono...
bool async_end_sending(flow::async::Task_asio_err &&on_done_func_or_empty)
See Native_handle_sender counterpart; or leave on_done_func_or_empty.empty() for Native_handle_sender...
void send_blob(const util::Blob_const &blob, Error_code *err_code)
See Blob_sender counterpart.
flow::util::Mutex_non_recursive m_mutex
Protects m_end_sending_on_done_func_or_empty and, more importantly, send-ops data of m_sync_io.
flow::async::Single_thread_task_loop & m_worker
Single-thread worker pool for all internal async work. Referred to as thread W in comments.
Core & m_sync_io
The core Core engine, implementing the sync_io pattern (see util::sync_io doc header).
const std::string m_log_pfx
See log_pfx arg of ctor.
bool auto_ping(util::Fine_duration period)
See Native_handle_sender counterpart.
Async_adapter_sender(flow::log::Logger *logger_ptr, util::String_view log_pfx, flow::async::Single_thread_task_loop *worker, Core *sync_io)
Constructs the adapter around sync_io::X object *sync_io.
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
boost::shared_ptr< Task > Task_ptr
Short-hand for ref-counted pointer to a Function<> that takes no arguments and returns nothing; in pa...
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.