23#include <flow/error/error.hpp> 
   24#include <flow/common.hpp> 
   25#include <boost/move/make_unique.hpp> 
   40  flow::log::Log_context(logger_ptr, 
Log_component::S_TRANSPORT),
 
   41  m_absolute_name(absolute_name_arg),
 
   42  m_worker(get_logger(), 
 
   45           flow::util::ostream_op_string(
"NSSA-", m_absolute_name)),
 
   46  m_next_peer_socket(*(m_worker.task_engine())) 
 
   52  using flow::error::Runtime_error;
 
   53  using flow::async::reset_thread_pinning;
 
   54  using boost::system::system_error;
 
   62  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Awaiting initial setup/listening in worker thread.");
 
   65    reset_thread_pinning(get_logger()); 
 
   67    auto const asio_engine = 
m_worker.task_engine();
 
   69    FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: Starting (am in worker thread).");
 
   72    assert((local_endpoint == 
Endpoint()) == 
bool(sys_err_code));
 
   87    catch (
const system_error& exc)
 
   90      FLOW_LOG_WARNING(
"Acceptor [" << *
this << 
"]: Unable to open/bind/listen native local stream socket; could " 
   91                       "be due to address/name clash; details logged below.");
 
   92      sys_err_code = exc.code();
 
   93      FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
   97    FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: " 
   98                  "Successfully made endpoint and open/bind/listen-ed on it.  Ready for connections.");
 
  112    assert(!sys_err_code); 
 
  125      *err_code = sys_err_code;
 
  129    throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
 
  132  assert(!sys_err_code);
 
  134  FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: Ready for incoming connections.");
 
  139  using flow::async::Single_thread_task_loop;
 
  140  using flow::async::reset_thread_pinning;
 
  141  using flow::util::ostream_op_string;
 
  145  FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: Shutting down.  Next acceptor socket will close; all our internal " 
  146                "async handlers will be canceled; and worker thread thread will be joined.");
 
  154  FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: Continuing shutdown.  Next we will run pending handlers from some " 
  155                "other thread.  In this user thread we will await those handlers' completion and then return.");
 
  156  Single_thread_task_loop one_thread(get_logger(), ostream_op_string(
"NSSADeinit-", 
m_absolute_name));
 
  158  one_thread.start([&]()
 
  160    reset_thread_pinning(get_logger()); 
 
  162    FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: " 
  163                  "In transient finisher thread: Shall run all pending internal handlers (typically none).");
 
  165    const auto task_engine = 
m_worker.task_engine();
 
  166    task_engine->restart();
 
  167    const auto count = task_engine->poll();
 
  170      FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: " 
  171                    "In transient finisher thread: Ran [" << count << 
"] internal handlers after all.");
 
  175    FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: " 
  176                  "In transient finisher thread: Shall run all pending user handlers (feeding operation-aborted).");
 
  180      FLOW_LOG_TRACE(
"Running a queued async-accept completion handler.");
 
  184      FLOW_LOG_TRACE(
"User accept handler finished.  Popped from user request deficit queue.");
 
  187    FLOW_LOG_INFO(
"Transient finisher exiting.");
 
  195  using flow::util::ostream_op_string;
 
  196  using std::holds_alternative;
 
  199  if (sys_err_code == boost::asio::error::operation_aborted)
 
  204  assert(sys_err_code != boost::asio::error::would_block); 
 
  206  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Incoming connection, or error when trying to accept one.");
 
  213    if (sys_err_code == boost::asio::error::connection_aborted)
 
  215      FLOW_LOG_WARNING(
"Incoming connection aborted halfway during connection; this is quite weird but " 
  216                       "should not be fatal.  Ignoring.  Still listening.");
 
  221      FLOW_LOG_WARNING(
"Acceptor [" << *
this << 
"]: The background accept failed fatally.  " 
  222                       "Closing acceptor; no longer listening.  Details follow.");
 
  223      FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
  276    static_assert(
false, 
"Should not have gotten to this line; should have required Linux; " 
  277                           "the next thing assumes not-Win-<8.1.");
 
  282    FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: " 
  283                   "Ejected ownership of new incoming peer socket [" << native_peer_socket << 
"].");
 
  286      = boost::movelib::make_unique<Peer>
 
  290           std::move(native_peer_socket));
 
  292    assert(native_peer_socket.
null());
 
  312  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Starting the next background accept.");
 
  323  using boost::movelib::make_unique;
 
  325  using std::holds_alternative;
 
  338  m_worker.post([
this, target_peer, on_done_func = std::move(on_done_func)]
 
  343    FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Handling async-accept request.");
 
  345    auto new_req = make_unique<User_request>();
 
  346    new_req->m_target_peer = target_peer;
 
  347    new_req->m_on_done_func = std::move(on_done_func);
 
  355      FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: New async-accept request pushed onto deficit queue; " 
  356                     "but there is no surplus (no pending results).  Will await results.");
 
  366    if (holds_alternative<Error_code>(peer_or_err_code))
 
  369      FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: New async-request pushed onto deficit queue; " 
  370                     "and there is surplus in the form of a fatal error code.  Will feed error to the request " 
  371                     "*without* popping it from surplus queue (size remains 1).");
 
  376      assert(holds_alternative<Peer_ptr>(peer_or_err_code));
 
  378      FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: New async-request pushed onto deficit queue; " 
  379                     "and there is surplus in the form of a new peer handle.  Will feed handle to the request.  " 
  382      Peer_ptr peer(std::move(get<Peer_ptr>(peer_or_err_code)));
 
  392  using std::holds_alternative;
 
  400    FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Fatal error pushed onto surplus queue; " 
  401                   "but there is no deficit (no pending requests).  Will await async-accept request(s).");
 
  411  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Fatal error pushed onto surplus queue; " 
  412                 "and there is deficit (1+ pending requests).  Will feed error to all pending requests *without* " 
  413                 "popping surplus queue, whose size remains 1.");
 
  420  using std::holds_alternative;
 
  428    FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: New peer socket handle pushed onto surplus queue; " 
  429                   "but there is no deficit (no pending requests).  Will await async-accept request(s).");
 
  440  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: New peer socket handle pushed onto surplus queue; " 
  441                 "and there is deficit (1+ pending requests).  Will feed to next pending request, having " 
  442                 "popped it from surplus queue (size is now 0).");
 
  453    FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Feeding to user async-accept request handler [" << idx << 
"]: " 
  454                   "Error code [" << err_code << 
"] [" << err_code.message() << 
"].");
 
  467  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Feeding to user async-accept request handler: " 
  468                 "Socket stream [" << *peer << 
"].  User request queue size post-pop is " 
  471  *head_request->m_target_peer = std::move(*peer);
 
  483  return os << 
"sh_name[" << val.
absolute_name() << 
"]@" << 
static_cast<const void*
>(&val);
 
A server object that binds to a Shared_name and listens for incoming Native_socket_stream connect att...
static const Shared_name & S_RESOURCE_TYPE_ID
Shared_name relative-folder fragment (no separators) identifying this resource type.
void async_accept_impl(Peer *target_peer, On_peer_accepted_func &&on_done_func)
Non-template impl of async_accept().
flow::async::Single_thread_task_loop m_worker
A single-threaded async task loop that starts in constructor and ends in destructor.
const Shared_name m_absolute_name
See absolute_name().
boost::movelib::unique_ptr< Peer > Peer_ptr
Short-hand for internally stored PEER-state sync_io::Native_socket_stream in m_pending_results_q.
std::queue< User_request::Ptr > m_pending_user_requests_q
Queue storing deficit async-accept requests queued up due to lacking pending ready peer socket handle...
void finalize_q_surplus_on_error()
In thread W, in steady state except for an Error_code just pushed to the back of m_pending_results_q ...
~Native_socket_stream_acceptor()
Destroys this acceptor which will stop listening in the background and cancel any pending completion ...
Native_socket_stream_acceptor(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code=0)
Creates the Native_socket_stream_acceptor and immediately begins listening in the background,...
asio_local_stream_socket::Peer_socket m_next_peer_socket
Unix domain peer socket, always empty/unconnected while a background m_acceptor.async_accept() is pro...
void feed_error_result_to_deficit(const Error_code &err_code)
In thread W, gets back to steady state by feeding the given Error_code (which must be the sole elemen...
boost::movelib::unique_ptr< asio_local_stream_socket::Acceptor > m_acceptor
Unix domain socket acceptor.
flow::async::Task_asio_err On_peer_accepted_func
Short-hand for callback called on new peer-to-peer connection; or on unrecoverable error.
void finalize_q_surplus_on_success()
In thread W, in steady state, introduces the just-established peer socket handle into the state machi...
std::queue< std::variant< Peer_ptr, Error_code > > m_pending_results_q
Queue storing surplus finalized async-accept results queued up due to lacking async_accept() requests...
const Shared_name & absolute_name() const
Returns the full name/address to which the constructor bound, or attempted to bind,...
void feed_success_result_to_deficit(Peer_ptr &&peer)
In thread W, gets back to steady state by feeding the given just-connected peer socket (which must ha...
void on_next_peer_socket_or_error(const Error_code &sys_err_code)
Handler for incoming connection on m_acceptor.
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
const std::string & str() const
Returns (sans copying) ref to immutable entire wrapped name string, suitable to pass into sys calls w...
Protocol::endpoint Endpoint
Short-hand for boost.asio Unix domain peer stream-socket endpoint.
Protocol::acceptor Acceptor
Short-hand for boost.asio Unix domain stream-socket acceptor (listening guy) socket.
Protocol::socket Peer_socket
Short-hand for boost.asio Unix domain peer stream-socket (usually-connected-or-empty guy).
Endpoint endpoint_at_shared_name(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code)
Returns an Endpoint corresponding to the given absolute Shared_name, so that an Acceptor or Peer_sock...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.