23#include <flow/log/log.hpp>
24#include <flow/async/single_thread_task_loop.hpp>
25#include <boost/move/make_unique.hpp>
83template<
typename Core_t>
85 public flow::log::Log_context,
86 private boost::noncopyable
112 flow::async::Single_thread_task_loop* worker,
Core* sync_io);
139 flow::async::Task_asio_err_sz&& on_done_func);
151 flow::async::Task_asio_err_sz&& on_done_func);
178 using Ptr = boost::movelib::unique_ptr<User_request>;
208 flow::async::Task_asio_err_sz&& on_done_func);
282 mutable flow::util::Mutex_non_recursive
m_mutex;
302template<
typename Core_t>
305 flow::async::Single_thread_task_loop* worker,
307 flow::log::Log_context(logger_ptr,
Log_component::S_TRANSPORT),
314 using flow::util::Lock_guard;
330 m_sync_io.start_receive_blob_ops([
this](Asio_waitable_native_handle* hndl_of_interest,
331 bool ev_of_interest_snd_else_rcv,
337 FLOW_LOG_TRACE(
m_log_pfx <<
": Sync-IO receive-ops event-wait request: "
338 "descriptor [" << hndl_of_interest->native_handle() <<
"], "
339 "writable-else-readable [" << ev_of_interest_snd_else_rcv <<
"].");
342 assert(hndl_of_interest);
343 hndl_of_interest->async_wait(ev_of_interest_snd_else_rcv
344 ? Asio_waitable_native_handle::Base::wait_write
345 : Asio_waitable_native_handle::Base::wait_read,
346 [
this, on_active_ev_func = std::move(on_active_ev_func)]
351 if (err_code == boost::asio::error::operation_aborted)
366 (*on_active_ev_func)();
375template<
typename Core_t>
378 using flow::async::Single_thread_task_loop;
379 using flow::util::ostream_op_string;
390 Single_thread_task_loop one_thread(get_logger(), ostream_op_string(m_log_pfx,
"-rcv-temp_deinit"));
391 one_thread.start([&]()
393 FLOW_LOG_TRACE(
"Running head slot async-receive completion handler.");
395 FLOW_LOG_TRACE(
"User receive handler finished.");
397 while (!m_pending_user_requests_q.empty())
399 FLOW_LOG_TRACE(
"Running a queued async-receive completion handler.");
400 m_pending_user_requests_q.front()
402 m_pending_user_requests_q.pop();
403 FLOW_LOG_TRACE(
"User receive handler finished. Popped from user request deficit queue.");
409 assert(m_pending_user_requests_q.empty());
413template<
typename Core_t>
416 flow::async::Task_asio_err_sz&& on_done_func)
418 assert(target_hndl &&
"Native_socket_stream::async_receive_native_handle() must take non-null Native_handle ptr.");
419 async_receive_native_handle_impl(target_hndl, target_meta_blob, std::move(on_done_func));
427template<
typename Core_t>
429 flow::async::Task_asio_err_sz&& on_done_func)
431 async_receive_native_handle_impl(
nullptr, target_blob, std::move(on_done_func));
434template<
typename Core_t>
437 flow::async::Task_asio_err_sz&& on_done_func)
439 using flow::util::Lock_guard;
440 using boost::movelib::make_unique;
447 Lock_guard<
decltype(m_mutex)> lock(m_mutex);
478 FLOW_LOG_TRACE(m_log_pfx <<
": Incoming user async-receive request for "
479 "possible native handle and meta-blob (located @ [" << target_meta_blob.data() <<
"] of "
480 "max size [" << target_meta_blob.size() <<
"]). In worker now? = [" << m_worker.in_thread() <<
"].");
482 auto new_user_request = make_unique<User_request>();
483 new_user_request->m_target_hndl_ptr = target_hndl_or_null;
484 new_user_request->m_target_meta_blob = target_meta_blob;
485 new_user_request->m_on_done_func = std::move(on_done_func);
489 m_pending_user_requests_q.emplace(std::move(new_user_request));
490 FLOW_LOG_TRACE(
"At least 1 async-receive request is already in progress. "
491 "After registering the new async-receive request: Head slot is non-empty; and "
492 "subsequently-pending deficit queue has size [" << m_pending_user_requests_q.size() <<
"]. "
493 "Will sync-IO-receive to handle this request once it reaches the front of that queue.");
498 FLOW_LOG_TRACE(
"No async-receive request is currently in progress. Starting sync-IO-receive chain to service the "
499 "new request and any further-queued requests that might appear in the meantime.");
500 m_user_request = std::move(new_user_request);
511 if (m_user_request->m_target_hndl_ptr)
513 if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES)
518 m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
519 &sync_err_code, &sync_sz,
523 on_sync_io_rcv_done(err_code, sz);
528 assert(
false &&
"This code should never be reached.");
536 m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
538 { on_sync_io_rcv_done(err_code, sz); });
541 assert(ok &&
"We are by definition in PEER state, and ctor starts receive-ops, and we never start a "
542 "sync_io async-receive before ensuring previous one has executed; so that should never "
543 "return false. Bug somewhere?");
554 process_msg_or_error(sync_err_code, sync_sz);
556 FLOW_LOG_TRACE(
"Message was immediately available; synchronously returned to user; handler posted onto "
557 "async worker thread. Done until next request.");
560template<
typename Core_t>
563 using flow::util::Lock_guard;
568 assert(err_code != boost::asio::error::operation_aborted);
571 FLOW_LOG_TRACE(m_log_pfx <<
": Earlier async-wait => event active => rcv-mutex lock => "
572 "on-active-event-func => sync_io module => on-rcv-done handler => here. "
573 "Or else: rcv-mutex lock => sync_io module => no async-wait needed => here.");
607 assert(m_user_request);
609 assert(!m_user_request);
611 queue<typename User_request::Ptr> ex_pending_user_requests_q_or_none;
614 FLOW_LOG_TRACE(
"Error emitted by sync-IO => all [" << m_pending_user_requests_q.size() <<
"] pending "
615 "handlers (may well be none) will be invoked with error in addition to the head handler.");
617 ex_pending_user_requests_q_or_none = std::move(m_pending_user_requests_q);
618 assert(m_pending_user_requests_q.empty());
620 else if (!m_pending_user_requests_q.empty())
622 FLOW_LOG_TRACE(
"Success emitted by sync-IO => lead handler will be invoked; next pending request will "
623 "be serviced by sync-IO; pending request count has become (after popping from queue into lead "
624 "slot): [" << (m_pending_user_requests_q.size() - 1) <<
"].\n");
626 m_user_request = std::move(m_pending_user_requests_q.front());
628 m_pending_user_requests_q.pop();
632 FLOW_LOG_TRACE(
"Success emitted by sync-IO => lead handler will be invoked; no pending requests queued up.\n");
636 m_worker.post([
this, err_code, sz,
639 ex_user_request = boost::shared_ptr<User_request>(std::move(ex_user_request)),
640 ex_pending_user_requests_q_or_none
641 = boost::make_shared<
decltype(ex_pending_user_requests_q_or_none)>
642 (std::move(ex_pending_user_requests_q_or_none))]()
646 assert(ex_user_request);
647 FLOW_LOG_TRACE(m_log_pfx <<
": Invoking head handler.");
648 (ex_user_request->m_on_done_func)(err_code, sz);
649 FLOW_LOG_TRACE(
"Handler completed.");
650 if (!ex_pending_user_requests_q_or_none->empty())
655 FLOW_LOG_TRACE(m_log_pfx <<
": Invoking [" << ex_pending_user_requests_q_or_none->size() <<
"] "
656 "pending handlers in one shot (due to error).");
657 while (!ex_pending_user_requests_q_or_none->empty())
659 ex_pending_user_requests_q_or_none->front()->m_on_done_func(err_code, 0);
660 ex_pending_user_requests_q_or_none->pop();
661 FLOW_LOG_TRACE(
"In-queue handler finished.");
667 assert(!ex_user_request);
668 assert(ex_pending_user_requests_q_or_none.empty());
671template<
typename Core_t>
674 using flow::util::Lock_guard;
679 assert(err_code != boost::asio::error::operation_aborted);
682 FLOW_LOG_TRACE(m_log_pfx <<
": Earlier async-wait => event active => rcv-mutex lock => "
683 "on-active-event-func => sync_io module => here (on-rcv-done handler).");
694 process_msg_or_error(err_code, sz);
707 if (m_user_request->m_target_hndl_ptr)
709 if constexpr(Core::S_TRANSMIT_NATIVE_HANDLES)
714 m_sync_io.async_receive_native_handle(m_user_request->m_target_hndl_ptr, m_user_request->m_target_meta_blob,
715 &sync_err_code, &sync_sz,
718 on_sync_io_rcv_done(err_code, sz);
723 assert(
false &&
"This code should never be reached.");
731 m_sync_io.async_receive_blob(m_user_request->m_target_meta_blob, &sync_err_code, &sync_sz,
733 { on_sync_io_rcv_done(err_code, sz); });
735 assert(ok &&
"We are by definition in PEER state, and ctor starts receive-ops, and we never start a "
736 "sync_io async-receive before ensuring previous one has executed; so that should never "
737 "return false. Bug somewhere?");
748 process_msg_or_error(sync_err_code, sync_sz);
757template<
typename Core_t>
760 using flow::util::Lock_guard;
764 Lock_guard<
decltype(m_mutex)> lock(m_mutex);
765 return m_sync_io.idle_timer_run(timeout);
Internal-use type that adapts a given PEER-state sync_io::Native_handle_receiver or sync_io::Blob_rec...
flow::async::Single_thread_task_loop & m_worker
Single-thread worker pool for all internal async work. Referred to as thread W in comments.
flow::util::Mutex_non_recursive m_mutex
Protects m_user_request, m_pending_user_requests_q, and receive-ops data of m_sync_io.
std::queue< typename User_request::Ptr > m_pending_user_requests_q
Queue storing deficit async-receive requests queued up due to m_user_request being not null while mor...
bool idle_timer_run(util::Fine_duration timeout)
See Native_handle_receiver counterpart.
Async_adapter_receiver(flow::log::Logger *logger_ptr, util::String_view log_pfx, flow::async::Single_thread_task_loop *worker, Core *sync_io)
Constructs the adapter around sync_io::X object *sync_io.
const std::string m_log_pfx
See log_pfx arg of ctor.
void async_receive_native_handle_impl(Native_handle *target_hndl_or_null, const util::Blob_mutable &target_meta_blob, flow::async::Task_asio_err_sz &&on_done_func)
Body of async_receive_native_handle() and async_receive_blob(); with target_hndl null if and only if ...
User_request::Ptr m_user_request
The head slot containing the currently-being-serviced "deficit" async-receive request,...
~Async_adapter_receiver()
To be invoked after ->stop()ping *worker (from ctor), as well as flushing any still-queued tasks in i...
void on_sync_io_rcv_done(const Error_code &err_code, size_t sz)
Invoked via active-event API, handles the async completion of m_sync_io.async_receive_*() operation.
void async_receive_native_handle(Native_handle *target_hndl, const util::Blob_mutable &target_meta_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Native_handle_receiver counterpart.
void async_receive_blob(const util::Blob_mutable &target_blob, flow::async::Task_asio_err_sz &&on_done_func)
See Blob_receiver counterpart.
Core_t Core
The sync_io::X type being adapted into async-I/O-pattern X.
Core & m_sync_io
The core Core engine, implementing the sync_io pattern (see util::sync_io doc header).
void process_msg_or_error(const Error_code &err_code, size_t sz)
Invoked from thread U/W (async_receive_native_handle_impl()) or thread W (active-event API),...
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
@ S_SYNC_IO_WOULD_BLOCK
A sync_io operation could not immediately complete; it will complete contingent on active async-wait ...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::transport.
boost::shared_ptr< Task > Task_ptr
Short-hand for ref-counted pointer to a Function<> that takes no arguments and returns nothing; in pa...
boost::asio::mutable_buffer Blob_mutable
Short-hand for a mutable blob somewhere in memory, stored as exactly a void* and a size_t.
flow::Fine_duration Fine_duration
Short-hand for Flow's Fine_duration.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Data store representing a deficit user async-receive request: either one being currently handled by m...
Native_handle * m_target_hndl_ptr
See async_receive_native_handle() target_hndl. Null for async_receive_blob().
util::Blob_mutable m_target_meta_blob
See async_receive_native_handle() target_meta_blob. Or see async_receive_blob() target_blob.
boost::movelib::unique_ptr< User_request > Ptr
Short-hand for unique_ptr to this.
flow::async::Task_asio_err_sz m_on_done_func
See async_receive_native_handle() or async_receive_blob() on_done_func.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.