23#include <boost/algorithm/cxx11/all_of.hpp> 
   24#include <boost/algorithm/cxx11/one_of.hpp> 
   25#include <boost/functional.hpp> 
   44  m_state(
State::S_CLOSED), 
 
   46  m_want(empty_ev_type_to_socks_map()), 
 
   47  m_can(empty_ev_type_to_socks_map()), 
 
   48  m_baseline_check_pending(false)
 
   77  using boost::algorithm::all_of;
 
  247    FLOW_LOG_TRACE(
"Event_set [" << 
this << 
"] wait finish requested; moving from " 
  257    FLOW_LOG_TRACE(
"Event_set [" << 
this << 
"] wait finish requested, but state was already " 
  322  const bool baseline_check_ok =
 
  325  assert(baseline_check_ok); 
 
  337  using boost::promise;
 
  338  using boost::unique_future;
 
  339  using boost::future_status;
 
  340  using boost::chrono::milliseconds;
 
  341  using boost::chrono::round;
 
  342  using boost::algorithm::one_of;
 
  344  assert(max_wait.count() > 0);
 
  375  promise<bool> intr_promise; 
 
  376  unique_future<bool> intr_future = intr_promise.get_future(); 
 
  382  if (!
async_wait([&intr_promise](
bool interrupted) { intr_promise.set_value(interrupted); },
 
  390  bool timed_out = 
false;
 
  393  if (max_wait == Fine_duration::max())
 
  402    if (intr_future.wait_for(max_wait) != future_status::ready)
 
  417      FLOW_LOG_INFO(
"Synchronous wait on Event_set [" << 
this << 
"] timed out; " 
  418                    "timeout = [" << round<milliseconds>(max_wait) << 
"].");
 
  438    if (intr_future.get())
 
  440      FLOW_LOG_INFO(
"Synchronous wait on Event_set [" << 
this << 
"] was interrupted.");
 
  453  assert((!err_code) || (!*err_code)); 
 
  459  using boost::chrono::microseconds;
 
  460  return sync_wait(microseconds(microseconds::max()), err_code); 
 
  509  FLOW_LOG_TRACE(
"Wanted set for event type [" << ev_type << 
"] swapped in Event_set [" << 
this << 
"]; pre-swap sizes: " 
  510                 "Event_set [" << want_set.
size() << 
"]; user [" << target_set->
size() << 
"].");
 
  520  target_set->
swap(want_set);
 
  537  FLOW_LOG_TRACE(
"Wanted set for event type [" << ev_type << 
"] cleared in Event_set [" << 
this << 
"]; " 
  538                 "size [" << want_set.
size() << 
"].");
 
  556  using boost::algorithm::one_of;
 
  579  using boost::algorithm::one_of;
 
  605  FLOW_LOG_TRACE(
"Wait result set checked for activity in Event_set [" << 
this << 
"]; " 
  625  FLOW_LOG_TRACE(
"Wait result set for event type [" << ev_type << 
"] emitted in Event_set [" << 
this << 
"]; " 
  626                 "size [" << can_set.
size() << 
"].");
 
  638  can_set.
swap(*target_set);
 
  659  FLOW_LOG_TRACE(
"Wait result set for event type [" << ev_type << 
"] cleared in Event_set [" << 
this << 
"]; " 
  660                 "size [" << can_set.
size() << 
"].");
 
  717  FLOW_LOG_TRACE(
"Clearing sets in Event_set [" << 
this << 
"]; pre-clear set sizes: " 
  747  assert(ev_type_to_socks_map);
 
  748  for (
auto& ev_type_and_socks : *ev_type_to_socks_map)
 
  750    ev_type_and_socks.second.clear();
 
  756  return ev_type_and_socks.second.empty();
 
  764  size_t n_left = ev_type_to_socks_map.
size();
 
  765  for (
const auto& ev_type_and_socks : ev_type_to_socks_map)
 
  768                               ev_type_and_socks.first, 
": ", ev_type_and_socks.second.size(),
 
  769                               (((--n_left) == 0) ? 
"" : 
", "));
 
  777  using boost::any_cast;
 
  785  const auto& type_id = sock_as_any.type();
 
  796    assert(sock_as_any.empty());
 
  809  using boost::any_cast;
 
  820  const auto& type_id = sock_as_any.type();
 
  823    return hash<Peer_socket::Ptr>()(any_cast<Peer_socket::Ptr>(sock_as_any));
 
  828    return hash<Server_socket::Ptr>()(any_cast<Server_socket::Ptr>(sock_as_any));
 
  832  assert(sock_as_any.empty());
 
  837                                                 const boost::any& sock_as_any2)
 const 
  839  using boost::any_cast;
 
  850  if (sock_as_any1.type() != sock_as_any2.type())
 
  856  const auto& type_id = sock_as_any1.type();
 
  860    return any_cast<Peer_socket::Ptr>(sock_as_any1) == any_cast<Peer_socket::Ptr>(sock_as_any2);
 
  865    return any_cast<Server_socket::Ptr>(sock_as_any1) == any_cast<Server_socket::Ptr>(sock_as_any2);
 
  869  assert(sock_as_any1.empty()); 
 
  870  assert(sock_as_any2.empty());
 
  902  event_set->m_node = 
this;
 
  903  event_set->m_baseline_check_pending = 
false;
 
  907    m_event_sets.insert(event_set);
 
  918  using boost::asio::post;
 
  937  FLOW_LOG_TRACE(
"Event_set [" << event_set << 
"] wait requested; moving from " 
  942  event_set->m_on_event = on_event;
 
  954  post(m_task_engine, [
this, event_set]() { event_set_check_baseline_assuming_state(event_set); });
 
  976  event_set->m_baseline_check_pending = 
true;
 
 1002    FLOW_LOG_TRACE(
"Event_set [" << event_set << 
"] baseline check ran, but state is no " 
 1008  if (event_set_check_baseline(event_set)) 
 
 1011    event_set_fire_if_got_events(event_set); 
 
 1018  using boost::any_cast;
 
 1035  if (!event_set->m_baseline_check_pending)
 
 1039    FLOW_LOG_TRACE(
"Event_set [" << event_set << 
"] baseline check ran, but skipping because same check already " 
 1045  FLOW_LOG_TRACE(
"Event_set [" << event_set << 
"] baseline check started.");
 
 1046  event_set->m_baseline_check_pending = 
false;
 
 1052                "Expecting amortized constant time insertion sockets container."); 
 
 1061    const auto& does_event_hold = ev_type_and_is_active_mtd.second;
 
 1065    assert(can_set.
empty()); 
 
 1066    for (
const any& sock_as_any : want_set)
 
 1068      if (does_event_hold(
this, sock_as_any))
 
 1072                       "Event_set [" << event_set << 
"].");
 
 1074        can_set.
insert(sock_as_any);
 
 1084  using boost::algorithm::all_of;
 
 1103  FLOW_LOG_TRACE(
"Event_set [" << event_set << 
"] has ready events; firing and moving from " 
 1106  event_set->m_on_event(
false);
 
 1109  event_set->m_on_event.clear();
 
 1132  using boost::algorithm::all_of;
 
 1143                 "defer_delta_check = [" << defer_delta_check << 
"]; " 
 1146  if (defer_delta_check)
 
 1201    if (!event_set_check_baseline(event_set)) 
 
 1205      FLOW_LOG_TRACE(
"Event_set [" << event_set << 
"] delta check started.");
 
 1214      for (
auto& ev_type_and_socks_from_node : m_sock_events)
 
 1222        assert(can_set.
empty());
 
 1237                      "Expecting amortized constant time search sockets container.");
 
 1239                      "Expecting amortized constant time insertion sockets container.");
 
 1241        const bool want_set_smaller = want_set.
size() < all_can_set.
size();
 
 1245        for (
const any& sock_as_any : small_set)
 
 1251                           "Event_set [" << event_set << 
"].");
 
 1253            can_set.
insert(sock_as_any);
 
 1264    event_set_fire_if_got_events(event_set); 
 
 1275  using boost::adopt_lock;
 
 1323    event_set_close_worker(event_set);
 
 1363  FLOW_LOG_TRACE(
"Closing Event_set [" << event_set << 
"]: changing state " 
 1368  event_set->m_node = 0; 
 
 1380  event_set->m_on_event.clear();
 
 1384  const bool erased = 1 ==
 
 1386    m_event_sets.erase(event_set);
 
 1392  using boost::asio::post;
 
 1395        ([
this](
Error_code* actual_err_code) { interrupt_all_waits(actual_err_code); },
 
 1414  post(m_task_engine, [
this]() { interrupt_all_waits_worker(); });
 
 1423  FLOW_LOG_INFO(
"Executing request to interrupt all waiting Event_sets.");
 
 1438      FLOW_LOG_INFO(
"Event_set [" << event_set << 
"] is being interrupted; firing and moving from " 
 1447      event_set->m_on_event(
true); 
 
 1448      event_set->m_on_event.clear();
 
 1458  if (sys_err_code == boost::asio::error::operation_aborted)
 
 1464  FLOW_LOG_INFO(
"Internal interrupt signal handler executed with signal number [" << sig_number << 
"].");
 
 1470    FLOW_LOG_WARNING(
"Internal signal handler executed with unexpected error indicator.  Strange!  " 
 1471                     "Ignoring and continuing other operation.");
 
 1483    interrupt_all_waits_worker();
 
 1487  m_signal_set.async_wait([
this](
const Error_code& sys_err_code, 
int sig_num)
 
 1489    interrupt_all_waits_internal_sig_handler(sys_err_code, sig_num);
 
 1500#define STATE_TO_CASE_STATEMENT(ARG_state) \ 
 1501  case Event_set::State::S_##ARG_state: \ 
 1502    return os << #ARG_state 
 1511    STATE_TO_CASE_STATEMENT(INACTIVE);
 
 1512    STATE_TO_CASE_STATEMENT(WAITING);
 
 1513    STATE_TO_CASE_STATEMENT(CLOSED);
 
 1516#undef STATE_TO_CASE_STATEMENT 
 1523#define TYPE_TO_CASE_STATEMENT(ARG_type) \ 
 1524  case Event_set::Event_type::S_##ARG_type: \ 
 1525    return os << #ARG_type 
 1535    TYPE_TO_CASE_STATEMENT(PEER_SOCKET_READABLE);
 
 1536    TYPE_TO_CASE_STATEMENT(PEER_SOCKET_WRITABLE);
 
 1537    TYPE_TO_CASE_STATEMENT(SERVER_SOCKET_ACCEPTABLE);
 
 1540#undef TYPE_TO_CASE_STATEMENT 
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
bool operator()(const boost::any &sock_as_any1, const boost::any &sock_as_any2) const
Returns whether the two objects, which must be stored in Sockets objects, are equal by value.
size_t operator()(const boost::any &sock_as_any) const
Returns hash value of the given object which must be stored in a Sockets object.
Mutex m_mutex
Mutex protecting ALL data in this object.
static void clear_ev_type_to_socks_map(Ev_type_to_socks_map *ev_type_to_socks_map)
Helper that clears each Sockets set inside an Ev_type_to_socks_map.
bool sync_wait(Error_code *err_code=0)
Blocks indefinitely until one or more of the previously described events hold – or the wait is interr...
bool sync_wait_impl(const Fine_duration &max_wait, Error_code *err_code)
Same as the public sync_wait(max_wait) but uses a Fine_clock-based Fine_duration non-template type fo...
Event_type
Type of event or condition of interest supported by class Event_set.
@ S_PEER_SOCKET_WRITABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
@ S_PEER_SOCKET_READABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
@ S_SERVER_SOCKET_ACCEPTABLE
Event type specifying the condition of interest wherein a target Server_socket serv is such that call...
static std::string ev_type_to_socks_map_sizes_to_str(const Ev_type_to_socks_map &ev_type_to_socks_map)
Helper that returns a loggable string summarizing the sizes of the socket sets, by type,...
State
A state of an Event_set.
@ S_WAITING
Waiting state: valid Event_set that is currently waiting on previously described events.
@ S_CLOSED
Node has disowned the Event_set; all further operations will result in error.
@ S_INACTIVE
Default state; valid Event_set that is not currently waiting on events.
bool async_wait_finish(Error_code *err_code=0)
Moves object from State::S_WAITING to State::S_INACTIVE, and forgets any handler saved by async_wait(...
bool events_detected(Error_code *err_code=0) const
Returns true if and only if the last wait, if any, detected at least one event.
bool poll(Error_code *err_code=0)
Checks for all previously described events that currently hold, saves them for retrieval via emit_res...
bool async_wait(const Event_handler &on_event, Error_code *err_code=0)
Moves object to State::S_WAITING state, saves the given handler to be executed later (in a different,...
State m_state
See state(). Should be set before user gets access to *this. Must not be modified by non-W threads af...
util::Linked_hash_set< boost::any, Socket_as_any_hash, Socket_as_any_equals > Sockets
A set of sockets of one type, used to communicate sets of desired and resulting events in various Eve...
Event_handler m_on_event
During State::S_WAITING, stores the handler (a void function with 1 bool argument) that will be calle...
bool swap_wanted_sockets(Sockets *target_set, Event_type ev_type, Error_code *err_code)
Efficiently exchanges the current set of sockets we want to know are "ready" by the definition of the ...
bool emit_result_sockets(Sockets *target_set, Event_type ev_type, Error_code *err_code=0)
Gets the sockets that satisfy the condition of the given Event_type detected during the last wait.
static Ev_type_to_socks_map empty_ev_type_to_socks_map()
Creates a maximally empty Ev_type_to_socks_map: it will have all possible Event_type as keys but only...
Event_set(log::Logger *logger_ptr)
Constructs object; initializes all values to well-defined but possibly meaningless values (0,...
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex. Use instead of boost::lock_guard for release() at least.
State state() const
Current State of the Event_set.
bool clear_wanted_sockets(Event_type ev_type, Error_code *err_code=0)
Identical to swap_wanted_sockets(&sockets, ev_type, err_code), where originally sockets is empty and ...
bool ok_to_mod_socket_set(Error_code *err_code) const
Helper that ensures the state of *this is such that one may modify the m_can and m_want socket sets.
bool events_wanted(Error_code *err_code=0) const
Returns true if and only if at least one wanted event for at least one socket is registered (via add_...
void close(Error_code *err_code=0)
Clears all stored resources (any desired events, result events, and any handler saved by async_wait()...
static const boost::unordered_map< Event_type, Function< bool(const Node *, const boost::any &)> > S_EV_TYPE_TO_IS_ACTIVE_NODE_MTD
Mapping from each possible Event_type to the Node method that determines whether the condition define...
Node * node() const
Node that produced this Event_set.
bool clear(Error_code *err_code=0)
Forgets all sockets stored in this object in any fashion.
Ev_type_to_socks_map m_can
The sockets, categorized by Event_type of interest, that were found to be "ready" (as defined in the ...
~Event_set()
Boring destructor. Note that deletion is to be handled exclusively via shared_ptr,...
bool m_baseline_check_pending
While in State::S_WAITING, if this is true, an exhaustive check of all desired events is yet to be pe...
static std::string sock_as_any_to_str(const boost::any &sock_as_any)
Helper that returns a loggable string representing the socket stored in the given boost::any that sto...
std::ostream & operator<<(std::ostream &os, Event_set::State state)
Prints string representation of given Event_set state to given standard ostream and returns the latte...
static bool ev_type_to_socks_map_entry_is_empty(const Ev_type_to_socks_map::Value &ev_type_and_socks)
Functional helper that checks whether a given pair in an Ev_type_to_socks_map contains an empty set o...
Ev_type_to_socks_map m_want
The sockets, categorized by Event_type of interest, to check for "ready" status (as defined in the do...
Node * m_node
See node(). Should be set before user gets access to *this. Must not be modified by non-W threads aft...
bool clear_result_sockets(Event_type ev_type, Error_code *err_code=0)
Identical to emit_result_sockets(&sockets, ev_type, err_code), where originally sockets is empty and ...
util::Linked_hash_map< Event_type, Sockets > Ev_type_to_socks_map
Short-hand for type storing a set of socket sets – one per possible Event_type enum value.
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
bool sock_is_writable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->send() with at least some arguments would return either non...
void interrupt_all_waits(Error_code *err_code=0)
Interrupts any blocking operation, a/k/a wait, and informs the invoker of that operation that the blo...
bool event_set_check_baseline(Event_set::Ptr event_set)
Checks each desired (Event_set::m_want) event in event_set; any that holds true is saved into event_s...
void event_set_close(Event_set::Ptr event_set, Error_code *err_code)
Implementation of Event_set::close() when Event_set::state() != Event_set::State::S_CLOSED for event_...
void event_set_check_baseline_assuming_state(Event_set::Ptr event_set)
Helper placed by event_set_async_wait() onto thread W to invoke event_set_check_baseline() but first ...
bool serv_is_acceptable(const boost::any &serv_as_any) const
Returns true if and only if calling serv->accept() with at least some arguments would return either n...
void event_set_close_worker(Event_set::Ptr event_set)
The guts of event_set_close_worker_check_state(): same thing, but assumes Event_set::state() == Event...
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
bool event_set_async_wait(Event_set::Ptr event_set, const Event_set::Event_handler &on_event, Error_code *err_code)
Implementation of Event_set::async_wait() when Event_set::state() == Event_set::State::S_INACTIVE.
bool sock_is_readable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->receive() with at least some arguments would return either ...
Event_set::Ptr event_set_create(Error_code *err_code=0)
Creates a new Event_set in Event_set::State::S_INACTIVE state with no sockets/events stored; returns ...
void interrupt_all_waits_worker()
Thread W implementation of interrupt_all_waits().
void interrupt_all_waits_internal_sig_handler(const Error_code &sys_err_code, int sig_number)
signal_set handler, executed on SIGINT and SIGTERM, if user has enabled this feature: causes interrup...
void event_set_fire_if_got_events(Event_set::Ptr event_set)
Check whether given Event_set contains any active sockets (Event_set::m_can); if so,...
Properties of various container types.
size_type size() const
Returns number of elements stored.
An object of this class is a set that combines the lookup speed of an unordered_set<> and ordering an...
bool empty() const
Returns true if and only if container is empty.
void swap(Linked_hash_set &other)
Swaps the contents of this structure and other.
std::pair< Iterator, bool > insert(Value const &key)
Attempts to insert the given key into the set.
void clear()
Makes it so that size() == 0.
size_type size() const
Returns number of elements stored.
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(ARG_ret_type, ARG_function_name,...)
Narrow-use macro that implements the error code/exception semantics expected of most public-facing Fl...
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
void asio_exec_ctx_post(log::Logger *logger_ptr, Execution_context *exec_ctx, Synchronicity synchronicity, Task &&task)
An extension of boost.asio's post() and dispatch() free function templates, this free function templa...
bool exec_void_and_throw_on_error(const Func &func, Error_code *err_code, util::String_view context)
Equivalent of exec_and_throw_on_error() for operations with void return type.
@ S_EVENT_SET_RESULT_CHECK_WHEN_WAITING
Attempted to check wait results while still waiting.
@ S_EVENT_SET_IMMUTABLE_WHEN_WAITING
Attempted to write to an event set, while a wait operation was pending on that event set.
@ S_EVENT_SET_NO_EVENTS
Attempted to wait on an event set without specifying event on which to wait.
@ S_WAIT_INTERRUPTED
A blocking (sync_) or background-blocking (async_) operation was interrupted, such as by a signal.
@ S_EVENT_SET_DOUBLE_WAIT_OR_POLL
Attempted to wait on or poll an event set while already waiting on that event set.
@ S_EVENT_SET_CLOSED
Attempted operation on an event set, when that event set was closed.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
bool key_exists(const Container &container, const typename Container::key_type &key)
Returns true if and only if the given key is present at least once in the given associative container...
void ostream_op_to_string(std::string *target_str, T const &... ostream_args)
Writes to the specified string, as if the given arguments were each passed, via << in sequence,...
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
#define FLOW_UTIL_WHERE_AM_I_STR()
Same as FLOW_UTIL_WHERE_AM_I() but evaluates to an std::string.