23#include <flow/error/error.hpp>
24#include <flow/common.hpp>
25#include <boost/move/make_unique.hpp>
40 flow::log::Log_context(logger_ptr,
Log_component::S_TRANSPORT),
41 m_absolute_name(absolute_name_arg),
42 m_worker(get_logger(), flow::util::ostream_op_string(*this)),
43 m_next_peer_socket(*(m_worker.task_engine()))
49 using flow::error::Runtime_error;
50 using boost::system::system_error;
58 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Awaiting initial setup/listening in worker thread.");
61 auto const asio_engine =
m_worker.task_engine();
63 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Starting (am in worker thread).");
66 assert((local_endpoint ==
Endpoint()) ==
bool(sys_err_code));
81 catch (
const system_error& exc)
84 FLOW_LOG_WARNING(
"Acceptor [" << *
this <<
"]: Unable to open/bind/listen native local stream socket; could "
85 "be due to address/name clash; details logged below.");
86 sys_err_code = exc.code();
87 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
91 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
92 "Successfully made endpoint and open/bind/listen-ed on it. Ready for connections.");
106 assert(!sys_err_code);
119 *err_code = sys_err_code;
123 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
126 assert(!sys_err_code);
128 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Ready for incoming connections.");
133 using flow::async::Single_thread_task_loop;
137 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Shutting down. Next acceptor socket will close; all our internal "
138 "async handlers will be canceled; and worker thread thread will be joined.");
146 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Continuing shutdown. Next we will run pending handlers from some "
147 "other thread. In this user thread we will await those handlers' completion and then return.");
148 Single_thread_task_loop one_thread(get_logger(),
"temp_deinit");
149 one_thread.start([&]()
151 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
152 "In transient finisher thread: Shall run all pending internal handlers (typically none).");
154 const auto task_engine =
m_worker.task_engine();
155 task_engine->restart();
156 const auto count = task_engine->poll();
159 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
160 "In transient finisher thread: Ran [" << count <<
"] internal handlers after all.");
164 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
165 "In transient finisher thread: Shall run all pending user handlers (feeding operation-aborted).");
169 FLOW_LOG_TRACE(
"Running a queued async-accept completion handler.");
173 FLOW_LOG_TRACE(
"User accept handler finished. Popped from user request deficit queue.");
176 FLOW_LOG_INFO(
"Transient finisher exiting.");
184 using flow::util::ostream_op_string;
185 using std::holds_alternative;
188 if (sys_err_code == boost::asio::error::operation_aborted)
193 assert(sys_err_code != boost::asio::error::would_block);
195 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Incoming connection, or error when trying to accept one.");
202 if (sys_err_code == boost::asio::error::connection_aborted)
204 FLOW_LOG_WARNING(
"Incoming connection aborted halfway during connection; this is quite weird but "
205 "should not be fatal. Ignoring. Still listening.");
210 FLOW_LOG_WARNING(
"Acceptor [" << *
this <<
"]: The background accept failed fatally. "
211 "Closing acceptor; no longer listening. Details follow.");
212 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
265 static_assert(
false,
"Should not have gotten to this line; should have required Linux; "
266 "the next thing assumes not-Win-<8.1.");
271 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: "
272 "Ejected ownership of new incoming peer socket [" << native_peer_socket <<
"].");
275 = boost::movelib::make_unique<Peer>
279 std::move(native_peer_socket));
281 assert(native_peer_socket.
null());
301 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Starting the next background accept.");
312 using boost::movelib::make_unique;
314 using std::holds_alternative;
327 m_worker.post([
this, target_peer, on_done_func = std::move(on_done_func)]
332 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Handling async-accept request.");
334 auto new_req = make_unique<User_request>();
335 new_req->m_target_peer = target_peer;
336 new_req->m_on_done_func = std::move(on_done_func);
344 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New async-accept request pushed onto deficit queue; "
345 "but there is no surplus (no pending results). Will await results.");
355 if (holds_alternative<Error_code>(peer_or_err_code))
358 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New async-request pushed onto deficit queue; "
359 "and there is surplus in the form of a fatal error code. Will feed error to the request "
360 "*without* popping it from surplus queue (size remains 1).");
365 assert(holds_alternative<Peer_ptr>(peer_or_err_code));
367 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New async-request pushed onto deficit queue; "
368 "and there is surplus in the form of a new peer handle. Will feed handle to the request. "
371 Peer_ptr peer(std::move(get<Peer_ptr>(peer_or_err_code)));
381 using std::holds_alternative;
389 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Fatal error pushed onto surplus queue; "
390 "but there is no deficit (no pending requests). Will await async-accept request(s).");
400 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Fatal error pushed onto surplus queue; "
401 "and there is deficit (1+ pending requests). Will feed error to all pending requests *without* "
402 "popping surplus queue, whose size remains 1.");
409 using std::holds_alternative;
417 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New peer socket handle pushed onto surplus queue; "
418 "but there is no deficit (no pending requests). Will await async-accept request(s).");
429 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New peer socket handle pushed onto surplus queue; "
430 "and there is deficit (1+ pending requests). Will feed to next pending request, having "
431 "popped it from surplus queue (size is now 0).");
442 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Feeding to user async-accept request handler [" << idx <<
"]: "
443 "Error code [" << err_code <<
"] [" << err_code.message() <<
"].");
456 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Feeding to user async-accept request handler: "
457 "Socket stream [" << *peer <<
"]. User request queue size post-pop is "
460 *head_request->m_target_peer = std::move(*peer);
472 return os <<
"sh_name[" << val.
absolute_name() <<
"]@" <<
static_cast<const void*
>(&val);
A server object that binds to a Shared_name and listens for incoming Native_socket_stream connect att...
static const Shared_name & S_RESOURCE_TYPE_ID
Shared_name relative-folder fragment (no separators) identifying this resource type.
void async_accept_impl(Peer *target_peer, On_peer_accepted_func &&on_done_func)
Non-template impl of async_accept().
flow::async::Single_thread_task_loop m_worker
A single-threaded async task loop that starts in constructor and ends in destructor.
const Shared_name m_absolute_name
See absolute_name().
boost::movelib::unique_ptr< Peer > Peer_ptr
Short-hand for internally stored PEER-state sync_io::Native_socket_stream in m_pending_results_q.
std::queue< User_request::Ptr > m_pending_user_requests_q
Queue storing deficit async-accept requests queued up due to lacking pending ready peer socket handle...
void finalize_q_surplus_on_error()
In thread W, in steady state except for an Error_code just pushed to the back of m_pending_results_q ...
~Native_socket_stream_acceptor()
Destroys this acceptor which will stop listening in the background and cancel any pending completion ...
Native_socket_stream_acceptor(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code=0)
Creates the Native_socket_stream_acceptor and immediately begins listening in the background,...
asio_local_stream_socket::Peer_socket m_next_peer_socket
Unix domain peer socket, always empty/unconnected while a background m_acceptor.async_accept() is pro...
void feed_error_result_to_deficit(const Error_code &err_code)
In thread W, gets back to steady state by feeding the given Error_code (which must be the sole elemen...
boost::movelib::unique_ptr< asio_local_stream_socket::Acceptor > m_acceptor
Unix domain socket acceptor.
flow::async::Task_asio_err On_peer_accepted_func
Short-hand for callback called on new peer-to-peer connection; or on unrecoverable error.
void finalize_q_surplus_on_success()
In thread W, in steady state, introduces the just-established peer socket handle into the state machi...
std::queue< std::variant< Peer_ptr, Error_code > > m_pending_results_q
Queue storing surplus finalized async-accept results queued up due to lacking async_accept() requests...
const Shared_name & absolute_name() const
Returns the full name/address to which the constructor bound, or attempted to bind,...
void feed_success_result_to_deficit(Peer_ptr &&peer)
In thread W, gets back to steady state by feeding the given just-connected peer socket (which must ha...
void on_next_peer_socket_or_error(const Error_code &sys_err_code)
Handler for incoming connection on m_acceptor.
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
const std::string & str() const
Returns (sans copying) ref to immutable entire wrapped name string, suitable to pass into sys calls w...
Protocol::endpoint Endpoint
Short-hand for boost.asio Unix domain peer stream-socket endpoint.
Protocol::acceptor Acceptor
Short-hand for the boost.asio Unix domain stream-socket acceptor (the listening socket).
Protocol::socket Peer_socket
Short-hand for the boost.asio Unix domain peer stream socket (usually either connected or empty).
Endpoint endpoint_at_shared_name(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code)
Returns an Endpoint corresponding to the given absolute Shared_name, so that an Acceptor or Peer_sock...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code, which is used very commonly.
A very thin wrapper around a native handle, also known as a descriptor or FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.