378  template<
typename Rep, 
typename Period, 
typename Handler>
 
  380                     const Handler& on_result,
 
  381                     const boost::chrono::duration<Rep, Period>& max_wait,
 
  405  template<
typename Rep, 
typename Period, 
typename Handler>
 
  407                                   const Handler& on_result,
 
  408                                   const boost::chrono::duration<Rep, Period>& max_wait,
 
  409                                   const boost::asio::const_buffer& serialized_metadata,
 
  425  template<
typename Handler>
 
  427                     const Handler& on_result,
 
  445  template<
typename Handler>
 
  447                                   const Handler& on_result,
 
  448                                   const boost::asio::const_buffer& serialized_metadata,
 
  523  template<
typename Socket, 
typename Base_socket, 
typename Non_blocking_func_ret_type>
 
  524  void async_op(
typename Socket::Ptr sock,
 
  526                Non_blocking_func_ret_type would_block_ret_val,
 
  552                          const boost::asio::const_buffer& serialized_metadata,
 
  569  template<
typename Handler>
 
  580template<
typename Socket, 
typename Base_socket, 
typename Non_blocking_func_ret_type>
 
  583                    Non_blocking_func_ret_type would_block_ret_val,
 
  588  using boost::shared_ptr;
 
  589  using boost::chrono::milliseconds;
 
  590  using boost::chrono::round;
 
  591  using boost::asio::bind_executor;
 
  628    on_result(err_code, would_block_ret_val); 
 
  634  FLOW_LOG_TRACE(
"Begin async op (identified by Event_set [" << event_set << 
"]) of type [" << ev_type << 
"] on " 
  635                 "object [" << sock << 
"].");
 
  640  if (!(event_set->add_wanted_socket<Base_socket>(Base_socket::ptr_cast(sock), ev_type, &err_code)))
 
  642    on_result(err_code, would_block_ret_val); 
 
  650  const bool timeout_given = wait_until != 
Fine_time_pt();
 
  685  using Timeout_state_ptr = shared_ptr<Timeout_state>; 
 
  686  Timeout_state_ptr timeout_state;
 
  690    if (wait_until <= Fine_clock::now()) 
 
  698    FLOW_LOG_TRACE(
"Timeout timer begin for async op [" << event_set << 
"] in " 
  699                   "period [" << round<milliseconds>(wait_until - Fine_clock::now()) << 
"].");
 
  704    timeout_state.reset(
new Timeout_state(task_engine));
 
  707    timeout_state->sched_task
 
  711                               bind_executor(timeout_state->m_make_serial,
 
  712                                             [
this, timeout_state, on_result, would_block_ret_val, event_set]
 
  717      FLOW_LOG_TRACE(
"[User event loop] " 
  718                     "Timeout fired for async op [" << event_set << 
"]; clean up and report to user.");
 
  720      Error_code dummy_prevents_throw;
 
  721      event_set->async_wait_finish(&dummy_prevents_throw);
 
  722      event_set->close(&dummy_prevents_throw);
 
  724      FLOW_LOG_TRACE(
"[User event loop] User handler execution begins for async op [" << event_set << 
"].");
 
  725      on_result(error::Code::S_WAIT_USER_TIMEOUT, would_block_ret_val); 
 
  727      FLOW_LOG_TRACE(
"[User event loop] User handler execution ends for async op [" << event_set << 
"].");
 
  736  auto on_async_wait_user_loop
 
  737    = [
this, sock, timeout_state, would_block_ret_val, ev_type, wait_until, event_set,
 
  738       non_blocking_func = std::move(non_blocking_func),
 
  739       on_result = std::move(on_result)]
 
  749    event_set->close(&dummy_prevents_throw);
 
  759                    "Events-ready in async op [" << event_set << 
"], but timeout already expired recently.  " 
  760                    "Interesting timing coincidence.");
 
  771      const auto err_code = &err_code_val;
 
  775                     "Events-ready in async op [" << event_set << 
"]; user handler execution begins.");
 
  776      on_result(err_code_val, would_block_ret_val); 
 
  780      assert(!interrupted);
 
  787      Non_blocking_func_ret_type op_result;
 
  789      const bool reactor_pattern = non_blocking_func.empty(); 
 
  794        assert(!op_err_code);
 
  797                       "Events-ready in async op [" << event_set << 
"]; reactor pattern mode on; " 
  798                       "user handler execution begins.");
 
  799        on_result(op_err_code, would_block_ret_val); 
 
  803        assert(!reactor_pattern);
 
  806                       "Events-ready in async op [" << event_set << 
"]; reactor pattern mode off; " 
  807                       "executing non-blocking operation now.");
 
  808        op_result = non_blocking_func(&op_err_code); 
 
  815                         "Error observed in instant op; code [" << op_err_code << 
'/' << op_err_code.message() << 
"]; " 
  816                         "in async op [" << event_set << 
"]; " 
  817                         "user handler execution begins.");
 
  818          on_result(op_err_code, would_block_ret_val); 
 
  822          assert(!op_err_code);
 
  825          if (op_result == would_block_ret_val)
 
  828                           "Instant op yielded would-block despite events-ready " 
  829                           "in async op [" << event_set << 
"]; are there concurrent competing operations?  " 
  830                           "Trying async op again.");
 
  831            assert(!reactor_pattern); 
 
  838              <Socket, Base_socket, Non_blocking_func_ret_type>
 
  839              (sock, std::move(non_blocking_func),
 
  840               would_block_ret_val, ev_type, wait_until, std::move(on_result));
 
  847                         "Instant op yielded positive result [" << op_result << 
"] " 
  848                         "in async op [" << event_set << 
"]; " 
  849                         "user handler execution begins.");
 
  851          assert(op_result != would_block_ret_val);
 
  852          assert(!op_err_code);
 
  853          on_result(op_err_code, op_result); 
 
  859    FLOW_LOG_TRACE(
"[User event loop] User handler execution ends for async op [" << event_set << 
"].");
 
  871    event_set->async_wait([sock, timeout_state,
 
  872                           on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
 
  873                          (
bool interrupted) 
mutable 
  880      post(timeout_state->m_make_serial, [interrupted,
 
  881                                          on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
 
  884        on_async_wait_user_loop(interrupted); 
 
  901    event_set->async_wait([sock, on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
 
  902                            (
bool interrupted) 
mutable 
  905      post(*sock->async_task_engine(), [interrupted,
 
  906                                        on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
 
  909        on_async_wait_user_loop(interrupted);
 
  915template<
typename Rep, 
typename Period, 
typename Handler>
 
  917                         const Handler& on_result,
 
  918                         const boost::chrono::duration<Rep, Period>& max_wait,
 
  921  using boost::asio::buffer;
 
  929template<
typename Handler>
 
  931                         const Handler& on_result,
 
  934  using boost::asio::buffer;
 
  942template<
typename Rep, 
typename Period, 
typename Handler>
 
  944                                       const Handler& on_result,
 
  945                                       const boost::chrono::duration<Rep, Period>& max_wait,
 
  946                                       const boost::asio::const_buffer& serialized_metadata,
 
  954template<
typename Handler>
 
  956                                       const Handler& on_result,
 
  957                                       const boost::asio::const_buffer& serialized_metadata,
 
  965template<
typename Handler>
 
  968  using boost::asio::post;
 
  969  using boost::asio::bind_executor;
 
  970  using boost::asio::get_associated_executor;
 
  975  return [
this, on_result = std::move(on_result)]
 
  979    const auto executor = get_associated_executor(on_result);
 
  981         bind_executor(executor,
 
  982                       [err_code, new_sock, on_result = std::move(on_result)]
 
  984      on_result(err_code, new_sock);
 
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Event_type
Type of event or condition of interest supported by class Event_set.
Objects of this class can be fed to Node to make it internally simulate network conditions like loss,...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
static const uint8_t S_DEFAULT_CONN_METADATA
Type and value to supply as user-supplied metadata in SYN, if user chooses to use [[a]sync_]connect() instead of [[a]sync_]connect_with_metadata().
Event_set::Ptr event_set_create(Error_code *err_code=0)
Creates a new Event_set in Event_set::State::S_INACTIVE state with no sockets/events stored; returns the new Event_set (as an Event_set::Ptr).
A peer (non-server) socket operating over the Flow network protocol, with optional stream-of-bytes an...
A server socket able to listen on a single Flow port for incoming connections and return peer sockets...
A subclass of net_flow::Node that adds the ability to easily and directly use net_flow sockets in gen...
void async_op(typename Socket::Ptr sock, Function< Non_blocking_func_ret_type(Error_code *)> &&non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Function< void(const Error_code &, Non_blocking_func_ret_type)> &&on_result)
Implementation of core asynchronous transfer methods, namely asio::Peer_socket::async_send(),...
void set_async_task_engine(util::Task_engine *target_async_task_engine)
Overwrites the value to be returned by next async_task_engine().
net_flow::Peer_socket * sock_create(const Peer_socket_options &opts) override
Implements superclass API.
util::Task_engine * m_target_task_engine
See async_task_engine().
net_flow::Server_socket * serv_create(const Peer_socket_options *child_sock_opts) override
Implements superclass API.
util::Task_engine * async_task_engine()
Pointer (possibly null) for the flow::util::Task_engine used by any coming async I/O calls and inheri...
Node(log::Logger *logger, util::Task_engine *target_async_task_engine, const util::Udp_endpoint &low_lvl_endpoint, Net_env_simulator *net_env_sim=0, Error_code *err_code=0, const Node_options &opts=Node_options())
Constructs Node.
void async_connect(const Remote_endpoint &to, const Handler &on_result, const boost::chrono::duration< Rep, Period > &max_wait, const Peer_socket_options *opts=0)
The boost.asio asynchronous version of sync_connect(), performing any necessary wait and connection i...
Handler_func handler_func(Handler &&on_result)
Returns a functor that essentially performs post() on_result onto *async_task_engine() in a way suita...
void async_connect_impl(const Remote_endpoint &to, const Fine_duration &max_wait, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts, Handler_func &&on_result)
Implementation core of async_connect*() that gets rid of templated or missing arguments thereof.
void async_connect_with_metadata(const Remote_endpoint &to, const Handler &on_result, const boost::chrono::duration< Rep, Period > &max_wait, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts=0)
A combination of async_connect() and connect_with_metadata() (asynchronously blocking connect, with supplied metadata).
A net_flow::Peer_socket that adds integration with boost.asio.
boost::shared_ptr< Peer_socket > Ptr
Short-hand for shared_ptr to Peer_socket.
A net_flow::Server_socket that adds integration with boost.asio.
boost::shared_ptr< Event_set > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component().
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component().
Contains classes that add boost.asio integration to the main Flow-protocol classes such as net_flow::...
@ S_WAIT_USER_TIMEOUT
A blocking (sync_) or background-blocking (async_) operation timed out versus user-supplied time limit.
@ S_WAIT_INTERRUPTED
A blocking (sync_) or background-blocking (async_) operation was interrupted, such as by a signal.
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or schedule_task_from_now().
Task_engine::strand Strand
Short-hand for boost.asio strand, an ancillary class that works with Task_engine for advanced task sc...
Scheduled_task_handle schedule_task_at(log::Logger *logger_ptr, const Fine_time_pt &at, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Identical to schedule_task_from_now() except the time is specified in absolute terms.
bool scheduled_task_cancel(log::Logger *logger_ptr, Scheduled_task_handle task)
Attempts to prevent the execution of a previously scheduled (by schedule_task_from_now() or similar) task.
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
A set of low-level options affecting a single Flow Node, including Peer_socket objects and other obje...
A set of low-level options affecting a single Peer_socket.
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...