30 net_flow::
Peer_socket(logger_ptr, task_engine, opts),
31 m_target_task_engine(0)
34 FLOW_LOG_TRACE(
"boost.asio-integrated Peer_socket [" <<
static_cast<void*
>(
this) <<
"] created; no Task_engine.");
39 FLOW_LOG_TRACE(
"boost.asio-integrated Peer_socket [" <<
this <<
"] destroyed.");
56 FLOW_LOG_INFO(
"Object [" <<
this <<
"] has been assigned an Task_engine at [" << target_async_task_engine <<
"]; "
72 assert(!on_result.empty());
78 const bool reactor_pattern = !target;
80 const auto sock =
cast(shared_from_this());
88 non_blocking_func = [sock, target](
Error_code* err_code) ->
size_t {
return sock->receive(*target, err_code); };
96 std::move(non_blocking_func),
98 std::move(on_result));
113 assert(!on_result.empty());
115 const bool reactor_pattern = !source;
116 const auto sock =
cast(shared_from_this());
119 if (!reactor_pattern)
121 non_blocking_func = [sock, source](
Error_code* err_code) ->
size_t {
return sock->send(*source, err_code); };
127 std::move(non_blocking_func),
129 std::move(on_result));
136 const auto owner_node =
static_cast<Node*
>(
node());
143 FLOW_LOG_WARNING(
"Cannot perform async op on object [" <<
this <<
"]: it is already closed for "
144 "reason [" << err_code <<
'/' << err_code.message() <<
"].");
145 on_result(err_code, 0);
154 using boost::static_pointer_cast;
155 return static_pointer_cast<Peer_socket>(sock);
163 const auto sock =
static_cast<Peer_socket*
>(sock_create_forward_plus_ctor_args<Peer_socket>(opts));
172 const boost::asio::const_buffer& serialized_metadata,
176 using boost::asio::null_buffers;
186 assert(target_task_engine);
205 const auto on_writable = [
this, on_result = std::move(on_result), sock]
206 (
const Error_code& wait_err_code, [[maybe_unused]]
size_t n_sent0)
208 assert(n_sent0 == 0);
209 FLOW_LOG_TRACE(
"Async connect op for new socket [" << sock <<
"] detected writable status with "
210 "result code [" << wait_err_code <<
'/' << wait_err_code.message() <<
"].");
216 sock->close_abruptly(&dummy_prevents_throw);
221 else if (wait_err_code)
233 assert(!wait_err_code);
238 FLOW_LOG_TRACE(
"Finished executing user handler for async connect of [" << sock <<
"].");
243 assert(sock->async_task_engine() == target_task_engine);
246 sock->async_send(null_buffers(), max_wait, on_writable);
255 ? (os <<
"Asio_flow_socket "
258 <<
"] @" <<
static_cast<const void*
>(sock))
259 : (os <<
"Asio_flow_socket@null");
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
@ S_PEER_SOCKET_WRITABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
@ S_PEER_SOCKET_READABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
Peer_socket::Ptr connect_with_metadata(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
Same as connect() but sends, as part of the connection handshake, the user-supplied metadata,...
A peer (non-server) socket operating over the Flow network protocol, with optional stream-of-bytes an...
@ S_CLOSED
Neither future reads nor writes are possible, AND Node has disowned the Peer_socket.
Error_code disconnect_cause() const
The error code that previously caused state() to become State::S_CLOSED, or success code if state is ...
Node * node() const
Node that produced this Peer_socket.
A subclass of net_flow::Node that adds the ability to easily and directly use net_flow sockets in gen...
net_flow::Peer_socket * sock_create(const Peer_socket_options &opts) override
Implements superclass API.
util::Task_engine * m_target_task_engine
See async_task_engine().
util::Task_engine * async_task_engine()
Pointer (possibly null) for the flow::util::Task_engine used by any coming async I/O calls and inheri...
void async_connect_impl(const Remote_endpoint &to, const Fine_duration &max_wait, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts, Handler_func &&on_result)
Implementation core of async_connect*() that gets rid of templated or missing arguments thereof.
A net_flow::Peer_socket that adds integration with boost.asio.
~Peer_socket() override
Boring virtual destructor as in superclass. See notes there.
void set_async_task_engine(util::Task_engine *target_async_task_engine)
Overwrites the value to be returned by next async_task_engine().
static Ptr cast(net_flow::Peer_socket::Ptr sock)
Convenience method that polymorphically casts from net_flow::Peer_socket::Ptr to subclass pointer net...
util::Task_engine * async_task_engine()
Pointer (possibly null) for the flow::util::Task_engine used by any coming async_*() I/O calls.
boost::shared_ptr< Peer_socket > Ptr
Short-hand for shared_ptr to Peer_socket.
boost::shared_ptr< Target_bufs > Target_bufs_ptr
Short-hand for a low-cost-copyable smart pointer of Target_bufs.
const util::Task_engine & async_task_engine_cref() const
Read-only version of async_task_engine().
util::Task_engine * m_target_task_engine
See async_task_engine().
void async_receive_impl(Target_bufs_ptr target, Handler_func &&on_result, const Fine_time_pt &wait_until)
De-templated implementation of all async_receive() methods.
Node * node_or_post_error(Handler_func &&on_result)
Helper that returns the net_flow::asio::Node that generated *this; unless *this is closed; in which c...
void async_send_impl(Source_bufs_ptr source, Handler_func &&on_result, const Fine_time_pt &wait_until)
De-templated implementation of all async_send() methods.
Peer_socket(log::Logger *logger_ptr, util::Task_engine *task_engine, const Peer_socket_options &opts)
Constructs object.
boost::shared_ptr< Source_bufs > Source_bufs_ptr
Short-hand for a low-cost-copyable smart pointer of Source_bufs.
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Contains classes that add boost.asio integration to the main Flow-protocol classes such as net_flow::...
std::ostream & operator<<(std::ostream &os, const Peer_socket *sock)
Prints string representation of given socket to given standard ostream and returns the latter.
@ S_WAIT_USER_TIMEOUT
A blocking (sync_) or background-blocking (async_) operation timed out versus user-supplied time limi...
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
A set of low-level options affecting a single Peer_socket.
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...