Flow 1.0.0
Flow project: Full implementation reference.
peer_socket.cpp
Go to the documentation of this file.
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
20
22{
23
24// Implementations.
25
26// Peer_socket implementations.
27
29 util::Task_engine* task_engine, const Peer_socket_options& opts) :
30 net_flow::Peer_socket(logger_ptr, task_engine, opts),
31 m_target_task_engine(0)
32{
33 // Only print pointer value for same reason as in super-constructor.
34 FLOW_LOG_TRACE("boost.asio-integrated Peer_socket [" << static_cast<void*>(this) << "] created; no Task_engine.");
35}
36
38{
39 FLOW_LOG_TRACE("boost.asio-integrated Peer_socket [" << this << "] destroyed.");
40}
41
43{
44 // Not thread-safe against set_async_task_engine(). Discussed in my doc header.
46}
47
49{
50 return *m_target_task_engine; // Same comment as async_task_engine().
51}
52
54{
55 // Not thread-safe against m_target_task_engine access. Discussed in my doc header.
56 FLOW_LOG_INFO("Object [" << this << "] has been assigned an Task_engine at [" << target_async_task_engine << "]; "
57 "currently [" << static_cast<void*>(m_target_task_engine) << "].");
58 m_target_task_engine = target_async_task_engine;
59}
60
62 const Fine_time_pt& wait_until)
63{
64 // This is the actual async_receive() body, after the args have been converted into a convenient form.
65
66 auto owner_node = node_or_post_error(std::move(on_result));
67 if (!owner_node)
68 {
69 return; // on_result() error invocation posted. (BTW on_result() may be blown away at this point.) Done.
70 }
71 // else
72 assert(!on_result.empty()); // Feeble sanity check: node_or_post_error() can only touch on_result on error.
73
74 /* The wrapper function guarantees `target` is null if and only if they desired this mode,
75 * wherein any actual I/O op is up to the handler on_result, and we just call it when ready.
76 * In particular no need to waste time on wrapping the non-blocking I/O op call, since we won't be
77 * doing it ourselves. */
78 const bool reactor_pattern = !target;
79
80 const auto sock = cast(shared_from_this());
81
82 /* Supply the closure that'll execute actual non-blocking op when appropriate, returning the result and
83 * setting the passed-in Error_code. */
84 Function<size_t (Error_code*)> non_blocking_func;
85 if (!reactor_pattern)
86 {
87 // Perf note: The `target` capture is just a ref-counted pointer copy.
88 non_blocking_func = [sock, target](Error_code* err_code) -> size_t { return sock->receive(*target, err_code); };
89 }
90 // else { Leave it blank; won't be used. }
91
92 // Invoke this jack-of-all-trades, just supplying the op type-specific little pieces of the puzzle.
93 owner_node->async_op
95 (sock,
96 std::move(non_blocking_func),
98 std::move(on_result));
99 // on_result, non_blocking_func may now be dead due to move(), but we are done with them (and everything else).
100}
101
103 const Fine_time_pt& wait_until)
104{
105 // Much like async_receive_impl(). Keeping comments light.
106
107 auto owner_node = node_or_post_error(std::move(on_result));
108 if (!owner_node)
109 {
110 return;
111 }
112 // else
113 assert(!on_result.empty());
114
115 const bool reactor_pattern = !source;
116 const auto sock = cast(shared_from_this());
117
118 Function<size_t (Error_code*)> non_blocking_func;
119 if (!reactor_pattern)
120 {
121 non_blocking_func = [sock, source](Error_code* err_code) -> size_t { return sock->send(*source, err_code); };
122 }
123
124 owner_node->async_op
126 (sock,
127 std::move(non_blocking_func),
129 std::move(on_result));
130}
131
133{
134 assert(async_task_engine()); // Note: cannot return civilized error, as nowhere to place error-receiving callback!
135
136 const auto owner_node = static_cast<Node*>(node());
137 if (!owner_node)
138 {
139 /* No Node* anymore => state is CLOSED, and reason why it was closed should be here. This is how we report
140 * attempts to do the non-blocking ops like receive() on CLOSED sockets, and we are no different. */
141 const auto err_code = disconnect_cause();
142
143 FLOW_LOG_WARNING("Cannot perform async op on object [" << this << "]: it is already closed for "
144 "reason [" << err_code << '/' << err_code.message() << "].");
145 on_result(err_code, 0); // It post()s user's originally-passed-in handler.
146 return 0;
147 }
148 // else
149 return owner_node;
150}
151
153{
154 using boost::static_pointer_cast;
155 return static_pointer_cast<Peer_socket>(sock);
156}
157
158// Node implementations (dealing with individual Peer_sockets).
159
161{
162 // An asio::Node always creates asio::Peer_socket (subclass) instances.
163 const auto sock = static_cast<Peer_socket*>(sock_create_forward_plus_ctor_args<Peer_socket>(opts));
164
165 // As promised, propagate Node's service to created kid sockets (even if null). They can overwrite.
166 sock->set_async_task_engine(m_target_task_engine); // Thread-safe before we give out sock to others.
167
168 return static_cast<net_flow::Peer_socket*>(sock);
169}
170
172 const boost::asio::const_buffer& serialized_metadata,
173 const Peer_socket_options* opts,
174 Handler_func&& on_result)
175{
176 using boost::asio::null_buffers;
177
178 // We are in thread U != W.
179
180 /* This follows similar beats to sync_connect_impl(); though ultimately it happens to be simpler
181 * by leveraging async_send(). */
182
183#ifndef NDEBUG
184 const auto target_task_engine = async_task_engine();
185#endif
186 assert(target_task_engine); // Note: cannot return civilized error, as nowhere to place error-receiving callback!
187
188 Error_code conn_err_code;
189 const auto sock = Peer_socket::cast(connect_with_metadata(to, serialized_metadata, &conn_err_code, opts));
190 if (!sock)
191 {
192 // It's probably some user error like an invalid destination.
193 on_result(conn_err_code, Peer_socket::Ptr()); // It post()s user's originally-passed-in handler.
194 return;
195 }
196 // else we have a socket that has started connecting.
197
198 /* We must clean up sock (call sock->close_abruptly(&dummy)) at any return point (including
199 * exception throw) below, EXCEPT the success case. */
200
201 /* "Cheat": we just want it to be writable, indicating it is connected (e.g., see sync_connect_impl()).
202 * Fortunately we can use async_send(null_buffers()) for exactly that purpose: it will await writability
203 * (not even conceivably competing with any other entity awaiting same, since we haven't given socket to
204 * anyone yet); but will not actually write anything when ready, merely calling our handler. */
205 const auto on_writable = [this, on_result = std::move(on_result), sock]
206 (const Error_code& wait_err_code, [[maybe_unused]] size_t n_sent0)
207 {
208 assert(n_sent0 == 0);
209 FLOW_LOG_TRACE("Async connect op for new socket [" << sock << "] detected writable status with "
210 "result code [" << wait_err_code << '/' << wait_err_code.message() << "].");
211
212 if (wait_err_code == error::Code::S_WAIT_USER_TIMEOUT)
213 {
214 // Our contract is timeout => report failure to user = pass null socket; so close the doomed socket now.
215 Error_code dummy_prevents_throw;
216 sock->close_abruptly(&dummy_prevents_throw);
217
218 on_result(wait_err_code, Peer_socket::Ptr()); // It post()s user's originally-passed-in handler.
219 // *sock should lose all references and destruct shortly, as we didn't pass it to on_result().
220 }
221 else if (wait_err_code)
222 {
223 // error reaching initial writability, except timeout, should have led to closure of socket.
224 assert(sock->state() == Peer_socket::State::S_CLOSED);
225 /* So we won't even pass the short-lived socket to callback, indicating failure via null pointer.
226 * See comment in sync_connect_impl() about how we avoid passing an error socket for user to discover. */
227
228 on_result(wait_err_code, Peer_socket::Ptr()); // It post()s user's originally-passed-in handler.
229 // As above, *sock should destruct soon.
230 }
231 else
232 {
233 assert(!wait_err_code);
234 on_result(wait_err_code, Peer_socket::Ptr()); // It post()s user's originally-passed-in handler.
235 // *sock lives on by being passed to them and probably saved by them!
236 }
237
238 FLOW_LOG_TRACE("Finished executing user handler for async connect of [" << sock << "].");
239 };
240 // `on_result` might be hosed now (move()d).
241
242 // As advertised, this is expected (and we need it for the async_send() below, as it post()s on_writable()):
243 assert(sock->async_task_engine() == target_task_engine);
244
245 // Go!
246 sock->async_send(null_buffers(), max_wait, on_writable);
247} // Node::async_connect_impl()
248
249// Free implementations.
250
251std::ostream& operator<<(std::ostream& os, const Peer_socket* sock)
252{
253 return
254 sock
255 ? (os << "Asio_flow_socket "
256 "[Task_engine@" << static_cast<const void*>(&(sock->async_task_engine_cref())) << "] ["
257 << static_cast<const net_flow::Peer_socket*>(sock) // Show underlying net_flow:: socket's details.
258 << "] @" << static_cast<const void*>(sock))
259 : (os << "Asio_flow_socket@null");
260}
261
262} // namespace flow::net_flow::asio
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
Definition: log.hpp:1291
@ S_PEER_SOCKET_WRITABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
@ S_PEER_SOCKET_READABLE
Event type specifying the condition of interest wherein a target Peer_socket sock is such that callin...
Peer_socket::Ptr connect_with_metadata(const Remote_endpoint &to, const boost::asio::const_buffer &serialized_metadata, Error_code *err_code=0, const Peer_socket_options *opts=0)
Same as connect() but sends, as part of the connection handshake, the user-supplied metadata,...
A peer (non-server) socket operating over the Flow network protocol, with optional stream-of-bytes an...
@ S_CLOSED
Neither future reads nor writes are possible, AND Node has disowned the Peer_socket.
Error_code disconnect_cause() const
The error code that perviously caused state() to become State::S_CLOSED, or success code if state is ...
Node * node() const
Node that produced this Peer_socket.
Definition: peer_socket.cpp:95
A subclass of net_flow::Node that adds the ability to easily and directly use net_flow sockets in gen...
Definition: node.hpp:236
net_flow::Peer_socket * sock_create(const Peer_socket_options &opts) override
Implements superclass API.
util::Task_engine * m_target_task_engine
See async_task_engine().
Definition: node.hpp:575
util::Task_engine * async_task_engine()
Pointer (possibly null) for the flow::util::Task_engine used by any coming async I/O calls and inheri...
Definition: node.cpp:40
void async_connect_impl(const Remote_endpoint &to, const Fine_duration &max_wait, const boost::asio::const_buffer &serialized_metadata, const Peer_socket_options *opts, Handler_func &&on_result)
Implementation core of async_connect*() that gets rid of templated or missing arguments thereof.
A net_flow::Peer_socket that adds integration with boost.asio.
Definition: peer_socket.hpp:41
~Peer_socket() override
Boring virtual destructor as in superclass. See notes there.
Definition: peer_socket.cpp:37
void set_async_task_engine(util::Task_engine *target_async_task_engine)
Overwrites the value to be returned by next async_task_engine().
Definition: peer_socket.cpp:53
static Ptr cast(net_flow::Peer_socket::Ptr sock)
Convenience method that polymorphically casts from net_flow::Peer_socket::Ptr to subclass pointer net...
util::Task_engine * async_task_engine()
Pointer (possibly null) for the flow::util::Task_engine used by any coming async_*() I/O calls.
Definition: peer_socket.cpp:42
boost::shared_ptr< Peer_socket > Ptr
Short-hand for shared_ptr to Peer_socket.
Definition: peer_socket.hpp:46
boost::shared_ptr< Target_bufs > Target_bufs_ptr
Short-hand for a low-cost-copyable smart pointer of Target_bufs.
const util::Task_engine & async_task_engine_cref() const
Read-only version of async_task_engine().
Definition: peer_socket.cpp:48
util::Task_engine * m_target_task_engine
See async_task_engine().
void async_receive_impl(Target_bufs_ptr target, Handler_func &&on_result, const Fine_time_pt &wait_until)
De-templated implementation of all async_receive() methods.
Definition: peer_socket.cpp:61
Node * node_or_post_error(Handler_func &&on_result)
Helper that returns the net_flow::asio::Node that generated *this; unless *this is closed; in which c...
void async_send_impl(Source_bufs_ptr source, Handler_func &&on_result, const Fine_time_pt &wait_until)
De-templated implementation of all async_send() methods.
Peer_socket(log::Logger *logger_ptr, util::Task_engine *task_engine, const Peer_socket_options &opts)
Constructs object.
Definition: peer_socket.cpp:28
boost::shared_ptr< Source_bufs > Source_bufs_ptr
Short-hand for a low-cost-copyable smart pointer of Source_bufs.
boost::shared_ptr< Peer_socket > Ptr
Short-hand for ref-counted pointer to mutable values of type Target_type::element_type (a-la T*).
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:197
#define FLOW_LOG_WARNING(ARG_stream_fragment)
Logs a WARNING message into flow::log::Logger *get_logger() with flow::log::Component get_log_compone...
Definition: log.hpp:152
#define FLOW_LOG_TRACE(ARG_stream_fragment)
Logs a TRACE message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Definition: log.hpp:227
Contains classes that add boost.asio integration to the main Flow-protocol classes such as net_flow::...
Definition: node.cpp:25
std::ostream & operator<<(std::ostream &os, const Peer_socket *sock)
Prints string representation of given socket to given standard ostream and returns the latter.
@ S_WAIT_USER_TIMEOUT
A blocking (sync_) or background-blocking (async_) operation timed out versus user-supplied time limi...
boost::asio::io_service Task_engine
Short-hand for boost.asio event service, the central class of boost.asio.
Definition: util_fwd.hpp:135
boost::system::error_code Error_code
Short-hand for a boost.system error code (which basically encapsulates an integer/enum error code and...
Definition: common.hpp:502
Fine_clock::duration Fine_duration
A high-res time duration as computed from two Fine_time_pts.
Definition: common.hpp:410
Fine_clock::time_point Fine_time_pt
A high-res time point as returned by Fine_clock::now() and suitable for precise time math in general.
Definition: common.hpp:407
A set of low-level options affecting a single Peer_socket.
Definition: options.hpp:36
Represents the remote endpoint of a Flow-protocol connection; identifies the UDP endpoint of the remo...
Definition: endpoint.hpp:93