23#include <flow/error/error.hpp>
24#include <flow/common.hpp>
25#include <boost/move/make_unique.hpp>
40 flow::log::Log_context(logger_ptr,
Log_component::S_TRANSPORT),
41 m_absolute_name(absolute_name_arg),
42 m_worker(get_logger(), flow::util::ostream_op_string(*this)),
43 m_next_peer_socket(*(m_worker.task_engine()))
49 using flow::error::Runtime_error;
50 using boost::system::system_error;
58 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Awaiting initial setup/listening in worker thread.");
61 auto const asio_engine =
m_worker.task_engine();
63 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Starting (am in worker thread).");
66 assert((local_endpoint ==
Endpoint()) ==
bool(sys_err_code));
81 catch (
const system_error& exc)
84 FLOW_LOG_WARNING(
"Acceptor [" << *
this <<
"]: Unable to open/bind/listen native local stream socket; could "
85 "be due to address/name clash; details logged below.");
86 sys_err_code = exc.code();
87 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
91 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
92 "Successfully made endpoint and open/bind/listen-ed on it. Ready for connections.");
106 assert(!sys_err_code);
119 *err_code = sys_err_code;
123 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
126 assert(!sys_err_code);
128 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Ready for incoming connections.");
133 using flow::async::Single_thread_task_loop;
137 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Shutting down. Next acceptor socket will close; all our internal "
138 "async handlers will be canceled; and worker thread thread will be joined.");
146 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Continuing shutdown. Next we will run pending handlers from some "
147 "other thread. In this user thread we will await those handlers' completion and then return.");
148 Single_thread_task_loop one_thread(get_logger(),
"temp_deinit");
149 one_thread.start([&]()
151 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
152 "In transient finisher thread: Shall run all pending internal handlers (typically none).");
154 const auto task_engine =
m_worker.task_engine();
155 task_engine->restart();
156 const auto count = task_engine->poll();
159 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
160 "In transient finisher thread: Ran [" << count <<
"] internal handlers after all.");
164 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
165 "In transient finisher thread: Shall run all pending user handlers (feeding operation-aborted).");
169 FLOW_LOG_TRACE(
"Running a queued async-accept completion handler.");
173 FLOW_LOG_TRACE(
"User accept handler finished. Popped from user request deficit queue.");
176 FLOW_LOG_INFO(
"Transient finisher exiting.");
184 using flow::util::ostream_op_string;
185 using std::holds_alternative;
188 if (sys_err_code == boost::asio::error::operation_aborted)
193 assert(sys_err_code != boost::asio::error::would_block);
195 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Incoming connection, or error when trying to accept one.");
202 if (sys_err_code == boost::asio::error::connection_aborted)
204 FLOW_LOG_WARNING(
"Incoming connection aborted halfway during connection; this is quite weird but "
205 "should not be fatal. Ignoring. Still listening.");
210 FLOW_LOG_WARNING(
"Acceptor [" << *
this <<
"]: The background accept failed fatally. "
211 "Closing acceptor; no longer listening. Details follow.");
212 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
265# error "Should not have gotten to this line; should have required Linux; the next thing assumes not-Win-<8.1."
270 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: "
271 "Ejected ownership of new incoming peer socket [" << native_peer_socket <<
"].");
274 = boost::movelib::make_unique<Peer>
278 std::move(native_peer_socket));
280 assert(native_peer_socket.
null());
300 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Starting the next background accept.");
311 using boost::movelib::make_unique;
313 using std::holds_alternative;
326 m_worker.post([
this, target_peer, on_done_func = std::move(on_done_func)]
331 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Handling async-accept request.");
333 auto new_req = make_unique<User_request>();
334 new_req->m_target_peer = target_peer;
335 new_req->m_on_done_func = std::move(on_done_func);
343 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New async-accept request pushed onto deficit queue; "
344 "but there is no surplus (no pending results). Will await results.");
354 if (holds_alternative<Error_code>(peer_or_err_code))
357 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New async-request pushed onto deficit queue; "
358 "and there is surplus in the form of a fatal error code. Will feed error to the request "
359 "*without* popping it from surplus queue (size remains 1).");
364 assert(holds_alternative<Peer_ptr>(peer_or_err_code));
366 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New async-request pushed onto deficit queue; "
367 "and there is surplus in the form of a new peer handle. Will feed handle to the request. "
370 Peer_ptr peer(std::move(get<Peer_ptr>(peer_or_err_code)));
380 using std::holds_alternative;
388 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Fatal error pushed onto surplus queue; "
389 "but there is no deficit (no pending requests). Will await async-accept request(s).");
399 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Fatal error pushed onto surplus queue; "
400 "and there is deficit (1+ pending requests). Will feed error to all pending requests *without* "
401 "popping surplus queue, whose size remains 1.");
408 using std::holds_alternative;
416 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New peer socket handle pushed onto surplus queue; "
417 "but there is no deficit (no pending requests). Will await async-accept request(s).");
428 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New peer socket handle pushed onto surplus queue; "
429 "and there is deficit (1+ pending requests). Will feed to next pending request, having "
430 "popped it from surplus queue (size is now 0).");
441 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Feeding to user async-accept request handler [" << idx <<
"]: "
442 "Error code [" << err_code <<
"] [" << err_code.message() <<
"].");
455 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Feeding to user async-accept request handler: "
456 "Socket stream [" << *peer <<
"]. User request queue size post-pop is "
459 *head_request->m_target_peer = std::move(*peer);
471 return os <<
"sh_name[" << val.
absolute_name() <<
"]@" <<
static_cast<const void*
>(&val);
A server object that binds to a Shared_name and listens for incoming Native_socket_stream connect att...
static const Shared_name & S_RESOURCE_TYPE_ID
Shared_name relative-folder fragment (no separators) identifying this resource type.
void async_accept_impl(Peer *target_peer, On_peer_accepted_func &&on_done_func)
Non-template impl of async_accept().
flow::async::Single_thread_task_loop m_worker
A single-threaded async task loop that starts in constructor and ends in destructor.
const Shared_name m_absolute_name
See absolute_name().
boost::movelib::unique_ptr< Peer > Peer_ptr
Short-hand for internally stored PEER-state sync_io::Native_socket_stream in m_pending_results_q.
std::queue< User_request::Ptr > m_pending_user_requests_q
Queue storing deficit async-accept requests queued up due to lacking pending ready peer socket handle...
void finalize_q_surplus_on_error()
In thread W, in steady state except for an Error_code just pushed to the back of m_pending_results_q ...
~Native_socket_stream_acceptor()
Destroys this acceptor which will stop listening in the background and cancel any pending completion ...
Native_socket_stream_acceptor(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code=0)
Creates the Native_socket_stream_acceptor and immediately begins listening in the background,...
asio_local_stream_socket::Peer_socket m_next_peer_socket
Unix domain peer socket, always empty/unconnected while a background m_acceptor.async_accept() is pro...
void feed_error_result_to_deficit(const Error_code &err_code)
In thread W, gets back to steady state by feeding the given Error_code (which must be the sole elemen...
boost::movelib::unique_ptr< asio_local_stream_socket::Acceptor > m_acceptor
Unix domain socket acceptor.
flow::async::Task_asio_err On_peer_accepted_func
Short-hand for callback called on new peer-to-peer connection; or on unrecoverable error.
void finalize_q_surplus_on_success()
In thread W, in steady state, introduces the just-established peer socket handle into the state machi...
std::queue< std::variant< Peer_ptr, Error_code > > m_pending_results_q
Queue storing surplus finalized async-accept results queued up due to lacking async_accept() requests...
const Shared_name & absolute_name() const
Returns the full name/address to which the constructor bound, or attempted to bind,...
void feed_success_result_to_deficit(Peer_ptr &&peer)
In thread W, gets back to steady state by feeding the given just-connected peer socket (which must ha...
void on_next_peer_socket_or_error(const Error_code &sys_err_code)
Handler for incoming connection on m_acceptor.
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
const std::string & str() const
Returns (sans copying) ref to immutable entire wrapped name string, suitable to pass into sys calls w...
Protocol::endpoint Endpoint
Short-hand for boost.asio Unix domain peer stream-socket endpoint.
Protocol::acceptor Acceptor
Short-hand for boost.asio Unix domain stream-socket acceptor (listening guy) socket.
Protocol::socket Peer_socket
Short-hand for boost.asio Unix domain peer stream-socket (usually-connected-or-empty guy).
Endpoint endpoint_at_shared_name(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code)
Returns an Endpoint corresponding to the given absolute Shared_name, so that an Acceptor or Peer_sock...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.