23#include <boost/algorithm/cxx11/all_of.hpp>
24#include <boost/algorithm/cxx11/one_of.hpp>
25#include <boost/functional.hpp>
44 m_state(
State::S_CLOSED),
46 m_want(empty_ev_type_to_socks_map()),
47 m_can(empty_ev_type_to_socks_map()),
48 m_baseline_check_pending(false)
74 namespace bind_ns = util::bind_ns;
78 using boost::algorithm::all_of;
248 FLOW_LOG_TRACE(
"Event_set [" <<
this <<
"] wait finish requested; moving from "
258 FLOW_LOG_TRACE(
"Event_set [" <<
this <<
"] wait finish requested, but state was already "
323 const bool baseline_check_ok =
326 assert(baseline_check_ok);
338 using boost::promise;
339 using boost::unique_future;
340 using boost::future_status;
341 using boost::chrono::milliseconds;
342 using boost::chrono::round;
343 using boost::algorithm::one_of;
345 assert(max_wait.count() > 0);
376 promise<bool> intr_promise;
377 unique_future<bool> intr_future = intr_promise.get_future();
383 if (!
async_wait([&intr_promise](
bool interrupted) { intr_promise.set_value(interrupted); },
391 bool timed_out =
false;
394 if (max_wait == Fine_duration::max())
403 if (intr_future.wait_for(max_wait) != future_status::ready)
418 FLOW_LOG_INFO(
"Synchronous wait on Event_set [" <<
this <<
"] timed out; "
419 "timeout = [" << round<milliseconds>(max_wait) <<
"].");
439 if (intr_future.get())
441 FLOW_LOG_INFO(
"Synchronous wait on Event_set [" <<
this <<
"] was interrupted.");
454 assert((!err_code) || (!*err_code));
460 using boost::chrono::microseconds;
461 return sync_wait(microseconds(microseconds::max()), err_code);
496 target_set, ev_type, _1);
511 FLOW_LOG_TRACE(
"Wanted set for event type [" << ev_type <<
"] swapped in Event_set [" <<
this <<
"]; pre-swap sizes: "
512 "Event_set [" << want_set.
size() <<
"]; user [" << target_set->
size() <<
"].");
522 target_set->
swap(want_set);
539 FLOW_LOG_TRACE(
"Wanted set for event type [" << ev_type <<
"] cleared in Event_set [" <<
this <<
"]; "
540 "size [" << want_set.
size() <<
"].");
558 using boost::algorithm::one_of;
581 using boost::algorithm::one_of;
607 FLOW_LOG_TRACE(
"Wait result set checked for activity in Event_set [" <<
this <<
"]; "
627 FLOW_LOG_TRACE(
"Wait result set for event type [" << ev_type <<
"] emitted in Event_set [" <<
this <<
"]; "
628 "size [" << can_set.
size() <<
"].");
640 can_set.
swap(*target_set);
661 FLOW_LOG_TRACE(
"Wait result set for event type [" << ev_type <<
"] cleared in Event_set [" <<
this <<
"]; "
662 "size [" << can_set.
size() <<
"].");
719 FLOW_LOG_TRACE(
"Clearing sets in Event_set [" <<
this <<
"]; pre-clear set sizes: "
749 assert(ev_type_to_socks_map);
750 for (
auto& ev_type_and_socks : *ev_type_to_socks_map)
752 ev_type_and_socks.second.clear();
758 return ev_type_and_socks.second.empty();
766 size_t n_left = ev_type_to_socks_map.
size();
767 for (
const auto& ev_type_and_socks : ev_type_to_socks_map)
770 ev_type_and_socks.first,
": ", ev_type_and_socks.second.size(),
771 (((--n_left) == 0) ?
"" :
", "));
779 using boost::any_cast;
787 const auto& type_id = sock_as_any.type();
798 assert(sock_as_any.empty());
811 using boost::any_cast;
822 const auto& type_id = sock_as_any.type();
825 return hash<Peer_socket::Ptr>()(any_cast<Peer_socket::Ptr>(sock_as_any));
830 return hash<Server_socket::Ptr>()(any_cast<Server_socket::Ptr>(sock_as_any));
834 assert(sock_as_any.empty());
839 const boost::any& sock_as_any2)
const
841 using boost::any_cast;
852 if (sock_as_any1.type() != sock_as_any2.type())
858 const auto& type_id = sock_as_any1.type();
862 return any_cast<Peer_socket::Ptr>(sock_as_any1) == any_cast<Peer_socket::Ptr>(sock_as_any2);
867 return any_cast<Server_socket::Ptr>(sock_as_any1) == any_cast<Server_socket::Ptr>(sock_as_any2);
871 assert(sock_as_any1.empty());
872 assert(sock_as_any2.empty());
904 event_set->m_node =
this;
905 event_set->m_baseline_check_pending =
false;
909 m_event_sets.insert(event_set);
920 using boost::asio::post;
939 FLOW_LOG_TRACE(
"Event_set [" << event_set <<
"] wait requested; moving from "
944 event_set->m_on_event = on_event;
956 post(m_task_engine, [
this, event_set]() { event_set_check_baseline_assuming_state(event_set); });
978 event_set->m_baseline_check_pending =
true;
1004 FLOW_LOG_TRACE(
"Event_set [" << event_set <<
"] baseline check ran, but state is no "
1010 if (event_set_check_baseline(event_set))
1013 event_set_fire_if_got_events(event_set);
1020 using boost::any_cast;
1037 if (!event_set->m_baseline_check_pending)
1041 FLOW_LOG_TRACE(
"Event_set [" << event_set <<
"] baseline check ran, but skipping because same check already "
1047 FLOW_LOG_TRACE(
"Event_set [" << event_set <<
"] baseline check started.");
1048 event_set->m_baseline_check_pending =
false;
1054 "Expecting amortized constant time insertion sockets container.");
1063 const auto& does_event_hold = ev_type_and_is_active_mtd.second;
1067 assert(can_set.
empty());
1068 for (
const any& sock_as_any : want_set)
1070 if (does_event_hold(
this, sock_as_any))
1074 "Event_set [" << event_set <<
"].");
1076 can_set.
insert(sock_as_any);
1086 using boost::algorithm::all_of;
1105 FLOW_LOG_TRACE(
"Event_set [" << event_set <<
"] has ready events; firing and moving from "
1108 event_set->m_on_event(
false);
1111 event_set->m_on_event.clear();
1134 using boost::algorithm::all_of;
1145 "defer_delta_check = [" << defer_delta_check <<
"]; "
1148 if (defer_delta_check)
1203 if (!event_set_check_baseline(event_set))
1207 FLOW_LOG_TRACE(
"Event_set [" << event_set <<
"] delta check started.");
1216 for (
auto& ev_type_and_socks_from_node : m_sock_events)
1224 assert(can_set.
empty());
1239 "Expecting amortized constant time search sockets container.");
1241 "Expecting amortized constant time insertion sockets container.");
1243 const bool want_set_smaller = want_set.
size() < all_can_set.
size();
1247 for (
const any& sock_as_any : small_set)
1253 "Event_set [" << event_set <<
"].");
1255 can_set.
insert(sock_as_any);
1266 event_set_fire_if_got_events(event_set);
1277 using boost::adopt_lock;
1325 event_set_close_worker(event_set);
1365 FLOW_LOG_TRACE(
"Closing Event_set [" << event_set <<
"]: changing state "
1370 event_set->m_node = 0;
1382 event_set->m_on_event.clear();
1386 const bool erased = 1 ==
1388 m_event_sets.erase(event_set);
1394 using boost::asio::post;
1397 ([
this](
Error_code* actual_err_code) { interrupt_all_waits(actual_err_code); },
1416 post(m_task_engine, [
this]() { interrupt_all_waits_worker(); });
1425 FLOW_LOG_INFO(
"Executing request to interrupt all waiting Event_sets.");
1440 FLOW_LOG_INFO(
"Event_set [" << event_set <<
"] is being interrupted; firing and moving from "
1449 event_set->m_on_event(
true);
1450 event_set->m_on_event.clear();
1460 if (sys_err_code == boost::asio::error::operation_aborted)
1466 FLOW_LOG_INFO(
"Internal interrupt signal handler executed with signal number [" << sig_number <<
"].");
1472 FLOW_LOG_WARNING(
"Internal signal handler executed with unexpected error indicator. Strange! "
1473 "Ignoring and continuing other operation.");
1485 interrupt_all_waits_worker();
1489 m_signal_set.async_wait([
this](
const Error_code& sys_err_code,
int sig_num)
1491 interrupt_all_waits_internal_sig_handler(sys_err_code, sig_num);
1502#define STATE_TO_CASE_STATEMENT(ARG_state) \
1503 case Event_set::State::S_##ARG_state: \
1504 return os << #ARG_state
1513 STATE_TO_CASE_STATEMENT(INACTIVE);
1514 STATE_TO_CASE_STATEMENT(WAITING);
1515 STATE_TO_CASE_STATEMENT(CLOSED);
1518#undef STATE_TO_CASE_STATEMENT
1525#define TYPE_TO_CASE_STATEMENT(ARG_type) \
1526 case Event_set::Event_type::S_##ARG_type: \
1527 return os << #ARG_type
1537 TYPE_TO_CASE_STATEMENT(PEER_SOCKET_READABLE);
1538 TYPE_TO_CASE_STATEMENT(PEER_SOCKET_WRITABLE);
1539 TYPE_TO_CASE_STATEMENT(SERVER_SOCKET_ACCEPTABLE);
1542#undef TYPE_TO_CASE_STATEMENT
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
bool operator()(const boost::any &sock_as_any1, const boost::any &sock_as_any2) const
Returns whether the two objects, which must be stored in Sockets objects, are equal by value.
size_t operator()(const boost::any &sock_as_any) const
Returns hash value of the given object which must be stored in a Sockets object.
Mutex m_mutex
Mutex protecting ALL data in this object.
static void clear_ev_type_to_socks_map(Ev_type_to_socks_map *ev_type_to_socks_map)
Helper that clears each Sockets set inside an Ev_type_to_socks_map.
bool sync_wait(Error_code *err_code=0)
Blocks indefinitely until one or more of the previously described events hold – or the wait is interr...
bool sync_wait_impl(const Fine_duration &max_wait, Error_code *err_code)
Same as the public sync_wait(max_wait) but uses a Fine_clock-based Fine_duration non-template type fo...
Event_type
Type of event or condition of interest supported by class Event_set.
@ S_PEER_SOCKET_WRITABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
@ S_PEER_SOCKET_READABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
@ S_SERVER_SOCKET_ACCEPTABLE
Event type specifying the condition of interest wherein a target Server_socket serv is such that call...
static std::string ev_type_to_socks_map_sizes_to_str(const Ev_type_to_socks_map &ev_type_to_socks_map)
Helper that returns a loggable string summarizing the sizes of the socket sets, by type,...
State
A state of an Event_set.
@ S_WAITING
Waiting state: valid Event_set that is currently waiting on previously described events.
@ S_CLOSED
Node has disowned the Event_set; all further operations will result in error.
@ S_INACTIVE
Default state; valid Event_set that is not currently waiting on events.
bool async_wait_finish(Error_code *err_code=0)
Moves object from State::S_WAITING to State::S_INACTIVE, and forgets any handler saved by async_wait(...
bool events_detected(Error_code *err_code=0) const
Returns true if and only if the last wait, if any, detected at least one event.
bool poll(Error_code *err_code=0)
Checks for all previously described events that currently hold, saves them for retrieval via emit_res...
bool async_wait(const Event_handler &on_event, Error_code *err_code=0)
Moves object to State::S_WAITING state, saves the given handler to be executed later (in a different,...
State m_state
See state(). Should be set before user gets access to *this. Must not be modified by non-W threads af...
util::Linked_hash_set< boost::any, Socket_as_any_hash, Socket_as_any_equals > Sockets
A set of sockets of one type, used to communicate sets of desired and resulting events in various Eve...
Event_handler m_on_event
During State::S_WAITING, stores the handler (a void function with 1 bool argument) that will be calle...
bool swap_wanted_sockets(Sockets *target_set, Event_type ev_type, Error_code *err_code)
Efficiently exchanges the current set of sockets we want to know are "ready" by the definition of the ...
bool emit_result_sockets(Sockets *target_set, Event_type ev_type, Error_code *err_code=0)
Gets the sockets that satisfy the condition of the given Event_type detected during the last wait.
static Ev_type_to_socks_map empty_ev_type_to_socks_map()
Creates a maximally empty Ev_type_to_socks_map: it will have all possible Event_type as keys but only...
Event_set(log::Logger *logger_ptr)
Constructs object; initializes all values to well-defined but possibly meaningless values (0,...
util::Lock_guard< Mutex > Lock_guard
Short-hand for RAII lock guard of Mutex. Use instead of boost::lock_guard for release() at least.
State state() const
Current State of the Event_set.
bool clear_wanted_sockets(Event_type ev_type, Error_code *err_code=0)
Identical to swap_wanted_sockets(&sockets, ev_type, err_code), where originally sockets is empty and ...
bool ok_to_mod_socket_set(Error_code *err_code) const
Helper that ensures the state of *this is such that one may modify the m_can and m_want socket sets.
bool events_wanted(Error_code *err_code=0) const
Returns true if and only if at least one wanted event for at least one socket is registered (via add_...
void close(Error_code *err_code=0)
Clears all stored resources (any desired events, result events, and any handler saved by async_wait()...
static const boost::unordered_map< Event_type, Function< bool(const Node *, const boost::any &)> > S_EV_TYPE_TO_IS_ACTIVE_NODE_MTD
Mapping from each possible Event_type to the Node method that determines whether the condition define...
Node * node() const
Node that produced this Event_set.
bool clear(Error_code *err_code=0)
Forgets all sockets stored in this object in any fashion.
Ev_type_to_socks_map m_can
The sockets, categorized by Event_type of interest, that were found to be "ready" (as defined in the ...
~Event_set()
Boring destructor. Note that deletion is to be handled exclusively via shared_ptr,...
bool m_baseline_check_pending
While in State::S_WAITING, if this is true, an exhaustive check of all desired events is yet to be pe...
static std::string sock_as_any_to_str(const boost::any &sock_as_any)
Helper that returns a loggable string representing the socket stored in the given boost::any that sto...
std::ostream & operator<<(std::ostream &os, Event_set::State state)
Prints string representation of given Event_set state to given standard ostream and returns the latte...
static bool ev_type_to_socks_map_entry_is_empty(const Ev_type_to_socks_map::Value &ev_type_and_socks)
Functional helper that checks whether a given pair in an Ev_type_to_socks_map contains an empty set o...
Ev_type_to_socks_map m_want
The sockets, categorized by Event_type of interest, to check for "ready" status (as defined in the do...
Node * m_node
See node(). Should be set before user gets access to *this. Must not be modified by non-W threads aft...
bool clear_result_sockets(Event_type ev_type, Error_code *err_code=0)
Identical to emit_result_sockets(&sockets, ev_type, err_code), where originally sockets is empty and ...
util::Linked_hash_map< Event_type, Sockets > Ev_type_to_socks_map
Short-hand for type storing a set of socket sets – one per possible Event_type enum value.
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
bool sock_is_writable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->send() with at least some arguments would return either non...
void interrupt_all_waits(Error_code *err_code=0)
Interrupts any blocking operation, a/k/a wait, and informs the invoker of that operation that the blo...
bool event_set_check_baseline(Event_set::Ptr event_set)
Checks each desired (Event_set::m_want) event in event_set; any that holds true is saved into event_s...
void event_set_close(Event_set::Ptr event_set, Error_code *err_code)
Implementation of Event_set::close() when Event_set::state() != Event_set::State::S_CLOSED for event_...
void event_set_check_baseline_assuming_state(Event_set::Ptr event_set)
Helper placed by event_set_async_wait() onto thread W to invoke event_set_check_baseline() but first ...
bool serv_is_acceptable(const boost::any &serv_as_any) const
Returns true if and only if calling serv->accept() with at least some arguments would return either n...
void event_set_close_worker(Event_set::Ptr event_set)
The guts of event_set_close_worker_check_state(): same thing, but assumes Event_set::state() == Event...
void event_set_all_check_delta(bool defer_delta_check)
For each WAITING Event_set within the Node: checks for any events that hold, and if any do hold,...
bool event_set_async_wait(Event_set::Ptr event_set, const Event_set::Event_handler &on_event, Error_code *err_code)
Implementation of Event_set::async_wait() when Event_set::state() == Event_set::State::S_INACTIVE.
bool sock_is_readable(const boost::any &sock_as_any) const
Returns true if and only if calling sock->receive() with at least some arguments would return either ...
Event_set::Ptr event_set_create(Error_code *err_code=0)
Creates a new Event_set in Event_set::State::S_INACTIVE state with no sockets/events stored; returns ...
void interrupt_all_waits_worker()
Thread W implementation of interrupt_all_waits().
void interrupt_all_waits_internal_sig_handler(const Error_code &sys_err_code, int sig_number)
signal_set handler, executed on SIGINT and SIGTERM, if user has enabled this feature: causes interrup...
void event_set_fire_if_got_events(Event_set::Ptr event_set)
Check whether given Event_set contains any active sockets (Event_set::m_can); if so,...
Properties of various container types.
size_type size() const
Returns number of elements stored.
An object of this class is a set that combines the lookup speed of an unordered_set<> and ordering an...
bool empty() const
Returns true if and only if container is empty.
void swap(Linked_hash_set &other)
Swaps the contents of this structure and other.
std::pair< Iterator, bool > insert(Value const &key)
Attempts to insert the given key into the set.
void clear()
Makes it so that size() == 0.
size_type size() const
Returns number of elements stored.
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
#define FLOW_ERROR_SYS_ERROR_LOG_WARNING()
Logs a warning about the (often errno-based or from a library) error code in sys_err_code.
#define FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(ARG_ret_type, ARG_method_name,...)
Narrow-use macro that implements the error code/exception semantics expected of most public-facing Fl...
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Synchronicity
Enumeration indicating the manner in which asio_exec_ctx_post(), and various boost....
void asio_exec_ctx_post(log::Logger *logger_ptr, Execution_context *exec_ctx, Synchronicity synchronicity, Task &&task)
An extension of boost.asio's post() and dispatch() free function templates, this free function templa...
bool exec_void_and_throw_on_error(const Func &func, Error_code *err_code, util::String_view context)
Equivalent of exec_and_throw_on_error() for operations with void return type.
@ S_EVENT_SET_RESULT_CHECK_WHEN_WAITING
Attempted to check wait results while still waiting.
@ S_EVENT_SET_IMMUTABLE_WHEN_WAITING
Attempted to write to an event set, while a wait operation was pending on that event set.
@ S_EVENT_SET_NO_EVENTS
Attempted to wait on an event set without specifying event on which to wait.
@ S_WAIT_INTERRUPTED
A blocking (sync_) or background-blocking (async_) operation was interrupted, such as by a signal.
@ S_EVENT_SET_DOUBLE_WAIT_OR_POLL
Attempted to wait on or poll an event set while already waiting on that event set.
@ S_EVENT_SET_CLOSED
Attempted operation on an event set, when that event set was closed.
@ S_NODE_NOT_RUNNING
Node not running.
Flow module containing the API and implementation of the Flow network protocol, a TCP-inspired stream...
bool key_exists(const Container &container, const typename Container::key_type &key)
Returns true if and only if the given key is present at least once in the given associative container...
void ostream_op_to_string(std::string *target_str, T const &... ostream_args)
Writes to the specified string, as if the given arguments were each passed, via << in sequence,...
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
#define FLOW_UTIL_WHERE_AM_I_STR()
Same as FLOW_UTIL_WHERE_AM_I() but evaluates to an std::string.