23#include <flow/error/error.hpp> 
   24#include <flow/common.hpp> 
   25#include <boost/move/make_unique.hpp> 
   40  flow::log::Log_context(logger_ptr, 
Log_component::S_TRANSPORT),
 
   41  m_absolute_name(absolute_name_arg),
 
   42  m_worker(get_logger(), flow::util::ostream_op_string(*this)), 
 
   43  m_next_peer_socket(*(m_worker.task_engine())) 
 
   49  using flow::error::Runtime_error;
 
   50  using boost::system::system_error;
 
   58  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Awaiting initial setup/listening in worker thread.");
 
   61    auto const asio_engine = 
m_worker.task_engine();
 
   63    FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: Starting (am in worker thread).");
 
   66    assert((local_endpoint == 
Endpoint()) == 
bool(sys_err_code));
 
   81    catch (
const system_error& exc)
 
   84      FLOW_LOG_WARNING(
"Acceptor [" << *
this << 
"]: Unable to open/bind/listen native local stream socket; could " 
   85                       "be due to address/name clash; details logged below.");
 
   86      sys_err_code = exc.code();
 
   87      FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
   91    FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: " 
   92                  "Successfully made endpoint and open/bind/listen-ed on it.  Ready for connections.");
 
  106    assert(!sys_err_code); 
 
  119      *err_code = sys_err_code;
 
  123    throw Runtime_error(sys_err_code, FLOW_UTIL_WHERE_AM_I_STR());
 
  126  assert(!sys_err_code);
 
  128  FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: Ready for incoming connections.");
 
  133  using flow::async::Single_thread_task_loop;
 
  137  FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: Shutting down.  Next acceptor socket will close; all our internal " 
  138                "async handlers will be canceled; and worker thread thread will be joined.");
 
  146  FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: Continuing shutdown.  Next we will run pending handlers from some " 
  147                "other thread.  In this user thread we will await those handlers' completion and then return.");
 
  148  Single_thread_task_loop one_thread(get_logger(), 
"temp_deinit");
 
  149  one_thread.start([&]()
 
  151    FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: " 
  152                  "In transient finisher thread: Shall run all pending internal handlers (typically none).");
 
  154    const auto task_engine = 
m_worker.task_engine();
 
  155    task_engine->restart();
 
  156    const auto count = task_engine->poll();
 
  159      FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: " 
  160                    "In transient finisher thread: Ran [" << count << 
"] internal handlers after all.");
 
  164    FLOW_LOG_INFO(
"Acceptor [" << *
this << 
"]: " 
  165                  "In transient finisher thread: Shall run all pending user handlers (feeding operation-aborted).");
 
  169      FLOW_LOG_TRACE(
"Running a queued async-accept completion handler.");
 
  173      FLOW_LOG_TRACE(
"User accept handler finished.  Popped from user request deficit queue.");
 
  176    FLOW_LOG_INFO(
"Transient finisher exiting.");
 
  184  using flow::util::ostream_op_string;
 
  185  using std::holds_alternative;
 
  188  if (sys_err_code == boost::asio::error::operation_aborted)
 
  193  assert(sys_err_code != boost::asio::error::would_block); 
 
  195  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Incoming connection, or error when trying to accept one.");
 
  202    if (sys_err_code == boost::asio::error::connection_aborted)
 
  204      FLOW_LOG_WARNING(
"Incoming connection aborted halfway during connection; this is quite weird but " 
  205                       "should not be fatal.  Ignoring.  Still listening.");
 
  210      FLOW_LOG_WARNING(
"Acceptor [" << *
this << 
"]: The background accept failed fatally.  " 
  211                       "Closing acceptor; no longer listening.  Details follow.");
 
  212      FLOW_ERROR_SYS_ERROR_LOG_WARNING();
 
  265    static_assert(
false, 
"Should not have gotten to this line; should have required Linux; " 
  266                           "the next thing assumes not-Win-<8.1.");
 
  271    FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: " 
  272                   "Ejected ownership of new incoming peer socket [" << native_peer_socket << 
"].");
 
  275      = boost::movelib::make_unique<Peer>
 
  279           std::move(native_peer_socket));
 
  281    assert(native_peer_socket.
null());
 
  301  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Starting the next background accept.");
 
  312  using boost::movelib::make_unique;
 
  314  using std::holds_alternative;
 
  327  m_worker.post([
this, target_peer, on_done_func = std::move(on_done_func)]
 
  332    FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Handling async-accept request.");
 
  334    auto new_req = make_unique<User_request>();
 
  335    new_req->m_target_peer = target_peer;
 
  336    new_req->m_on_done_func = std::move(on_done_func);
 
  344      FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: New async-accept request pushed onto deficit queue; " 
  345                     "but there is no surplus (no pending results).  Will await results.");
 
  355    if (holds_alternative<Error_code>(peer_or_err_code))
 
  358      FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: New async-request pushed onto deficit queue; " 
  359                     "and there is surplus in the form of a fatal error code.  Will feed error to the request " 
  360                     "*without* popping it from surplus queue (size remains 1).");
 
  365      assert(holds_alternative<Peer_ptr>(peer_or_err_code));
 
  367      FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: New async-request pushed onto deficit queue; " 
  368                     "and there is surplus in the form of a new peer handle.  Will feed handle to the request.  " 
  371      Peer_ptr peer(std::move(get<Peer_ptr>(peer_or_err_code)));
 
  381  using std::holds_alternative;
 
  389    FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Fatal error pushed onto surplus queue; " 
  390                   "but there is no deficit (no pending requests).  Will await async-accept request(s).");
 
  400  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Fatal error pushed onto surplus queue; " 
  401                 "and there is deficit (1+ pending requests).  Will feed error to all pending requests *without* " 
  402                 "popping surplus queue, whose size remains 1.");
 
  409  using std::holds_alternative;
 
  417    FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: New peer socket handle pushed onto surplus queue; " 
  418                   "but there is no deficit (no pending requests).  Will await async-accept request(s).");
 
  429  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: New peer socket handle pushed onto surplus queue; " 
  430                 "and there is deficit (1+ pending requests).  Will feed to next pending request, having " 
  431                 "popped it from surplus queue (size is now 0).");
 
  442    FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Feeding to user async-accept request handler [" << idx << 
"]: " 
  443                   "Error code [" << err_code << 
"] [" << err_code.message() << 
"].");
 
  456  FLOW_LOG_TRACE(
"Acceptor [" << *
this << 
"]: Feeding to user async-accept request handler: " 
  457                 "Socket stream [" << *peer << 
"].  User request queue size post-pop is " 
  460  *head_request->m_target_peer = std::move(*peer);
 
  472  return os << 
"sh_name[" << val.
absolute_name() << 
"]@" << 
static_cast<const void*
>(&val);
 
A server object that binds to a Shared_name and listens for incoming Native_socket_stream connect att...
static const Shared_name & S_RESOURCE_TYPE_ID
Shared_name relative-folder fragment (no separators) identifying this resource type.
void async_accept_impl(Peer *target_peer, On_peer_accepted_func &&on_done_func)
Non-template impl of async_accept().
flow::async::Single_thread_task_loop m_worker
A single-threaded async task loop that starts in constructor and ends in destructor.
const Shared_name m_absolute_name
See absolute_name().
boost::movelib::unique_ptr< Peer > Peer_ptr
Short-hand for internally stored PEER-state sync_io::Native_socket_stream in m_pending_results_q.
std::queue< User_request::Ptr > m_pending_user_requests_q
Queue storing deficit async-accept requests queued up due to lacking pending ready peer socket handle...
void finalize_q_surplus_on_error()
In thread W, in steady state except for an Error_code just pushed to the back of m_pending_results_q ...
~Native_socket_stream_acceptor()
Destroys this acceptor which will stop listening in the background and cancel any pending completion ...
Native_socket_stream_acceptor(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code=0)
Creates the Native_socket_stream_acceptor and immediately begins listening in the background,...
asio_local_stream_socket::Peer_socket m_next_peer_socket
Unix domain peer socket, always empty/unconnected while a background m_acceptor.async_accept() is pro...
void feed_error_result_to_deficit(const Error_code &err_code)
In thread W, gets back to steady state by feeding the given Error_code (which must be the sole elemen...
boost::movelib::unique_ptr< asio_local_stream_socket::Acceptor > m_acceptor
Unix domain socket acceptor.
flow::async::Task_asio_err On_peer_accepted_func
Short-hand for callback called on new peer-to-peer connection; or on unrecoverable error.
void finalize_q_surplus_on_success()
In thread W, in steady state, introduces the just-established peer socket handle into the state machi...
std::queue< std::variant< Peer_ptr, Error_code > > m_pending_results_q
Queue storing surplus finalized async-accept results queued up due to lacking async_accept() requests...
const Shared_name & absolute_name() const
Returns the full name/address to which the constructor bound, or attempted to bind,...
void feed_success_result_to_deficit(Peer_ptr &&peer)
In thread W, gets back to steady state by feeding the given just-connected peer socket (which must ha...
void on_next_peer_socket_or_error(const Error_code &sys_err_code)
Handler for incoming connection on m_acceptor.
Implements both sync_io::Native_handle_sender and sync_io::Native_handle_receiver concepts by using a...
String-wrapping abstraction representing a name uniquely distinguishing a kernel-persistent entity fr...
const std::string & str() const
Returns (sans copying) ref to immutable entire wrapped name string, suitable to pass into sys calls w...
Protocol::endpoint Endpoint
Short-hand for boost.asio Unix domain peer stream-socket endpoint.
Protocol::acceptor Acceptor
Short-hand for boost.asio Unix domain stream-socket acceptor (listening guy) socket.
Protocol::socket Peer_socket
Short-hand for boost.asio Unix domain peer stream-socket (usually-connected-or-empty guy).
Endpoint endpoint_at_shared_name(flow::log::Logger *logger_ptr, const Shared_name &absolute_name, Error_code *err_code)
Returns an Endpoint corresponding to the given absolute Shared_name, so that an Acceptor or Peer_sock...
@ S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER
Async completion handler is being called prematurely, because underlying object is shutting down,...
Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...
util::Shared_name Shared_name
Convenience alias for the commonly used type util::Shared_name.
std::ostream & operator<<(std::ostream &os, const Bipc_mq_handle &val)
Prints string representation of the given Bipc_mq_handle to the given ostream.
flow::util::String_view String_view
Short-hand for Flow's String_view.
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
bool null() const
Returns true if and only if m_native_handle equals S_NULL_HANDLE.