378 template<
typename Rep,
typename Period,
typename Handler>
380 const Handler& on_result,
381 const boost::chrono::duration<Rep, Period>& max_wait,
405 template<
typename Rep,
typename Period,
typename Handler>
407 const Handler& on_result,
408 const boost::chrono::duration<Rep, Period>& max_wait,
409 const boost::asio::const_buffer& serialized_metadata,
425 template<
typename Handler>
427 const Handler& on_result,
445 template<
typename Handler>
447 const Handler& on_result,
448 const boost::asio::const_buffer& serialized_metadata,
523 template<
typename Socket,
typename Base_socket,
typename Non_blocking_func_ret_type>
524 void async_op(
typename Socket::Ptr sock,
526 Non_blocking_func_ret_type would_block_ret_val,
552 const boost::asio::const_buffer& serialized_metadata,
569 template<
typename Handler>
580template<
typename Socket,
typename Base_socket,
typename Non_blocking_func_ret_type>
583 Non_blocking_func_ret_type would_block_ret_val,
588 using boost::shared_ptr;
589 using boost::chrono::milliseconds;
590 using boost::chrono::round;
591 using boost::asio::bind_executor;
628 on_result(err_code, would_block_ret_val);
634 FLOW_LOG_TRACE(
"Begin async op (identified by Event_set [" << event_set <<
"]) of type [" << ev_type <<
"] on "
635 "object [" << sock <<
"].");
640 if (!(event_set->add_wanted_socket<Base_socket>(Base_socket::ptr_cast(sock), ev_type, &err_code)))
642 on_result(err_code, would_block_ret_val);
650 const bool timeout_given = wait_until !=
Fine_time_pt();
685 using Timeout_state_ptr = shared_ptr<Timeout_state>;
686 Timeout_state_ptr timeout_state;
690 if (wait_until <= Fine_clock::now())
698 FLOW_LOG_TRACE(
"Timeout timer begin for async op [" << event_set <<
"] in "
699 "period [" << round<milliseconds>(wait_until - Fine_clock::now()) <<
"].");
704 timeout_state.reset(
new Timeout_state(task_engine));
707 timeout_state->sched_task
711 bind_executor(timeout_state->m_make_serial,
712 [
this, timeout_state, on_result, would_block_ret_val, event_set]
717 FLOW_LOG_TRACE(
"[User event loop] "
718 "Timeout fired for async op [" << event_set <<
"]; clean up and report to user.");
720 Error_code dummy_prevents_throw;
721 event_set->async_wait_finish(&dummy_prevents_throw);
722 event_set->close(&dummy_prevents_throw);
724 FLOW_LOG_TRACE(
"[User event loop] User handler execution begins for async op [" << event_set <<
"].");
725 on_result(error::Code::S_WAIT_USER_TIMEOUT, would_block_ret_val);
727 FLOW_LOG_TRACE(
"[User event loop] User handler execution ends for async op [" << event_set <<
"].");
736 auto on_async_wait_user_loop
737 = [
this, sock, timeout_state, would_block_ret_val, ev_type, wait_until, event_set,
738 non_blocking_func = std::move(non_blocking_func),
739 on_result = std::move(on_result)]
749 event_set->close(&dummy_prevents_throw);
759 "Events-ready in async op [" << event_set <<
"], but timeout already expired recently. "
760 "Interesting timing coincidence.");
771 const auto err_code = &err_code_val;
775 "Events-ready in async op [" << event_set <<
"]; user handler execution begins.");
776 on_result(err_code_val, would_block_ret_val);
780 assert(!interrupted);
787 Non_blocking_func_ret_type op_result;
789 const bool reactor_pattern = non_blocking_func.empty();
794 assert(!op_err_code);
797 "Events-ready in async op [" << event_set <<
"]; reactor pattern mode on; "
798 "user handler execution begins.");
799 on_result(op_err_code, would_block_ret_val);
803 assert(!reactor_pattern);
806 "Events-ready in async op [" << event_set <<
"]; reactor pattern mode off; "
807 "executing non-blocking operation now.");
808 op_result = non_blocking_func(&op_err_code);
815 "Error observed in instant op; code [" << op_err_code <<
'/' << op_err_code.message() <<
"]; "
816 "in async op [" << event_set <<
"]; "
817 "user handler execution begins.");
818 on_result(op_err_code, would_block_ret_val);
822 assert(!op_err_code);
825 if (op_result == would_block_ret_val)
828 "Instant op yielded would-block despite events-ready "
829 "in async op [" << event_set <<
"]; are there concurrent competing operations? "
830 "Trying async op again.");
831 assert(!reactor_pattern);
838 <Socket, Base_socket, Non_blocking_func_ret_type>
839 (sock, std::move(non_blocking_func),
840 would_block_ret_val, ev_type, wait_until, std::move(on_result));
847 "Instant op yielded positive result [" << op_result <<
"] "
848 "in async op [" << event_set <<
"]; "
849 "user handler execution begins.");
851 assert(op_result != would_block_ret_val);
852 assert(!op_err_code);
853 on_result(op_err_code, op_result);
859 FLOW_LOG_TRACE(
"[User event loop] User handler execution ends for async op [" << event_set <<
"].");
871 event_set->async_wait([sock, timeout_state,
872 on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
873 (
bool interrupted)
mutable
880 post(timeout_state->m_make_serial, [interrupted,
881 on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
884 on_async_wait_user_loop(interrupted);
901 event_set->async_wait([sock, on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
902 (
bool interrupted)
mutable
905 post(*sock->async_task_engine(), [interrupted,
906 on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
909 on_async_wait_user_loop(interrupted);
915template<
typename Rep,
typename Period,
typename Handler>
917 const Handler& on_result,
918 const boost::chrono::duration<Rep, Period>& max_wait,
921 using boost::asio::buffer;
929template<
typename Handler>
931 const Handler& on_result,
934 using boost::asio::buffer;
942template<
typename Rep,
typename Period,
typename Handler>
944 const Handler& on_result,
945 const boost::chrono::duration<Rep, Period>& max_wait,
946 const boost::asio::const_buffer& serialized_metadata,
954template<
typename Handler>
956 const Handler& on_result,
957 const boost::asio::const_buffer& serialized_metadata,
965template<
typename Handler>
968 using boost::asio::post;
969 using boost::asio::bind_executor;
970 using boost::asio::get_associated_executor;
975 return [
this, on_result = std::move(on_result)]
979 const auto executor = get_associated_executor(on_result);
981 bind_executor(executor,
982 [err_code, new_sock, on_result = std::move(on_result)]
984 on_result(err_code, new_sock);
Logger * get_logger() const
Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect.
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Event_type
Type of event or condition of interest supported by class Event_set.
Objects of this class can be fed to Node to make it internally simulate network conditions like loss,...
An object of this class is a single Flow-protocol networking node, in the sense that: (1) it has a di...
static const uint8_t S_DEFAULT_CONN_METADATA
Type and value to supply as user-supplied metadata in SYN, if user chooses to use [[a]sync_]connect()...
Event_set::Ptr event_set_create(Error_code *err_code=0)
Creates a new Event_set in Event_set::State::S_INACTIVE state with no sockets/events stored; returns ...
A peer (non-server) socket operating over the Flow network protocol, with optional stream-of-bytes an...
A server socket able to listen on a single Flow port for incoming connections and return peer sockets...
A subclass of net_flow::Node that adds the ability to easily and directly use net_flow sockets in gen...
void async_op(typename Socket::Ptr sock, Function< Non_blocking_func_ret_type(Error_code *)> &&non_blocking_func, Non_blocking_func_ret_type would_block_ret_val, Event_set::Event_type ev_type, const Fine_time_pt &wait_until, Function< void(const Error_code &, Non_blocking_func_ret_type)> &&on_result)
Implementation of core asynchronous transfer methods, namely asio::Peer_socket::async_send(),...
void set_async_task_engine(util::Task_engine *target_async_task_engine)
Overwrites the value to be returned by next async_task_engine().
net_flow::Peer_socket * sock_create(const Peer_socket_options &opts) override
Implements superclass API.
util::Task_engine * m_target_task_engine
See async_task_engine().
net_flow::Server_socket * serv_create(const Peer_socket_options *child_sock_opts) override
Implements superclass API.
util::Task_engine * async_task_engine()
Pointer (possibly null) for the flow::util::Task_engine used by any coming async I/O calls and inheri...
Node(log::Logger *logger, util::Task_engine *target_async_task_engine, const util::Udp_endpoint &low_lvl_endpoint, Net_env_simulator *net_env_sim=0, Error_code *err_code=0, const Node_options &opts=Node_options())
Constructs Node.
void async_connect(const Remote_endpoint &to, const Handler &on_result, const boost::chrono::duration< Rep, Period > &max_wait, const Peer_socket_options *opts=0)
The boost.asio asynchronous version of sync_connect(), performing any necessary wait and connection i...
Handler_func handler_func(Handler &&on_result)
Returns a functor that essentially performs post() on_result onto *async_task_engine() in a way suita...
void async_connect_impl(const Remote_endpoint &to, const Fine_duration &max_wait, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts, Handler_func &&on_result)
Implementation core of async_connect*() that gets rid of templated or missing arguments thereof.
void async_connect_with_metadata(const Remote_endpoint &to, const Handler &on_result, const boost::chrono::duration< Rep, Period > &max_wait, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts=0)
A combination of async_connect() and connect_with_metadata() (asynchronously blocking connect,...
A net_flow::Peer_socket that adds integration with boost.asio.
boost::shared_ptr< Peer_socket > Ptr
Short-hand for shared_ptr to Peer_socket.
A net_flow::Server_socket that adds integration with boost.asio.
boost::shared_ptr< Event_set > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
#define FLOW_ERROR_EMIT_ERROR(ARG_val)
Sets *err_code to ARG_val and logs a warning about the error using FLOW_LOG_WARNING().
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Contains classes that add boost.asio integration to the main Flow-protocol classes such as net_flow::...
@ S_WAIT_USER_TIMEOUT
A blocking (sync_) or background-blocking (async_) operation timed out versus user-supplied time limi...
@ S_WAIT_INTERRUPTED
A blocking (sync_) or background-blocking (async_) operation was interrupted, such as by a signal.
boost::shared_ptr< Scheduled_task_handle_state > Scheduled_task_handle
Black-box type that represents a handle to a scheduled task as scheduled by schedule_task_at() or sch...
Task_engine::strand Strand
Short-hand for boost.asio strand, an ancillary class that works with Task_engine for advanced task sc...
Scheduled_task_handle schedule_task_at(log::Logger *logger_ptr, const Fine_time_pt &at, bool single_threaded, Task_engine *task_engine, Scheduled_task_handler &&task_body_moved)
Identical to schedule_task_from_now() except the time is specified in absolute terms.
bool scheduled_task_cancel(log::Logger *logger_ptr, Scheduled_task_handle task)
Attempts to prevent the execution of a previously scheduled (by schedule_task_from_now() or similar) ...
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
boost::asio::ip::udp::endpoint Udp_endpoint
Short-hand for the UDP endpoint (IP/port) type.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
A set of low-level options affecting a single Flow Node, including Peer_socket objects and other obje...
A set of low-level options affecting a single Peer_socket.
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...