23#include <flow/error/error.hpp>
24#include <flow/common.hpp>
25#include <boost/move/make_unique.hpp>
40 flow::log::Log_context(logger_ptr,
Log_component::S_TRANSPORT),
41 m_absolute_name(absolute_name_arg),
42 m_worker(get_logger(),
45 flow::util::ostream_op_string(
"NSSA-", m_absolute_name)),
46 m_next_peer_socket(*(m_worker.task_engine()))
52 using flow::error::Runtime_error;
53 using flow::async::reset_thread_pinning;
54 using boost::system::system_error;
62 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Awaiting initial setup/listening in worker thread.");
65 reset_thread_pinning(get_logger());
67 auto const asio_engine =
m_worker.task_engine();
69 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Starting (am in worker thread).");
72 assert((local_endpoint ==
Endpoint()) ==
bool(sys_err_code));
87 catch (
const system_error& exc)
90 FLOW_LOG_WARNING(
"Acceptor [" << *
this <<
"]: Unable to open/bind/listen native local stream socket; could "
91 "be due to address/name clash; details logged below.");
92 sys_err_code = exc.code();
93 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
97 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
98 "Successfully made endpoint and open/bind/listen-ed on it. Ready for connections.");
112 assert(!sys_err_code);
125 *err_code = sys_err_code;
129 throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
132 assert(!sys_err_code);
134 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Ready for incoming connections.");
139 using flow::async::Single_thread_task_loop;
140 using flow::async::reset_thread_pinning;
141 using flow::util::ostream_op_string;
145 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Shutting down. Next acceptor socket will close; all our internal "
146 "async handlers will be canceled; and worker thread thread will be joined.");
154 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: Continuing shutdown. Next we will run pending handlers from some "
155 "other thread. In this user thread we will await those handlers' completion and then return.");
156 Single_thread_task_loop one_thread(get_logger(), ostream_op_string(
"NSSADeinit-",
m_absolute_name));
158 one_thread.start([&]()
160 reset_thread_pinning(get_logger());
162 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
163 "In transient finisher thread: Shall run all pending internal handlers (typically none).");
165 const auto task_engine =
m_worker.task_engine();
166 task_engine->restart();
167 const auto count = task_engine->poll();
170 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
171 "In transient finisher thread: Ran [" << count <<
"] internal handlers after all.");
175 FLOW_LOG_INFO(
"Acceptor [" << *
this <<
"]: "
176 "In transient finisher thread: Shall run all pending user handlers (feeding operation-aborted).");
180 FLOW_LOG_TRACE(
"Running a queued async-accept completion handler.");
184 FLOW_LOG_TRACE(
"User accept handler finished. Popped from user request deficit queue.");
187 FLOW_LOG_INFO(
"Transient finisher exiting.");
195 using flow::util::ostream_op_string;
196 using std::holds_alternative;
199 if (sys_err_code == boost::asio::error::operation_aborted)
204 assert(sys_err_code != boost::asio::error::would_block);
206 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Incoming connection, or error when trying to accept one.");
213 if (sys_err_code == boost::asio::error::connection_aborted)
215 FLOW_LOG_WARNING(
"Incoming connection aborted halfway during connection; this is quite weird but "
216 "should not be fatal. Ignoring. Still listening.");
221 FLOW_LOG_WARNING(
"Acceptor [" << *
this <<
"]: The background accept failed fatally. "
222 "Closing acceptor; no longer listening. Details follow.");
223 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
276 static_assert(
false,
"Should not have gotten to this line; should have required Linux; "
277 "the next thing assumes not-Win-<8.1.");
282 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: "
283 "Ejected ownership of new incoming peer socket [" << native_peer_socket <<
"].");
286 = boost::movelib::make_unique<Peer>
290 std::move(native_peer_socket));
292 assert(native_peer_socket.
null());
312 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Starting the next background accept.");
323 using boost::movelib::make_unique;
325 using std::holds_alternative;
338 m_worker.post([
this, target_peer, on_done_func = std::move(on_done_func)]
343 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Handling async-accept request.");
345 auto new_req = make_unique<User_request>();
346 new_req->m_target_peer = target_peer;
347 new_req->m_on_done_func = std::move(on_done_func);
355 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New async-accept request pushed onto deficit queue; "
356 "but there is no surplus (no pending results). Will await results.");
366 if (holds_alternative<Error_code>(peer_or_err_code))
369 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New async-request pushed onto deficit queue; "
370 "and there is surplus in the form of a fatal error code. Will feed error to the request "
371 "*without* popping it from surplus queue (size remains 1).");
376 assert(holds_alternative<Peer_ptr>(peer_or_err_code));
378 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New async-request pushed onto deficit queue; "
379 "and there is surplus in the form of a new peer handle. Will feed handle to the request. "
382 Peer_ptr peer(std::move(get<Peer_ptr>(peer_or_err_code)));
392 using std::holds_alternative;
400 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Fatal error pushed onto surplus queue; "
401 "but there is no deficit (no pending requests). Will await async-accept request(s).");
411 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Fatal error pushed onto surplus queue; "
412 "and there is deficit (1+ pending requests). Will feed error to all pending requests *without* "
413 "popping surplus queue, whose size remains 1.");
420 using std::holds_alternative;
428 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New peer socket handle pushed onto surplus queue; "
429 "but there is no deficit (no pending requests). Will await async-accept request(s).");
440 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: New peer socket handle pushed onto surplus queue; "
441 "and there is deficit (1+ pending requests). Will feed to next pending request, having "
442 "popped it from surplus queue (size is now 0).");
453 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Feeding to user async-accept request handler [" << idx <<
"]: "
454 "Error code [" << err_code <<
"] [" << err_code.message() <<
"].");
467 FLOW_LOG_TRACE(
"Acceptor [" << *
this <<
"]: Feeding to user async-accept request handler: "
468 "Socket stream [" << *peer <<
"]. User request queue size post-pop is "
471 *head_request->m_target_peer = std::move(*peer);
483 return os <<
"sh_name[" << val.
absolute_name() <<
"]@" <<
static_cast<const void*
>(&val);
A server object that binds to a Shared_name and listens for incoming Native_socket_stream connect att...
static const Shared_name & S_RESOURCE_TYPE_ID
Shared_name relative-folder fragment (no separators) identifying this resource type.
void async_accept_impl(Peer *target_peer, On_peer_accepted_func &&on_done_func)
Non-template impl of async_accept().
flow::async::Single_thread_task_loop m_worker
A single-threaded async task loop that starts in constructor and ends in destructor.
const Shared_name m_absolute_name
See absolute_name().
boost::movelib::unique_ptr< Peer > Peer_ptr
Short-hand for internally stored PEER-state sync_io::Native_socket_stream in m_pending_results_q.
std::queue< User_request::Ptr > m_pending_user_requests_q
Queue storing deficit async-accept requests queued up due to lacking pending ready peer socket handle...
void finalize_q_surplus_on_error()
In thread W, in steady state except for an Error_code just pushed to the back of m_pending_results_q ...
~Native_socket_stream_acceptor()
Destroys this acceptor which will stop listening in the background and cancel any pending completion ...
Native_socket_stream_acceptor(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code=0)
Creates the Native_socket_stream_acceptor and immediately begins listening in the background,...
asio_local_stream_socket::Peer_socket m_next_peer_socket
Unix domain peer socket, always empty/unconnected while a background m_acceptor.async_accept() is pro...
void feed_error_result_to_deficit(const Error_code &err_code)
In thread W, gets back to steady state by feeding the given Error_code (which must be the sole elemen...
boost::movelib::unique_ptr< asio_local_stream_socket::Acceptor > m_acceptor
Unix domain socket acceptor.
flow::async::Task_asio_err On_peer_accepted_func
Short-hand for callback called on new peer-to-peer connection; or on unrecoverable error.
void finalize_q_surplus_on_success()
In thread W, in steady state, introduces the just-established peer socket handle into the state machi...
std::queue< std::variant< Peer_ptr, Error_code > > m_pending_results_q
Queue storing surplus finalized async-accept results queued up due to lacking async_accept() requests...
const Shared_name & absolute_name() const
Returns the full name/address to which the constructor bound, or attempted to bind,...
void feed_success_result_to_deficit(Peer_ptr &&peer)
In thread W, gets back to steady state by feeding the given just-connected peer socket (which must ha...
void on_next_peer_socket_or_error(const Error_code &sys_err_code)
Handler for incoming connection on m_acceptor.
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
const std::string & str() const
Returns (sans copying) ref to immutable entire wrapped name string, suitable to pass into sys calls w...
Protocol::endpoint Endpoint
Short-hand for boost.asio Unix domain peer stream-socket endpoint.
Protocol::acceptor Acceptor
Short-hand for boost.asio Unix domain stream-socket acceptor (listening guy) socket.
Protocol::socket Peer_socket
Short-hand for boost.asio Unix domain peer stream-socket (usually-connected-or-empty guy).
Endpoint endpoint_at_shared_name(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code)
Returns an Endpoint corresponding to the given absolute Shared_name, so that an Acceptor or Peer_sock...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.