Flow 1.0.0
node.hpp
1/* Flow
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
26
 27namespace flow::net_flow::asio
 28{
29/**
30 * A subclass of net_flow::Node that adds the ability to easily and directly use `net_flow` sockets in general
31 * boost.asio event loops.
32 *
33 * net_flow::Node, the superclass, provides the vast bulk of the functionality in this library, but
34 * the sockets it generates directly or indirectly (net_flow::Peer_socket, net_flow::Server_socket) cannot be used
35 * directly in a boost.asio event loop (see reference below for background). It is possible to use the
36 * vanilla net_flow::Node, etc., with *any* asynchronous event loop (boost.asio or native `epoll` or `select()`...) --
37 * notably by using a glue socket, as explained in Event_set doc header. However, for those using the boost.asio
 38 * `io_service` as the app's core main loop, this is not as easy as one might desire. (Side note:
39 * I personally strongly recommend using boost.asio event loops instead of rolling one's own or the many alternatives
40 * I have seen. It is also on its way to ending up in the C++ standard library. Lastly, such a loop is used
41 * internally to implement `net_flow`, so you know I like it, for what that's worth.)
42 *
43 * Basically, if you use this asio::Node instead of net_flow::Node, then you get everything in the latter plus
44 * the ability to use Flow-protocol sockets `{Server|Peer}_socket` with boost.asio as easily as and similarly to how
45 * one uses `boost::asio::ip::tcp::acceptor::async_*()` and `boost::asio::ip::tcp::socket::async_*()`, respectively.
46 *
 47 * The usage pattern is simple (a code sketch follows the list below), assuming you understand the basics of
 48 * net_flow::Node, net_flow::Server_socket, and
48 * net_flow::Peer_socket -- and especially if you are familiar with boost.asio's built-in `tcp::acceptor` and
49 * `tcp::socket` (and, of course, `boost::asio::io_service` that ties them and many more I/O objects together).
50 *
51 * - First, simply create an asio::Node using the same form of constructor as you would with net_flow::Node, plus
52 * an arg that's a pointer to the subsequently used target boost.asio flow::util::Task_engine, which is memorized.
53 * - All facilities from the regular net_flow::Node are available on this object (plus some minor boost.asio ones).
54 * - Upon obtaining a net_flow::Server_socket::Ptr via listen(), cast it to its true polymorphic
55 * equivalent asio::Server_socket::Ptr, via convenience method net_flow::asio::Server_socket::cast():
56 * `auto serv = asio::Server_socket::cast(node.listen(...));`.
57 * - All facilities from the superclass net_flow::Server_socket are available (plus some minor boost.asio ones).
58 * - Similarly cast any net_flow::Peer_socket::Ptr to `asio` equivalent for any peer socket returned
59 * by asio::Server_socket::accept() and asio::Node::connect() and similar.
60 * - All facilities from the superclass net_flow::Peer_socket are available (plus some minor boost.asio ones).
61 * - All Server_socket and Peer_socket objects created by the Node subsequently will inherit the `Task_engine*`
62 * supplied in Node constructor. However, you may overwrite these (or remove them, by setting to null) at any
63 * time via Node::set_async_task_engine(), Server_socket::set_async_task_engine(),
64 * Peer_socket::set_async_task_engine().
65 * - The only rule is this: `net_flow::asio::X::async_task_engine()` must not be null whenever one calls
66 * `net_flow::asio::X::async_F()`, where X is Node, Server_socket, or Peer_socket; and `async_F()` is some async
67 * operation added on top of base class `net_flow::X` (which typically features
68 * `net_flow::X::F()` and `net_flow::X::sync_F()` -- the non-blocking and thread-blocking versions of F,
69 * respectively).
70 * - The automatic inheriting of `Task_engine` pointer is as follows:
71 * `Node -listen-> Server_socket -accept-> Peer_socket <-connect- Node`.
72 * - For each op inside an "arrow," I mean to include all forms of it
73 * (`F` includes: `F()`, `sync_F()`, `async_F()`).
74 * - This pattern is more practical/reasonable for us than the boost.asio pattern of each object memorizing its
75 * `io_service` reference in its explicit constructor. That is because we fully embrace the
76 * factory-of-shared-pointers pattern, and boost.asio doesn't use that pattern. We also provide an explicit
77 * mutator however.
78 * - We also allow for a null `Task_engine*` -- except when `async_*()` is actually performed.
79 * (boost.asio I/O objects seem to require a real flow::util::Task_engine, even if not planning to do anything
80 * async. Note, though, that if you don't plan to do anything async with `net_flow`, then you can just use
81 * vanilla net_flow::Node. Bottom line, you have (on this topic) *at least* as much flexibility as with
82 * boost.asio built-in I/O objects.)
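 *
 * Here is a minimal sketch of the above pattern. It is not verbatim from this API reference: `logger`, the
 * port numbers, and the async_accept() handler signature (assumed analogous to async_connect(), documented
 * below) are illustrative stand-ins.
 *
 *   @code
 *     flow::util::Task_engine task_engine; // The app's boost.asio event loop.
 *     flow::net_flow::asio::Node node(&logger, &task_engine,
 *                                     flow::util::Udp_endpoint(boost::asio::ip::udp::v4(), 50555));
 *
 *     // listen() yields net_flow::Server_socket::Ptr; cast to the asio-enabled subclass.
 *     auto serv = flow::net_flow::asio::Server_socket::cast(node.listen(23));
 *     serv->async_accept([](const flow::Error_code& err_code, flow::net_flow::asio::Peer_socket::Ptr sock)
 *     {
 *       // On success: sock inherited node's Task_engine; can begin sock->async_receive()/async_send().
 *     });
 *
 *     task_engine.run(); // Queued handlers (the lambda above among them) execute here.
 *   @endcode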
83 *
84 * Having thus obtained asio::Server_socket and asio::Peer_socket objects (and assigned each their desired
85 * boost.asio `io_service` loop(s) -- essentially as one does with, e.g., `tcp::acceptor` and `tcp::socket`
 86 * respectively) -- you may now use the full range of I/O operations. Here is a bird's eye view of all operations
87 * available in the entire hierarchy. Note that conceptually this is essentially identical to boost.asio `tcp`
 88 * I/O objects, but nomenclature and API decisions are in some cases somewhat different (I contend: with good
 89 * reason). A comparative sketch follows the list.
89 *
90 * - Non-blocking operations:
91 * - net_flow::Node::listen() -> net_flow::Server_socket: Create an object listening for connections.
92 * - Equivalent: `tcp::acceptor` constructor that takes a local endpoint arg.
93 * - net_flow::Server_socket::accept() -> net_flow::Peer_socket: Return a fully connected peer socket object,
94 * if one is immediately ready due to the listening done so far.
95 * - Equivalent: `tcp::acceptor::accept()` after ensuring `tcp::acceptor::non_blocking(true)`.
96 * NetFlow null return value <-> TCP "would-block" error code.
97 * - net_flow::Node::connect() -> net_flow::Peer_socket: Return a connectING peer socket object immediately,
 98 * as it tries to connect to the server socket on the remote side.
99 * - Equivalent: `tcp::socket::connect()` after ensuring `tcp::socket::non_blocking(true)`.
100 * - Variations: see also `*_with_metadata()` for an added feature without direct TCP equivalent.
101 * - net_flow::Peer_socket::send(): Queue up at least 1 of the N given bytes to send to peer ASAP; or
102 * indicate no bytes could be *immediately* queued.
103 * - Equivalent: `tcp::socket::send()` after ensuring `tcp::socket::non_blocking(true)`.
104 * NetFlow 0 return value <-> TCP "would-block" error code.
105 * - net_flow::Peer_socket::receive(): Dequeue at least 1 of the desired N bytes earlier received from peer; or
106 * indicate none were *immediately* available.
107 * - Equivalent: `tcp::socket::receive()` after ensuring `tcp::socket::non_blocking(true)`.
108 * NetFlow 0 return value <-> TCP "would-block" error code.
109 * - Blocking operations:
110 * - net_flow::Server_socket::sync_accept() -> net_flow::Peer_socket: Return a fully connected peer socket object,
111 * blocking thread if necessary to wait for a connection to come in and fully establish.
112 * - Equivalent: `tcp::acceptor::accept()` after ensuring `tcp::acceptor::non_blocking(false)`.
113 * - Variations: optional timeout can be specified.
 114 * - Variations: `reactor_pattern` mode, where the accept() call itself is left to the aftermath of sync_accept().
115 * - net_flow::Node::sync_connect() -> net_flow::Peer_socket: Return a connected peer socket object,
116 * blocking thread as needed to reach the remote server and fully establish connection.
117 * - Equivalent: `tcp::socket::connect()` after ensuring `tcp::socket::non_blocking(false)`.
118 * - Variations: see also `*_with_metadata()` as above.
119 * - net_flow::Peer_socket::sync_send(): Queue up at least 1 of the N given bytes to send to peer ASAP,
120 * blocking thread if necessary to wait for this to become possible (internally, as determined by
121 * such things as flow control, congestion control, buffer size limits).
122 * - Equivalent: `tcp::socket::send()` after ensuring `tcp::socket::non_blocking(false)`.
123 * - Variations: optional timeout can be specified.
 124 * - Variations: `null_buffers` mode, where the `send()` call itself is left to the aftermath of sync_send().
125 * - net_flow::Peer_socket::sync_receive(): Dequeue at least 1 of the desired N bytes from peer,
126 * blocking thread if necessary to wait for this to arrive from said remote peer.
127 * - Equivalent: `tcp::socket::receive()` after ensuring `tcp::socket::non_blocking(false)`.
128 * - Variations: optional timeout can be specified.
 129 * - Variations: `null_buffers` mode, where the `receive()` call itself is left to the aftermath of sync_receive().
130 * - Asynchronous operations (`asio::*` classes provide these):
131 * - asio::Server_socket::async_accept() -> asio::Peer_socket: Obtain a fully connected peer socket object,
132 * waiting as necessary in background for a connection to come in and fully establish,
133 * then invoking user-provided callback as if by `io_service::post()`.
134 * - Equivalent: `tcp::acceptor::async_accept()`.
135 * - Variations: optional timeout can be specified.
136 * - Variations: `reactor_pattern` mode, where the `accept()` call itself is left to user handler.
 137 * - asio::Node::async_connect() -> asio::Peer_socket: Obtain a fully connected peer socket object,
138 * waiting as necessary in background to reach the remote server and fully establish connection,
139 * then invoking user-provided callback as if by `io_service::post()`.
140 * - Equivalent: `tcp::socket::async_connect()`.
141 * - Variations: optional timeout can be specified.
142 * - Variations: see also `*_with_metadata()` as above.
143 * - asio::Peer_socket::async_send(): Queue up at least 1 of the N given bytes to send to peer ASAP,
144 * waiting as necessary in background for this to become possible (as explained above),
145 * then invoking user-provided callback as if by `io_service::post()`.
146 * - Equivalent: `tcp::socket::async_send()`.
147 * - Variations: optional timeout can be specified.
148 * - Variations: `null_buffers` mode, where the `send()` call itself is left to user handler.
149 * - asio::Peer_socket::async_receive(): Dequeue at least 1 of the desired N bytes from peer,
150 * waiting as necessary in background for this to arrive from said remote peer,
151 * then invoking user-provided callback as if by `io_service::post()`.
152 * - Equivalent: `tcp::socket::async_receive()`.
153 * - Variations: optional timeout can be specified.
154 * - Variations: `null_buffers` mode, where the `receive()` call itself is left to user handler.
155 * - Awaiting socket events (status):
156 * - Note that this is both a fundamental building block making much of the above work and simultaneously
157 * best to avoid using directly. In particular, boost.asio is (subjectively speaking) a way to write a
158 * cleaner event loop, and "Asynchronous operations" above allow Flow-protocol use with boost.asio proactors.
159 * - Event_set::add_wanted_socket(), Event_set::remove_wanted_socket(), Event_set::swap_wanted_sockets(),
160 * Event_set::clear_wanted_sockets(), and related: Specify the precise socket(s) for which to check on
161 * particular status; and the status(es) for which to wait (e.g., Readable, Writable, Acceptable).
162 * - Equivalent (`select()`): `FD_SET()`, `FD_CLR()`, `FD_ZERO()`, etc.
163 * - Equivalent (`poll()`): The input events array.
164 * - Event_set::sync_wait(): Synchronous I/O multiplexing, with indefinite blocking or finite timeout blocking.
165 * - Equivalent: `select()` with null timeout (indefinite) or non-null, non-zero timeout (finite).
166 * - Equivalent: `poll()` with -1 timeout (indefinite) or positive timeout (finite).
167 * - Event_set::poll(): Non-blocking I/O multiplexing, with an instant check for the socket status(es) of interest.
168 * - Equivalent: `select()` with non-null, zero timeout.
169 * - Equivalent: `poll()` with 0 timeout.
 170 * - Event_set::async_wait(): Asynchronous I/O multiplexing, like sync_wait() but waits in background and
171 * executes user-provided callback from an unspecified thread. (A typical callback might set a `future`
172 * result; `io_service::post()` a task to some boost.asio event loop; perhaps set a flag that is
173 * periodically checked by some user thread; or send a byte over some quick IPC mechanism like a POSIX
174 * domain socket or loopback UDP socket -- or even a condition variable.)
175 * - Equivalents: none in POSIX, that I know of. Windows "overlapped" async I/O sounds vaguely like a distant
176 * cousin. boost.asio `tcp::socket::async_*` (with `null_buffers` buffer argument if relevant) can simulate
177 * it also, but that's like using a modern PC to emulate an old graphing calculator... it's only
178 * conceivably useful if you need to work with some old-school 3rd party I/O library perhaps.
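 *
 * To make the three families concrete, here is a hedged sketch of receiving on an already-connected
 * asio::Peer_socket::Ptr `sock`. The buffer size, the 5-second timeout, and the exact overload shapes are
 * illustrative assumptions, not verbatim signatures:
 *
 *   @code
 *     std::uint8_t buf[4096];
 *     flow::Error_code err_code;
 *
 *     // Non-blocking: dequeue whatever is available right now; 0 <=> TCP would-block.
 *     std::size_t n_rcvd = sock->receive(boost::asio::buffer(buf), &err_code);
 *
 *     // Blocking: wait up to 5 seconds for at least 1 byte.
 *     n_rcvd = sock->sync_receive(boost::asio::buffer(buf), boost::chrono::seconds(5), &err_code);
 *
 *     // Asynchronous: return immediately; handler is post()ed onto *sock->async_task_engine().
 *     sock->async_receive(boost::asio::buffer(buf),
 *                         [](const flow::Error_code& err_code, std::size_t n_rcvd)
 *     {
 *       // Consume the n_rcvd leading bytes of buf; or handle err_code (e.g., S_WAIT_USER_TIMEOUT).
 *     });
 *   @endcode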
179 *
180 * ### Thread safety ###
181 * See net_flow::Node thread safety notes.
182 *
183 * The one exception to the "concurrent write access to one Node is thread-safe without added locking" is
184 * for set_async_task_engine(). See doc headers for async_task_engine() and set_async_task_engine() for more.
185 *
186 * ### Thread safety of destructor ###
 187 * See net_flow::Node thread safety notes. Note, in particular, however, that as of this writing you may *not* let
188 * asio::Node destruct if there are outstanding `{Node|Server|Peer}_socket::async_*()` operations. The ability to
189 * early-destruct during `sync_*()` does not extend to `async_*()`.
190 *
191 * @see boost.asio doc: https://www.boost.org/doc/libs/1_63_0/doc/html/boost_asio.html.
192 * @see Superclass net_flow::Node, companion classes asio::Peer_socket and asio::Server_socket.
193 *
194 * @todo To enable reactor-style `async_*()` operations, meaning waiting for readability/writability/etc. but *not*
195 * performing the actual operation before calling the user handler, we provide a `null_buffers`-style interface;
196 * like newer boost.asio versions we should deprecate this in favor of simpler `async_wait()` APIs.
197 * This would apply to net_flow::asio::Peer_socket and net_flow::asio::Server_socket APIs.
198 * Similarly consider doing this for the `sync_*()` operations in the non-`asio` superclasses net_flow::Peer_socket
199 * and net_flow::Server_socket. Note that Event_set::async_wait() and Event_set::sync_wait() already exist; they
200 * are powerful but a bit complex to use in these simple situations. Hence the following hypothetical wrappers would
201 * be welcome replacements for the deprecated `null_buffers` and "reactor-style" APIs in these classes:
202 * `net_flow::Peer_socket::sync_wait(Event_set::Event_type)`,
203 * `net_flow::asio::Peer_socket::async_wait(Event_set::Event_type)`,
204 * `net_flow::Server_socket::sync_wait()`,
205 * `net_flow::Server_socket::async_wait()`. (Timeout-arg versions would be desired also, as they exist now.)
206 *
207 * @internal
208 *
209 * The decision to make set_async_task_engine() thread-unsafe, by refusing to put locking around internal uses of
210 * #m_target_task_engine -- and in contrast to the rest of the main classes, which are generally thread-safe in
211 * this sense -- was taken for the following reasons.
212 * 1. Less mandatory locking = avoids potential performance loss.
213 * 2. Realistically, set_async_task_engine() is not commonly used; and if it *is* used, it's tough to contrive
214 * a situation where one needs to call it *while* other threads are carrying on async operations -- so potential
215 * performance loss would be to provide a guarantee that wouldn't be very useful.
216 * 3. Furthermore, it seems wrong to encourage wildly changing around the service throughout the socket's life.
217 * Just don't.
218 *
219 * There are counter-arguments.
 220 * 1. Saying Node is safe for concurrent access is easier and cleaner than saying it is safe EXCEPT for certain
 221 * methods.
221 * - Well, that's a shame, but it's only a comment ultimately.
222 * 2. If one *wanted* to use set_async_task_engine() concurrently with the methods against which it is
223 * thread-unsafe, they couldn't: async operations would require access to async_task_engine() during their
 224 * background work, and there's no good way to let the user lock around those parts.
225 * In other words, we're not merely leaving locking to user; no user locking solution could possibly fully work.
226 * - That's a true concern, but on balance it is defeated by reasons 2 and 3 above. (Realistically -- doubt anyone
227 * would care; and don't want to encourage them to do strange stuff.)
228 *
229 * @todo The `sync_*()` early-destruction functionality explained in "Thread safety of destructor" above
230 * could be extended to `async_*()`. This is not a huge deal but would be nice for consistency. Not allowing it
231 * currently is really a result of internal implementation concerns (it is easier to not account for this corner case).
232 * Alternatively, maybe we should go the other way and not support the somewhat-weird corner case in `sync_*()`.
233 */
234class Node :
235 public net_flow::Node
236{
237public:
238 // Constructors/destructor.
239
240 /**
241 * Constructs Node. All docs from net_flow::Node::Node super-constructor apply.
242 *
243 * The difference is you may supply a flow::util::Task_engine. More details below.
244 *
245 * @param logger
246 * See net_flow::Node::Node().
247 * @param target_async_task_engine
248 * Initial value for async_task_engine().
249 * For the effect this has see async_task_engine() doc header.
250 * @param low_lvl_endpoint
251 * See net_flow::Node::Node().
252 * @param net_env_sim
253 * See net_flow::Node::Node().
254 * @param err_code
255 * See net_flow::Node::Node().
256 * @param opts
257 * See net_flow::Node::Node().
258 */
259 explicit Node(log::Logger* logger, util::Task_engine* target_async_task_engine,
260 const util::Udp_endpoint& low_lvl_endpoint,
261 Net_env_simulator* net_env_sim = 0, Error_code* err_code = 0,
262 const Node_options& opts = Node_options());
263
264 // Methods.
265
266 /**
267 * Pointer (possibly null) for the flow::util::Task_engine used by any coming async I/O calls and inherited by any
268 * subsequently generated Server_socket and Peer_socket objects.
269 *
270 * One, this is used by `Node::async_*()` I/O calls, namely Node::async_connect() and all its variants, both
271 * synchronously and during the async phases of such calls, whenever placing a user-supplied handler routine
 272 * onto a `Task_engine`. Whatever async_task_engine() returns at that time is the `Task_engine` used.
273 *
274 * Two, when listen() creates Server_socket, or connect() creates Peer_socket, at that moment this is used.
275 * (Recall: `[a]sync_connect()` and variants will all call connect() internally.) Hence, the initial value
 276 * returned by Peer_socket::async_task_engine() and Server_socket::async_task_engine() is inherited from
277 * the factory Node's async_task_engine() result.
278 *
279 * ### Thread safety (operating on a given Node) ###
280 *
 281 * - After Node construction, set_async_task_engine() is the only way to change the returned value.
282 * - It is not safe to call set_async_task_engine() at the same time as async_task_engine().
283 * - Define "any Node async op" as Node::async_connect() and all its variants, including actions it takes
284 * in the background (asynchronously).
285 * - async_task_engine() must return the same value -- and *not* null -- throughout "any Node async op."
286 * - Define "any Node async inheriting op" as any non-blocking Node method that creates a Peer_socket or
287 * Server_socket (namely, connect(), listen(), and any variants).
288 * - Note that `[a]sync_connect()` (and variants) each invokes this non-blocking connect() at the start.
289 * - async_task_engine() must return the same value -- possibly null -- throughout "any Node async inheriting
290 * op."
291 * - Put simply, a null `Task_engine` can be inherited by a generated socket, but a null cannot be used when
292 * performing an async op. Furthermore, set_async_task_engine() is unsafe through any of those and through
293 * async_task_engine() itself.
294 *
295 * Informal tip: Typically you'd never call set_async_task_engine(), hence there is no problem. If you DO need to
296 * call it, even then normally it's easy to ensure one does this before any actual async calls are made.
297 * Trouble only begins if one calls set_async_task_engine() "in the middle" of operating the socket.
298 * There is no way to add outside locking to make that work, either, due to async "tails" of some calls.
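 *
 * A hedged sketch of the safe ordering (`task_engine_b` is a hypothetical second loop):
 *
 *   @code
 *     node.set_async_task_engine(&task_engine_b); // OK: no async op in flight yet.
 *     node.async_connect(to, on_result);          // From now until on_result() runs,
 *                                                 // leave set_async_task_engine() alone.
 *   @endcode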
299 *
300 * @return Null or non-null pointer to flow::util::Task_engine.
301 */
 302 util::Task_engine* async_task_engine();
 303
304 /**
305 * Overwrites the value to be returned by next async_task_engine().
306 *
307 * See async_task_engine() doc header before using this.
308 *
309 * @param target_async_task_engine
310 * See async_task_engine().
311 */
312 void set_async_task_engine(util::Task_engine* target_async_task_engine);
313
314 /**
315 * The boost.asio asynchronous version of sync_connect(), performing any necessary wait and connection in the
316 * background, and queueing the user-provided callback on the given boost.asio flow::util::Task_engine.
 317 * Acts just like connect() but instead of returning a connecting socket immediately, it returns and asynchronously
318 * waits until the initial handshake either succeeds or fails, and then invokes the given callback (in manner
319 * equivalent to boost.asio `Task_engine::post()`), passing to it the connected socket or null, respectively,
320 * along with a success #Error_code or otherwise, respectively. Additionally, you can specify a timeout; not
321 * completing the connection by that time is a specific #Error_code.
322 *
323 * See subtleties about connect timeouts in Node::sync_connect() doc header.
324 *
325 * The following are the possible outcomes, in ALL cases ending with an `on_result()` invocation `post()`ed
326 * onto `Task_engine* async_task_engine()` as follows:
327 * `on_result(const Error_code& err_code, asio::Peer_socket::Ptr)`, with the 2 args determined as follows.
328 * 1. Connection succeeds before the given timeout expires (or succeeds, if no timeout given).
329 * Socket is at least Writable at time of return. The new socket and a success code are passed to
330 * callback.
331 * 2. Connection fails before the given timeout expires (or fails, if no timeout given).
332 * null is passed in, and a non-success #Error_code is set to reason for connection failure and passed in.
333 * (Note that a built-in handshake timeout -- NOT the given user timeout, if any -- falls under this category.)
334 * #Error_code error::Code::S_WAIT_INTERRUPTED means the wait was interrupted (similarly to POSIX's `EINTR`).
335 * 3. A user timeout is given, and the connection does not succeed before it expires. Reported similarly
336 * to 2, with specific code error::Code::S_WAIT_USER_TIMEOUT.
337 * (Rationale: consistent with Server_socket::sync_accept(),
338 * Peer_socket::sync_receive(), Peer_socket::sync_send() behavior.)
339 *
340 * On success, the `asio::Peer_socket::Ptr sock` passed to `on_result()` will have
341 * `sock->async_task_engine() == N->async_task_engine()`, where N is the current Node. You can overwrite it
342 * subsequently if desired.
343 *
344 * Tip: Typical types you might use for `max_wait`: `boost::chrono::milliseconds`,
345 * `boost::chrono::seconds`, `boost::chrono::high_resolution_clock::duration`.
346 *
347 * Corner case: `*this` Node must exist throughout the async op, including when `on_result()` is actually
348 * executed in the appropriate thread.
349 *
350 * error::Code other than success passed to `on_result()`:
351 * error::Code::S_WAIT_INTERRUPTED, error::Code::S_WAIT_USER_TIMEOUT, error::Code::S_NODE_NOT_RUNNING,
352 * error::Code::S_CANNOT_CONNECT_TO_IP_ANY, error::Code::S_OUT_OF_PORTS,
353 * error::Code::S_INTERNAL_ERROR_PORT_COLLISION,
354 * error::Code::S_CONN_TIMEOUT, error::Code::S_CONN_REFUSED,
355 * error::Code::S_CONN_RESET_BY_OTHER_SIDE, error::Code::S_NODE_SHUTTING_DOWN,
356 * error::Code::S_OPTION_CHECK_FAILED.
357 *
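 * A hedged usage sketch (`node` and `to` are assumed to exist; the 30-second timeout is arbitrary):
 *
 *   @code
 *     node.async_connect(to,
 *                        [](const flow::Error_code& err_code, asio::Peer_socket::Ptr sock)
 *     {
 *       if (err_code) { return; } // Failed, timed out, or interrupted; sock is null.
 *       // sock is connected and at least Writable; e.g., begin sock->async_send().
 *     },
 *                        boost::chrono::seconds(30));
 *   @endcode
 *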
358 * @tparam Rep
359 * See `boost::chrono::duration` documentation (and see above tip).
360 * @tparam Period
361 * See `boost::chrono::duration` documentation (and see above tip).
362 * @tparam Handler
363 * A type such that if `Handler h`, then a function equivalent to `{ h(err_code, sock); }` can
 364 * be `post()`ed onto a `Task_engine`, with `const Error_code& err_code` and `asio::Peer_socket::Ptr sock`.
365 * @param to
366 * See connect().
367 * @param on_result
368 * Callback whose invocation as explained above to add to `*async_task_engine()` as if by `post()`.
369 * Note: Use `bind_executor(S, F)` to bind your handler to the util::Strand `S`.
370 * @param max_wait
371 * The maximum amount of time from now to wait before giving up on the wait and invoking `on_result()`
372 * with a timeout error code.
373 * `"duration<Rep, Period>::max()"` will eliminate the time limit and cause indefinite wait
374 * -- however, not really, as there is a built-in connection timeout that will expire.
375 * @param opts
376 * See connect().
377 */
378 template<typename Rep, typename Period, typename Handler>
379 void async_connect(const Remote_endpoint& to,
380 const Handler& on_result,
381 const boost::chrono::duration<Rep, Period>& max_wait,
382 const Peer_socket_options* opts = 0);
383
384 /**
385 * A combination of async_connect() and connect_with_metadata() (asynchronously blocking connect, with supplied
386 * metadata).
387 *
388 * @tparam Rep
389 * See async_connect().
390 * @tparam Period
391 * See async_connect().
392 * @tparam Handler
393 * See async_connect().
394 * @param to
395 * See async_connect().
396 * @param on_result
397 * See async_connect().
398 * @param max_wait
399 * See async_connect().
400 * @param serialized_metadata
401 * See connect_with_metadata().
402 * @param opts
403 * See async_connect().
404 */
405 template<typename Rep, typename Period, typename Handler>
 406 void async_connect_with_metadata(const Remote_endpoint& to,
 407 const Handler& on_result,
408 const boost::chrono::duration<Rep, Period>& max_wait,
409 const boost::asio::const_buffer& serialized_metadata,
410 const Peer_socket_options* opts = 0);
411
412 /**
413 * Equivalent to `async_connect(to, on_result, duration::max(), opts)`; i.e., async_connect()
414 * with no user timeout.
415 *
416 * @tparam Handler
417 * See other async_connect().
418 * @param to
419 * See other async_connect().
420 * @param on_result
421 * See other async_connect().
422 * @param opts
423 * See other async_connect().
424 */
425 template<typename Handler>
426 void async_connect(const Remote_endpoint& to,
427 const Handler& on_result,
428 const Peer_socket_options* opts = 0);
429
430 /**
431 * Equivalent to `async_connect_with_metadata(to, on_result, duration::max(),
432 * serialized_metadata, opts)`; i.e., async_connect_with_metadata() with no user timeout.
433 *
434 * @tparam Handler
435 * See other async_connect_with_metadata().
436 * @param to
437 * See other async_connect_with_metadata().
438 * @param on_result
439 * See other async_connect_with_metadata().
440 * @param serialized_metadata
441 * See other async_connect_with_metadata().
442 * @param opts
443 * See other async_connect_with_metadata().
444 */
445 template<typename Handler>
 446 void async_connect_with_metadata(const Remote_endpoint& to,
 447 const Handler& on_result,
448 const boost::asio::const_buffer& serialized_metadata,
449 const Peer_socket_options* opts = 0);
450
451private:
452 // Friends.
453
454 /// Peer_socket must be able to forward to async_op(), etc.
455 friend class Peer_socket;
456 /// Server_socket must be able to forward to async_op(), etc.
457 friend class Server_socket;
458
459 // Types.
460
461 /// Short-hand for the `Task_engine`-compatible connect `Handler` concrete type for class-internal code.
462 using Handler_func = Function<void (const Error_code& err_code, Peer_socket::Ptr new_sock)>;
463
464 // Methods.
465
466 /**
467 * Implements superclass API.
468 *
469 * @param opts
470 * See superclass API.
471 * @return See superclass API.
472 */
 473 net_flow::Peer_socket* sock_create(const Peer_socket_options& opts) override;
 474
475 /**
476 * Implements superclass API.
477 *
478 * @param child_sock_opts
479 * See superclass API.
480 * @return See superclass API.
481 */
482 net_flow::Server_socket* serv_create(const Peer_socket_options* child_sock_opts) override;
483
484 /**
485 * Implementation of core asynchronous transfer methods, namely asio::Peer_socket::async_send(),
486 * asio::Peer_socket::async_receive(), and asio::Server_socket::async_accept(), once the asio::Node has been
 487 * obtained from the socket in any one of those methods.
488 *
 489 * It is inspired by the design of net_flow::Node::sync_op().
490 *
491 * Note the precise nature of `on_result` (see below).
492 *
493 * @tparam Socket
494 * Underlying object of the transfer operation (asio::Peer_socket or asio::Server_socket).
495 * @tparam Base_socket
496 * Superclass of `Socket`.
497 * @tparam Non_blocking_func_ret_type
498 * The return type of the calling transfer operation (`size_t` or asio::Peer_socket::Ptr).
499 * @param sock
500 * Socket on which user called `async_*()`.
501 * @param non_blocking_func
502 * When this method believes it should attempt a non-blocking transfer op, it will execute
503 * `non_blocking_func(&err_code)` with some `Error_code err_code` existing.
504 * If `non_blocking_func.empty()`, do not call `non_blocking_func()` --
 505 * invoke `on_result()` indicating no error so far, and let
 506 * the user do the actual operation, if they want; we just tell them it should be ready for them. This is known
507 * as `null_buffers` mode or reactor pattern mode. Otherwise, do the operation and then
508 * invoke `on_result()` with the resulting error code, possibly success. This is the proactor pattern mode
509 * and arguably more typical.
510 * @param would_block_ret_val
511 * The value that `non_blocking_func()` returns to indicate it was unable to perform the
512 * non-blocking operation (i.e., no data/sockets available).
513 * @param ev_type
514 * Event type applicable to the type of operation this is. See Event_set::Event_type doc header.
515 * @param wait_until
516 * See `max_wait` argument on the originating `async_*()` method. This is absolute timeout time point
517 * derived from it; zero-valued if no timeout.
518 * @param on_result
519 * A function that will properly `post()` the original `on_result()` from the originating `async_*()` method
520 * onto `*async_task_engine()`. This must be done carefully, preserving any associated executor of that
521 * original `on_result` handler.
522 */
523 template<typename Socket, typename Base_socket, typename Non_blocking_func_ret_type>
524 void async_op(typename Socket::Ptr sock,
525 Function<Non_blocking_func_ret_type (Error_code*)>&& non_blocking_func,
526 Non_blocking_func_ret_type would_block_ret_val,
527 Event_set::Event_type ev_type,
528 const Fine_time_pt& wait_until,
529 Function<void (const Error_code&, Non_blocking_func_ret_type)>&& on_result);
530
531 /**
532 * Implementation core of `async_connect*()` that gets rid of templated or missing arguments thereof.
533 *
534 * E.g., the API would wrap this and supply a Fine_duration instead of generic `duration`; and supply
535 * `Fine_duration::max()` if user omitted the timeout argument; and convert a generic function object
536 * into a concrete `Function<>` object. Code bloat and possible circular definition issues are among the
537 * reasons for this "de-templating" pattern.
538 *
539 * @param to
540 * See connect().
541 * @param max_wait
542 * See the public `async_connect(timeout)`. `"duration<Rep, Period>::max()"` maps to the value
543 * `Fine_duration::max()` for this argument.
544 * @param serialized_metadata
545 * See connect_with_metadata().
546 * @param opts
547 * See connect().
548 * @param on_result
549 * `handler_func(on_result)`, where `on_result` is the user's `async_*()` method arg.
550 */
551 void async_connect_impl(const Remote_endpoint& to, const Fine_duration& max_wait,
552 const boost::asio::const_buffer& serialized_metadata,
553 const Peer_socket_options* opts,
554 Handler_func&& on_result);
555
556 /**
 557 * Returns a functor that essentially `post()`s `on_result` onto `*async_task_engine()` in a way suitable
558 * for a boost.asio-compatible async-op.
559 *
560 * ### Rationale ###
561 * See asio::Peer_socket::handler_func().
562 *
563 * @tparam Handler
564 * See async_connect().
565 * @param on_result
566 * See async_connect().
567 * @return Function to call from any context that will properly `post()` `on_result();` onto `*async_task_engine()`.
568 */
569 template<typename Handler>
570 Handler_func handler_func(Handler&& on_result);
571
572 // Data.
573
574 /// See async_task_engine().
 575 util::Task_engine* m_target_task_engine;
 576};
577
578// Template implementations.
579
580template<typename Socket, typename Base_socket, typename Non_blocking_func_ret_type>
581void Node::async_op(typename Socket::Ptr sock,
582 Function<Non_blocking_func_ret_type (Error_code*)>&& non_blocking_func,
583 Non_blocking_func_ret_type would_block_ret_val,
584 Event_set::Event_type ev_type,
585 const Fine_time_pt& wait_until,
586 Function<void (const Error_code&, Non_blocking_func_ret_type)>&& on_result)
587{
588 using boost::shared_ptr;
589 using boost::chrono::milliseconds;
590 using boost::chrono::round;
591 using boost::asio::bind_executor;
592
593 // We are in user thread U != W.
594
595 /* We create an Event_set with just the one event we care about (e.g., sock is Writable) and async_wait() for it.
596 * The handler for the latter can then perform the non-blocking operation (since the socket should be ready
597 * due to the active event). Finally, we would place the user's async handler onto the
 598 * boost.asio Task_engine the user provided to `sock`. on_result(), by contract, shall do that if we execute it
599 * from any context; hence when needed we simply do that below. See our doc header for a brief note on the
600 * requirements on on_result() which allow us to do that.
601 *
602 * The other cases to cover:
603 * - If reactor_pattern, do not perform the non-blocking op (user probably wants to do so in on_result();
 604 * and we only inform them that they should now be able to). non_blocking_func can be garbage.
605 * - If interrupted (a-la EINTR), on_result() with the error code indicating interruption, no call
606 * to non-blocking op.
 607 * - Timeout, if provided in wait_until, means we start a timer race against the async_wait(); if the timer
 608 * wins, then stop the async_wait() and on_result() indicating timeout occurred (would-block result).
 609 * If it loses, that timer is canceled at time of loss (async_wait() handler executing). */
610
611 /* Unlike for sync_op() (as of this writing), API contract demands Node exist through all of the potential
612 * background wait. Thus, `this` Node will continue existing throughout, and we can (for example) log without
613 * fear even after the wait finishes. There is a @todo in class header for making it more like sync_op() in
614 * this sense, but so far it hasn't seemed important enough to worry about it, consistency being the main
615 * reason to do it.
616 *
617 * Unlike in sync_op(), nothing is locked currently. So sock could become CLOSED between lines executing
 618 * below. Remember that in the processing below. */
619
620 util::Task_engine& task_engine = *(sock->async_task_engine());
621
622 // First create Event_set that'll await our one socket+status of interest.
623
624 Error_code err_code;
625 const Event_set::Ptr event_set = event_set_create(&err_code);
626 if (!event_set)
627 {
628 on_result(err_code, would_block_ret_val); // It post()s user's originally-passed-in handler.
629 // E.g., err_code might indicate Node is not running().
630 return;
631 }
632 // else event_set ready. In particular this means running() is true (it cannot become false up to ~Node()).
633
634 FLOW_LOG_TRACE("Begin async op (identified by Event_set [" << event_set << "]) of type [" << ev_type << "] on "
635 "object [" << sock << "].");
636
637 // Caution: remember event_set must be destroyed when async_op() finishes, synchronously or asynchronously.
638
639 // We care about just this event of type ev_type on this socket. Upcast is required for our use of boost::any().
640 if (!(event_set->add_wanted_socket<Base_socket>(Base_socket::ptr_cast(sock), ev_type, &err_code)))
641 {
642 on_result(err_code, would_block_ret_val); // It post()s user's originally-passed-in handler.
643 // Node must have shut down or something. Pretty weird. Code already logged.
644 return;
645 }
646 // else go ahead and asynchronously wait.
647
648 /* Timeout might be finite or infinite (non-existent). Latter case is much simpler, but for brevity we mix the code
649 * paths of the two cases. */
650 const bool timeout_given = wait_until != Fine_time_pt();
651
652 /* Explanation of why Strand is used / locking discussion:
653 *
654 * In the timeout case, we will be racing between a timer and the async_wait(). Their respective callbacks need
655 * to go on some Task_engine. The 2 services around are superclass Node's thread W event loop;
656 * and the user-provided one task_engine. Thread W is not suitable for 2 reasons: (1) in terms of clean design,
657 * it's dirty, as we'd be getting into the superclass's dirty laundry -- asio::Node should be a clean addendum
 658 * to net_flow::Node, for basic OOP hygiene; (2) thread W is not allowed to make external net_flow API calls itself;
659 * most code assumes thread U != W. So that leaves user-provided task_engine. Using it is ideal, because
 660 * typically our internally placed handler will soon execute on_result(), which we advertised is post()ed on
661 * task_engine. Only one subtle caveat is introduced: conceptually one handler must win the race against the
662 * other, meaning execute serially before it -- for synchronization. But post()ed handlers are only guaranteed
663 * to execute serially if the Task_engine is run()ning in one thread: user is allowed by boost.asio to run it from 2+
 664 * (distributing tasks across threads). We have 2 choices to make them serial anyway: use explicit
665 * synchronization, or use Task_engine::strand to have Task_engine guarantee they can only run serially (choosing
666 * thread accordingly if applicable -- probably internally by using a mutex anyway). I use strand for a few reasons.
667 * One, it's a bit easier to use-or-not-use a strand depending on a Boolean, as opposed to use-or-not-use a mutex;
668 * in particular conditionally creating or not creating a strand seems a bit prettier than same with a mutex.
669 * Two, conceivably the strand mechanism will allow Task_engine to use a thread normally blocked by a mutex lock wait
670 * for something else, maybe another unrelated task might run instead.
671 *
672 * There is one more synchronization challenge. 2+ async_*() ops can run simultaneously on one socket object.
673 * We access no shared data in such a case except for the short moment when we actually run the non-blocking op, when
 674 * events are ready simultaneously for the 2+ async ops. No mutex is needed, however, because net_flow::*_socket
675 * are advertised as thread-safe for simultaneous method calls on one object. */
676 using Strand = util::Strand;
677
678 // This stuff is unused if !timeout_given.
679 struct Timeout_state // Collection of state allocated on heap until the (subject-to-timeout) op completes.
680 {
 681 util::Scheduled_task_handle sched_task; // Timeout task handle, for canceling it if the wait wins the race.
 682 Strand m_make_serial;
683 Timeout_state(util::Task_engine& svc) : m_make_serial(svc) {}
684 };
685 using Timeout_state_ptr = shared_ptr<Timeout_state>; // Use a ref-counted pointer to ensure it's around until done.
686 Timeout_state_ptr timeout_state;
687
688 if (timeout_given)
689 {
690 if (wait_until <= Fine_clock::now()) // A little optimization of a corner case; why not?
691 {
692 on_result(error::Code::S_WAIT_USER_TIMEOUT, would_block_ret_val); // post()s user's originally-passed-in handler.
693 // Same as if wait actually occurred and timed out.
694 return;
695 }
696 // else
697
698 FLOW_LOG_TRACE("Timeout timer begin for async op [" << event_set << "] in "
699 "period [" << round<milliseconds>(wait_until - Fine_clock::now()) << "].");
700
701 // Start one of the racers: the timer that'll fire once timeout is finished.
702
703 // All the stuff needed by timeout can now be created (if !timeout_given, we save resources by not doing this).
704 timeout_state.reset(new Timeout_state(task_engine));
705
706 // Performance note: cannot move(on_result) here, as we still need on_result for 2nd closure made below. Copy.
707 timeout_state->sched_task
708 = util::schedule_task_at(get_logger(), wait_until,
709 true, // Single-threaded!
710 &task_engine,
711 bind_executor(timeout_state->m_make_serial,
712 [this, timeout_state, on_result, would_block_ret_val, event_set]
713 (bool)
714 {
715 // We are in thread V != W. V may or may not be U. V is any thread where user's Task_engine is run()ning.
716
717 FLOW_LOG_TRACE("[User event loop] "
718 "Timeout fired for async op [" << event_set << "]; clean up and report to user.");
719
720 Error_code dummy_prevents_throw;
721 event_set->async_wait_finish(&dummy_prevents_throw);
722 event_set->close(&dummy_prevents_throw);
723
724 FLOW_LOG_TRACE("[User event loop] User handler execution begins for async op [" << event_set << "].");
725 on_result(error::Code::S_WAIT_USER_TIMEOUT, would_block_ret_val); // post()s user's originally-passed-in handler.
726
727 FLOW_LOG_TRACE("[User event loop] User handler execution ends for async op [" << event_set << "].");
728 })); // timer.async_wait() callback.
729 } // if (timeout_given)
730
731 // If there's a timeout, timer started for it. Now prepare the mandatory Event_set::async_wait() for readiness.
732
733 /* This will run as post()ed task on user's given target Task_engine.
734 * Caution! non_blocking_func is potentially destroyed (by move()) by the following assignment.
735 * Caution! Same with on_result. We don't need on_result for any more closures below, so this is OK. */
736 auto on_async_wait_user_loop
737 = [this, sock, timeout_state, would_block_ret_val, ev_type, wait_until, event_set,
738 non_blocking_func = std::move(non_blocking_func),
739 on_result = std::move(on_result)]
740 (bool interrupted)
741 /* The two Function<>s may be move()d themselves inside, so those closure type members cannot be const.
742 * Caution! This pattern also makes on_async_wait_user_loop() a one-time-use closure. */
743 mutable
744 {
745 // We are in thread V != W. V may or may not be U. V is any thread where user's Task_engine is run()ning.
746
747 // Only watching for one event, so either interrupted is true, or the event is active. Can thus clean this now.
748 Error_code dummy_prevents_throw;
749 event_set->close(&dummy_prevents_throw);
750
751 if (timeout_state
752 &&
753 // We were racing timeout timer, and we may have won. Check if indeed we won.
754 (!util::scheduled_task_cancel(get_logger(), timeout_state->sched_task)))
755 {
756 /* Couldn't cancel it, so it *will* have run very soon. We lost.
757 * I didn't burrow in hard to see if this is really possible, but conceptually why not? Be ready for it. */
758 FLOW_LOG_INFO("[User event loop] "
759 "Events-ready in async op [" << event_set << "], but timeout already expired recently. "
760 "Interesting timing coincidence.");
761 return;
762 }
763 // else
764
765 // There is no timeout, or there is but has not expired before us. Is socket active, or were we interrupted?
766
767 if (interrupted)
768 {
769 // Conceptually like EINTR. Log and done.
770 Error_code err_code_val;
771 const auto err_code = &err_code_val;
 772 FLOW_ERROR_EMIT_ERROR(error::Code::S_WAIT_INTERRUPTED);
 773
774 FLOW_LOG_TRACE("[User event loop] "
775 "Events-ready in async op [" << event_set << "]; user handler execution begins.");
776 on_result(err_code_val, would_block_ret_val); // It post()s user's originally-passed-in handler.
777 }
778 else
779 {
780 assert(!interrupted);
781
782 /* OK. async_wait() reports event is ready (sock is active, e.g., Writable). Try to perform
783 * non-blocking operation (e.g., Peer_socket::send()). Another async_op() may asynchronously run a
784 * non-blocking op at the same time from different thread, but no mutex needed as explained in long-ish
785 * comment above. */
786
787 Non_blocking_func_ret_type op_result;
788 Error_code op_err_code;
789 const bool reactor_pattern = non_blocking_func.empty(); // How this mode is indicated.
790
791 if (reactor_pattern)
792 {
793 // They want to presumably perform the op themselves in handler. Skip; would-block-no-error as advertised.
794 assert(!op_err_code);
795
796 FLOW_LOG_TRACE("[User event loop] "
797 "Events-ready in async op [" << event_set << "]; reactor pattern mode on; "
798 "user handler execution begins.");
799 on_result(op_err_code, would_block_ret_val); // It post()s user's originally-passed-in handler.
800 }
801 else
802 {
803 assert(!reactor_pattern);
804
805 FLOW_LOG_TRACE("[User event loop] "
806 "Events-ready in async op [" << event_set << "]; reactor pattern mode off; "
807 "executing non-blocking operation now.");
808 op_result = non_blocking_func(&op_err_code); // send(), receive(), accept(), etc.
809
810 if (op_err_code)
811 {
812 // Any number of errors possible here; error on socket => socket is active.
813
814 FLOW_LOG_TRACE("[User event loop] "
815 "Error observed in instant op; code [" << op_err_code << '/' << op_err_code.message() << "]; "
816 "in async op [" << event_set << "]; "
817 "user handler execution begins.");
818 on_result(op_err_code, would_block_ret_val); // It post()s user's originally-passed-in handler.
819 }
820 else
821 {
822 assert(!op_err_code);
823 // No error, but did we get a result, or is it would-block?
824
825 if (op_result == would_block_ret_val)
826 {
827 FLOW_LOG_TRACE("[User event loop] "
828 "Instant op yielded would-block despite events-ready "
829 "in async op [" << event_set << "]; are there concurrent competing operations? "
830 "Trying async op again.");
831 assert(!reactor_pattern); // We can't know whether it worked in reactor_pattern mode. User finds out.
832
833 /* If timeout_given, then effectively this reduces the time left in timeout and tries again
834 * (time has passed; wait_until has remained constant).
835 * If !timeout_given, then this will just keep trying forever, until the jerk in another thread, or
836 * wherever, taking our stuff even when we get active events, stops jerking us around. */
 837 async_op
 838 <Socket, Base_socket, Non_blocking_func_ret_type>
839 (sock, std::move(non_blocking_func),
840 would_block_ret_val, ev_type, wait_until, std::move(on_result));
841 // on_result, non_blocking_func are now potentially destroyed, but we don't need them anymore:
842 return;
843 }
844 // else got actual result!
845
846 FLOW_LOG_TRACE("[User event loop] "
847 "Instant op yielded positive result [" << op_result << "] "
848 "in async op [" << event_set << "]; "
849 "user handler execution begins.");
850
851 assert(op_result != would_block_ret_val);
852 assert(!op_err_code);
853 on_result(op_err_code, op_result); // It post()s user's originally-passed-in handler.
854 } // else if (!op_err_code)
855 } // else if (!reactor_pattern)
856 }
857
858 // Note all paths lead here, except the `return;` above for when we try again.
859 FLOW_LOG_TRACE("[User event loop] User handler execution ends for async op [" << event_set << "].");
860 }; // on_async_wait_user_loop =
861
862 /* Now begin the wait for readiness.
863 * Aside: There are strong reasons not to do anything but post(), directly in the thread W-executed handler, and
864 * they are discussed above. However, in any case, note that async_wait() API doc says handler
865 * must not itself call anything related to the entire Node from which the Event_set comes.
866 * Our choosing to post() elsewhere is a very vanilla thing to do in an async_wait() handler per its docs. */
867
868 if (timeout_given)
869 {
870 // As above, `mutable` so that the captured closure can be move()d inside (no more than once ever).
871 event_set->async_wait([sock, timeout_state,
872 on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
873 (bool interrupted) mutable
874 {
875 /* We are in thread W. Put the handler into user's event loop, as discussed, but make it serial with timeout.
876 * Note: if Task_engine E and Strand S(E), then post(E, b_e(S, f)) is an anti-pattern for subtle reasons, though
877 * in our case "tie-breaking" order doesn't matter, hence at worst it would only be slower. Still, use the proper
878 * pattern: post(S, f). Note on note: <some async op on I/O object associated with E>(..., b_e(S, f)) is just
879 * fine and in fact we do that elsewhere in this function. */
880 post(timeout_state->m_make_serial, [interrupted,
881 on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
882 () mutable
883 {
884 on_async_wait_user_loop(interrupted); // (This call mutates on_async_wait_user_loop closure *itself*.)
885 });
886 // BTW strand docs say strand need only exist through the post() call itself. So this async path is done w/ it.
887 });
888 }
889 else
890 {
891 /* Simpler case; no race; so no strand to use. Keeping comments light.
892 * @todo Subtlety: There's no race, so in effect on_async_wait_user_loop() does some non-blocking stuff and
893 * executes on_result() -- which actually post()s, through the proper executor, onto *async_task_engine().
894 * If Event_set::async_wait() had a variant (maybe asio::Event_set::async_wait()?) that is a boost.asio-compatible
895 * async-op, in that it respects any associated executor for its completion handler, then it would be arguably
896 * a worth-while goal (for perf/to reduce asynchronicity) to grab the associated executor from the user's
897 * original completion handler and apply it to the arg given to the async_wait(). The to-do is fairly ambitious,
898 * definitely, but I (ygoldfel) leave it in as food for thought. Note, though, that I am not suggesting
899 * we do it in the timeout-race case above, as that involves scheduling a parallel timer... the implications of
900 * letting their executor (strand, or who knows what else) control our internal algorithm -- too risky/unclear. */
901 event_set->async_wait([sock, on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
902 (bool interrupted) mutable
903 {
904 // No need to worry about making it serial with anything else.
905 post(*sock->async_task_engine(), [interrupted,
906 on_async_wait_user_loop = std::move(on_async_wait_user_loop)]
907 () mutable
908 {
909 on_async_wait_user_loop(interrupted);
910 });
911 });
912 }
913} // Node::async_op()
914
915template<typename Rep, typename Period, typename Handler>
 916void Node::async_connect(const Remote_endpoint& to,
 917 const Handler& on_result,
918 const boost::chrono::duration<Rep, Period>& max_wait,
919 const Peer_socket_options* opts)
920{
921 using boost::asio::buffer;
922
923 async_connect_impl(to, max_wait,
 924 buffer(&S_DEFAULT_CONN_METADATA, sizeof(S_DEFAULT_CONN_METADATA)),
 925 opts,
926 handler_func(on_result));
927}
928
929template<typename Handler>
 930void Node::async_connect(const Remote_endpoint& to,
 931 const Handler& on_result,
932 const Peer_socket_options* opts)
933{
934 using boost::asio::buffer;
935
936 async_connect_impl(to, Fine_duration::max(),
 937 buffer(&S_DEFAULT_CONN_METADATA, sizeof(S_DEFAULT_CONN_METADATA)),
 938 opts,
939 handler_func(on_result));
940}
941
942template<typename Rep, typename Period, typename Handler>
 943void Node::async_connect_with_metadata(const Remote_endpoint& to,
 944 const Handler& on_result,
945 const boost::chrono::duration<Rep, Period>& max_wait,
946 const boost::asio::const_buffer& serialized_metadata,
947 const Peer_socket_options* opts)
948{
949 async_connect_impl(to, max_wait, serialized_metadata,
950 opts,
951 handler_func(on_result));
952}
953
954template<typename Handler>
 955void Node::async_connect_with_metadata(const Remote_endpoint& to,
 956 const Handler& on_result,
957 const boost::asio::const_buffer& serialized_metadata,
958 const Peer_socket_options* opts)
959{
960 async_connect_impl(to, Fine_duration::max(), serialized_metadata,
961 opts,
962 handler_func(on_result));
963}
964
965template<typename Handler>
 966Node::Handler_func Node::handler_func(Handler&& on_result)
 967{
968 using boost::asio::post;
969 using boost::asio::bind_executor;
970 using boost::asio::get_associated_executor;
971
972 /* This mirrors Peer_socket::handler_func() exactly (just different signature on on_result()). Comments light.
973 * @todo Maybe there's a way to generalize this with template param-packs or something. */
974
975 return [this, on_result = std::move(on_result)]
976 (const Error_code& err_code, Peer_socket::Ptr new_sock)
977 mutable
978 {
979 const auto executor = get_associated_executor(on_result);
980 post(*(async_task_engine()),
981 bind_executor(executor,
982 [err_code, new_sock, on_result = std::move(on_result)]
983 {
984 on_result(err_code, new_sock);
985 }));
986 };
987} // Node::handler_func()
988
989} // namespace flow::net_flow::asio