native_socket_stream_acceptor.hpp
/* Flow-IPC: Core
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once

#include <flow/async/single_thread_task_loop.hpp>
#include <queue>
#include <utility>
namespace ipc::transport
{
// Types.

/**
 * A server object that binds to a `Shared_name` and listens for incoming `Native_socket_stream` connect
 * attempts to that name; and yields connected-peer sync_io::Native_socket_stream objects, one per counterpart
 * opposing `*_connect()`.
 *
 * @note The method is async_accept(); and it yields a PEER-state sync_io::Native_socket_stream object.
 * Such an object is called a `sync_io` *core*. It is often more convenient to work with an async-I/O-pattern
 * object of type Native_socket_stream -- it'll perform work in the background without intervention, etc.
 * To get this simply do: `Native_socket_stream async_x(std::move(sync_x))`, where
 * `sync_x` is the aforementioned PEER-state object of type `sync_io::Native_socket_stream`.
 * (`sync_x` then becomes as-if-default-constructed again. You can even async_accept() into it again.)
 * @note Depending on your context you may also bundle `sync_x` into Channel `sync_c` -- then create an
 * async-I/O-pattern Channel via: `auto async_c = sync_c.async_io_obj()`.
 *
 * @see Native_socket_stream doc header.
 * @see Native_socket_stream::sync_connect() doc header.
 *
 * This object is straightforward to use, and really the only difficulty comes from (1) choosing a `Shared_name` and
 * (2) the other side knowing that name. Before deciding to use it, and how to use it, it is essential to read
 * the "How to use" section of the Native_socket_stream doc header. It discusses when to use this, versus an easier
 * (name-free) way to yield a connected-peer Native_socket_stream. As you'll see, the only wrinkle with the
 * latter is that it does require a *one*-time use of Native_socket_stream_acceptor after all. However,
 * ipc::session takes care of that internally -- so you would not need to set up a Native_socket_stream_acceptor
 * yourself.
 *
 * So all in all:
 *   - If you've got ipc::session stuff, you don't need Native_socket_stream_acceptor: you can just open
 *     a Channel with a native-handles pipe; it'll contain a PEER-state Native_socket_stream.
 *   - If you are operating outside ipc::session, then this guy here will let you set up a client/server mechanism
 *     for `Native_socket_stream`s (see the sketch just below).
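 *
 * ### Example (sketch) ###
 * A minimal server-side sketch of that latter, outside-ipc::session case; the name, logger, and handler body are
 * illustrative assumptions rather than anything prescribed by this API:
 *   @code
 *   // Both sides must somehow know this name; see the naming discussion above.
 *   Native_socket_stream_acceptor acc(&logger, Shared_name::ct("nss_example_srv")); // Throws on failure.
 *
 *   sync_io::Native_socket_stream sync_x; // PEER-state core shall be move-assigned here on success.
 *   acc.async_accept(&sync_x, [&](const Error_code& err_code)
 *   {
 *     if (err_code) { return; } // Shutdown, or acceptor hosed; typically just bail out.
 *     // Upgrade the sync_io core to an async-I/O-pattern object, as described above.
 *     Native_socket_stream async_x(std::move(sync_x));
 *     // ...async_x is ready for duty...
 *   });
 *   @endcode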
 *
 * @todo At the moment, *if* one decides to use a Native_socket_stream_acceptor directly -- not really necessary
 * given ipc::session `Channel`-opening capabilities -- the user must come up with their own naming scheme
 * that avoids name clashes; we could supply an ipc::session-facilitated system for providing this service instead.
 * I.e., ipc::session could either expose a facility for generating the `Shared_name absolute_name` arg to
 * the Native_socket_stream_acceptor ctor (and the opposing Native_socket_stream::sync_connect() call); or it
 * could provide some kind of Native_socket_stream_acceptor factory and corresponding opposing facility.
 * Long story short, within the ipc::session way of life literally only one acceptor exists, and it is set up
 * (and named) internally to ipc::session. We could provide a way to facilitate the creation of more
 * acceptors, if desired, by helping to choose their `Shared_name`s. (An original "paper" design did specify
 * a naming scheme for this.)
 *
 * @internal
 * ### Implementation ###
 * It is fairly self-explanatory. Historic note: I (ygoldfel) wrote this after the far more complex
 * Native_socket_stream. We do use the same concepts here: The ctor immediately starts a thread that reads off
 * all incoming connects ASAP. There is a surplus queue of such connects; and a deficit queue of pending
 * async_accept() requests from the user. A steady state is maintained of either an empty surplus and possibly
 * non-empty deficit; or vice versa. However there are no heavy-weight buffers involved, so copy avoidance is not
 * a factor -- it is pretty simple. Also there is no outgoing-direction pipe, which further cuts complexity
 * at least in half.
 *
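 * A simplified sketch of that balancing logic, as async_accept_impl() conceptually performs it (illustrative
 * pseudo-logic only; feed_eldest_result() is a hypothetical stand-in for the real feed_*_result_to_deficit() steps):
 *   @code
 *   // In thread W: a user async_accept() request arrives, the state machine being in steady state.
 *   if (m_pending_results_q.empty())
 *   {
 *     m_pending_user_requests_q.push(std::move(user_request)); // No surplus: deficit grows (or begins).
 *   }
 *   else
 *   {
 *     // Surplus exists: satisfy the request immediately, in FIFO order, with the eldest queued
 *     // result -- a connected peer handle or the sticky Error_code.
 *     feed_eldest_result(std::move(user_request));
 *   }
 *   @endcode
 *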
 * Later, after the `sync_io` pattern was introduced, Native_socket_stream was split into
 * Native_socket_stream (identical API and semantics) and `sync_io` core sync_io::Native_socket_stream; and
 * the latter's core logic was moved to reused-elsewhere sync-to-async adapters sync_io::Async_adapter_sender
 * and sync_io::Async_adapter_receiver. Furthermore (and this is more relevant here) internally
 * the latter guy was changed *away* from reading as many in-messages as were available in the kernel receive buffer --
 * even when no `async_receive_*()` user requests were pending. (Before the change: Such "cached" messages would
 * need to be copied and stored until the user requests came -- a/k/a the *surplus* was resolved.) So that was
 * changed; it would only read enough messages to satisfy the currently outstanding `async_receive_*()`s. The
 * rest would be left in the kernel buffer. In other words there could be a *deficit* still; but there could never
 * be a *surplus* in that changed (internal) design.
 *
 * Now, despite that change, Native_socket_stream_acceptor was left unchanged. It still, internally, accepts
 * as many connects as are available; and caches a surplus internally if one occurs (until enough async_accept()
 * calls come in to balance it out). The rationale? Well, it was just a good feature. There's no copying (of
 * anything sizable) involved, and it seemed like a decent idea to not leave handles languishing in some kernel
 * queue. Essentially the reasoning for that change inside Native_socket_stream (and Blob_stream_mq_receiver) --
 * well outside our scope here -- simply did not apply to Native_socket_stream_acceptor.
 */
class Native_socket_stream_acceptor :
  public flow::log::Log_context,
  private boost::noncopyable
{
public:
  // Types.

  /// Short-hand for type of target peer-socket objects targeted by async_accept().
  using Peer = sync_io::Native_socket_stream;

  /// Useful for generic programming, the `sync_io`-pattern counterpart to `*this` type.
  using Sync_io_obj = sync_io::Native_socket_stream_acceptor;
  /// You may disregard.
  using Async_io_obj = Null_peer;

  // Constants.

  /// `Shared_name` relative-folder fragment (no separators) identifying this resource type.
  static const Shared_name& S_RESOURCE_TYPE_ID;

  // Constructors/destructor.

  /**
   * Creates the Native_socket_stream_acceptor and immediately begins listening in the background, so that other
   * process(es) can connect to it -- at the specified name -- once the constructor returns successfully.
   * The operation may fail; see the `err_code` arg for how to detect this (either exception or via code return;
   * your choice). An error will be logged on failure.
   *
   * On success, opposing processes can attempt Native_socket_stream::sync_connect() (or
   * sync_io::Native_socket_stream::sync_connect()), which will quickly succeed,
   * yielding an opposing #Peer which will be connected. On this side, async_accept() is used to grab
   * the local peer #Peer. The connection need not have an async_accept() pending for it to complete
   * as observed by the opposing process.
   *
   * Assuming this ctor succeeds, further background operation may detect an unrecoverable error. If this occurs,
   * it will occur exactly once, and it is guaranteed no connections will be accepted subsequently; `async_accept()`s
   * beyond that point (once all successfully queued-up connects, if any, are exhausted) will fail with that error.
   *
   * `absolute_name` should be considered carefully. It cannot clash with another acceptor in existence, and it
   * might even be a mediocre idea to clash with one that *has* recently existed. Plus the opposing
   * Native_socket_stream (or sync_io::Native_socket_stream) needs to know `absolute_name`.
   *
   * ### Rationale/context ###
   * A Native_socket_stream_acceptor is unlikely to be used except by ipc::session, internally, in any case.
   * Once a session is established, one is ~certain to use ipc::transport::Channel to establish connections, and that
   * intentionally avoids Native_socket_stream_acceptor, because then one need not worry about this thorny naming
   * issue. Nevertheless we provide Native_socket_stream_acceptor as a public API, in case it's generally useful.
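   *
   * ### Example ###
   * A construction sketch showing both error-reporting styles (the name is hypothetical; `logger` is whatever
   * flow::log::Logger your application uses):
   *   @code
   *   Error_code err_code;
   *   Native_socket_stream_acceptor acc(&logger, Shared_name::ct("nss_example_srv"), &err_code);
   *   if (err_code) { return; } // E.g., name clash or invalid name; whatever it was, it was logged.
   *   // Or: omit the `&err_code` arg entirely, in which case the ctor reports failure by throwing instead.
   *   @endcode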
   *
   * @param logger_ptr
   *        Logger to use for subsequent logging.
   * @param absolute_name
   *        The absolute name at which to bind and listen for connections.
   * @param err_code
   *        See `flow::Error_code` docs for error reporting semantics. #Error_code generated:
   *        `boost::asio::error::invalid_argument` (Native_socket_stream_acceptor failed to initialize, specifically
   *        because the given or computed address/name ended up too long to fit into natively-mandated data structures,
   *        or because there are invalid characters therein, most likely a forward-slash);
   *        `boost::asio::error::address_in_use` (Native_socket_stream_acceptor failed to initialize due
   *        to a name clash); possibly other system codes (Native_socket_stream_acceptor failed to initialize for some
   *        other reason we could not predict here, but whatever it was, it was logged).
   */
  explicit Native_socket_stream_acceptor(flow::log::Logger* logger_ptr, const Shared_name& absolute_name,
                                         Error_code* err_code = 0);

  /**
   * Destroys this acceptor, which will stop listening in the background and cancel any pending
   * completion handlers by invoking them ASAP with error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER.
   *
   * You must not call this from directly within a completion handler; else undefined behavior.
   *
   * Each pending completion handler will be called from an unspecified thread that is not the calling thread.
   * Any associated captured state for that handler will be freed shortly after the handler returns.
   *
   * We informally but very strongly recommend that your completion handler immediately return if the `Error_code`
   * passed to it is error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER. This is similar to what one should
   * do when using boost.asio and receiving the conceptually identical `operation_aborted` error code in an
   * `async_...()` completion handler. In both cases, this condition means, "we have decided to shut this thing down,
   * so the completion handlers are simply being informed of this."
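   *
   * For instance, at the top of such a handler (a sketch; `err_code` being the handler's argument):
   *   @code
   *   if (err_code == error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER)
   *   {
   *     return; // Shutting down; don't do any real work.
   *   }
   *   @endcode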
   */
  ~Native_socket_stream_acceptor();

  // Methods.

  /**
   * Returns the full name/address to which the constructor bound, or attempted to bind, the listening socket.
   *
   * @return See above.
   */
  const Shared_name& absolute_name() const;

  /**
   * Asynchronously awaits a peer connection to be established and calls `on_done_func()`
   * once the connection occurs, or an error occurs; in the former case move-assigning a PEER-state
   * #Peer (i.e., sync_io::Native_socket_stream) object to the passed-in `*target_peer`.
   * `on_done_func(Error_code())` is called on success. `on_done_func(E)`, where `E` is a non-success
   * error code, is called otherwise. In the latter case `*this` has met an unrecoverable error and should
   * be shut down via the destructor, as no further `async_accept()`s
   * will succeed (they'll quickly yield the same error).
   *
   * Multiple async_accept() calls can be queued while no connection is pending;
   * they will grab incoming connections in FIFO fashion as they arrive.
   *
   * The aforementioned #Peer generated and move-assigned to `*target_peer` on success shall
   * inherit `this->get_logger()` as its `->get_logger()`; and its sync_io::Native_socket_stream::nickname() shall be
   * something descriptive.
   *
   * `on_done_func()` shall be called from some unspecified thread, not the calling thread, but never concurrently with
   * other such completion handlers. Your implementation must be non-blocking. Informally we recommend it place the
   * true on-event logic onto some task loop of your own; so ideally it would consist of essentially a single `post(F)`
   * statement of some kind. There are certainly reasons to not follow this recommendation, though, in some use cases.
   *
   * `on_done_func()` *will* be called; at the latest when the destructor is invoked (see below).
   *
   * You may call this from directly within a completion handler. Handlers will still always be called
   * non-concurrently, and a handler will never be called from within a handler (so it is safe, e.g., to bracket
   * your handler with a non-recursive mutex lock).
   *
   * #Error_code generated and passed to `on_done_func()`:
   * error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER (destructor called, canceling all pending ops;
   * spiritually identical to `boost::asio::error::operation_aborted`),
   * other system codes most likely from `boost::asio::error` or `boost::system::errc` (but never
   * would-block), indicating the underlying transport is hosed for that specific reason.
   *
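   * ### Example ###
   * A sketch of the recommended handler shape; `my_io_context` (a `boost::asio::io_context`) and `sync_x` are
   * illustrative assumptions standing in for "some task loop of your own" and your target #Peer:
   *   @code
   *   acc.async_accept(&sync_x, [&](const Error_code& err_code)
   *   {
   *     // We are in some unspecified background thread: stay brief and non-blocking; hop to our own loop.
   *     boost::asio::post(my_io_context, [&, err_code]()
   *     {
   *       if (err_code) { return; } // Acceptor being destroyed or hosed: typically just bail out.
   *       // ...use sync_x (now in PEER state); perhaps upgrade: Native_socket_stream async_x(std::move(sync_x));...
   *     });
   *   });
   *   @endcode
   *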
   * @tparam Task_err
   *         Handler type matching signature of `flow::async::Task_asio_err`.
   * @param target_peer
   *        Pointer to sync_io::Native_socket_stream which shall be assigned a PEER-state (connected) stream as
   *        `on_done_func()` is called. Not touched on error.
   * @param on_done_func
   *        Completion handler. See above. The captured state in this function object shall be freed shortly upon
   *        its completed execution from the unspecified thread.
   */
  template<typename Task_err>
  void async_accept(Peer* target_peer, Task_err&& on_done_func);

private:
  // Types.

  /// Short-hand for callback called on new peer-to-peer connection; or on unrecoverable error.
  using On_peer_accepted_func = flow::async::Task_asio_err;

  /// Short-hand for internally stored PEER-state sync_io::Native_socket_stream in #m_pending_results_q.
  using Peer_ptr = boost::movelib::unique_ptr<Peer>;

  /**
   * Data store representing a user async-accept request (a deficit) that had to be saved, due to a lacking
   * surplus of finalized peer-socket handles. Essentially this stores the args to async_accept(), which is an
   * async-operating method. We store them in a queue via #Ptr.
   */
  struct User_request
  {
    /// Short-hand for `unique_ptr` to this.
    using Ptr = boost::movelib::unique_ptr<User_request>;

    // Data.

    /// See Native_socket_stream_acceptor::async_accept() `target_peer`.
    Peer* m_target_peer;

    /// See Native_socket_stream_acceptor::async_accept() `on_done_func`.
    On_peer_accepted_func m_on_done_func;
  }; // struct User_request

  // Methods.

  /**
   * Non-template impl of async_accept().
   *
   * @param target_peer
   *        See async_accept().
   * @param on_done_func
   *        See async_accept().
   */
  void async_accept_impl(Peer* target_peer, On_peer_accepted_func&& on_done_func);

  /**
   * Handler for incoming connection on #m_acceptor.
   *
   * @param sys_err_code
   *        Result code from boost.asio.
   */
  void on_next_peer_socket_or_error(const Error_code& sys_err_code);

  /**
   * In thread W, in steady state, introduces the just-established peer socket handle into the state machine and
   * synchronously advances the state machine into steady state again, with the possible side effect of synchronously
   * invoking the head waiting async-accept user request, if any. The result, if not emitted, is enqueued to
   * #m_pending_results_q for emission to the next request, if any.
   *
   * Pre-condition: Incoming-direction state machine is in steady state; we are in thread W;
   * #m_next_peer_socket is finalized and not empty.
   */
  void finalize_q_surplus_on_success();

  /**
   * In thread W, in steady state *except* for an `Error_code` just pushed to the back of #m_pending_results_q and
   * thus introduced into the state machine, synchronously advances the state machine into steady state again, with the
   * possible side effect of synchronously invoking *all* waiting async-accept user requests, if any.
   * In particular, #m_pending_user_requests_q becomes empty if it was not; while #m_pending_results_q is not changed:
   * the just-pushed `Error_code` remains saved, to be emitted to any future async-accept user requests.
   *
   * It is essential to understand the pre-condition that the state machine must be in steady state, followed by
   * exactly one modification to it: namely the `Error_code` being pushed onto #m_pending_results_q. For example it is
   * a bug for #m_pending_user_requests_q to be non-empty, if the pre-push #m_pending_results_q is
   * non-empty also; that isn't steady state, since both a deficit and a surplus
   * were in effect before the `Error_code` was pushed. In other words: this method takes an inductive step only;
   * it doesn't "flush" the state machine.
   */
  void finalize_q_surplus_on_error();

  /**
   * In thread W, gets back to steady state by feeding the given #Error_code (which must be the sole element in
   * #m_pending_results_q) to all queued user requests, popping them all. The pre-condition is that doing so *would*
   * in fact get the deficit and surplus queues collectively to steady state; and also that there *is* in fact
   * a deficit (at least one request in #m_pending_user_requests_q).
   *
   * @param err_code
   *        The code to feed (must be at the top of #m_pending_results_q); it is supplied as an arg merely for perf.
   */
  void feed_error_result_to_deficit(const Error_code& err_code);

  /**
   * In thread W, gets back to steady state by feeding the given just-connected peer socket (which must have just
   * been popped from #m_pending_results_q) to the first queued user request, popping it.
   *
   * @param peer
   *        The peer stream handle to feed (must have just been popped from #m_pending_results_q).
   */
  void feed_success_result_to_deficit(Peer_ptr&& peer);

  // Data.

  /// See absolute_name().
  const Shared_name m_absolute_name;

  /**
   * Queue storing deficit async-accept requests queued up due to lacking pending ready peer-socket handles in
   * #m_pending_results_q at async_accept() time. In steady state the invariant is: either #m_pending_user_requests_q
   * is empty, or #m_pending_results_q is empty (possibly both). If momentarily in non-steady state -- each queue
   * containing at least one element -- then elements of #m_pending_results_q are "fed" to the callbacks in
   * #m_pending_user_requests_q, thus removing that many entries from each queue and, in particular, emptying
   * #m_pending_results_q.
   *
   * Accessed from thread W only; hence needs no locking.
   */
  std::queue<User_request::Ptr> m_pending_user_requests_q;

  /**
   * Queue storing surplus finalized async-accept results queued up due to lacking async_accept() requests in
   * #m_pending_user_requests_q at connection-finalization time. There are 0+ peer-socket handles, capped by 0 or 1
   * `Error_code`.
   *
   * Accessed from thread W only; hence needs no locking.
   */
  std::queue<std::variant<Peer_ptr, Error_code>> m_pending_results_q;

  /**
   * A single-threaded async task loop that starts in constructor and ends in destructor. We refer to this informally
   * as thread W in comments.
   *
   * Ordering: Should be declared before #m_acceptor: It should destruct before its attached `Task_engine` does.
   */
  flow::async::Single_thread_task_loop m_worker;

  /**
   * Unix domain socket acceptor. It is only accessed in thread W. Assuming successful setup, it's listening
   * continuously in thread W, via async loop #m_worker.
   */
  boost::movelib::unique_ptr<asio_local_stream_socket::Acceptor> m_acceptor;

  /**
   * Unix domain peer socket, always empty/unconnected while a background `m_acceptor.async_accept()` is proceeding;
   * then (assuming a successful accept op) connected at the start of the `async_accept()` callback; then back to
   * empty/unconnected again just before the next `async_accept()` call; and so on. Only accessed in thread W.
   *
   * Since there's only one thread, we can keep reusing this one target socket. When the time comes (in the callback)
   * to pass it to the rest of the program, a "new" socket is move-constructed from it, thus making it
   * empty/unconnected again.
   *
   * ### Rationale/notes ###
   * As noted, this setup relies on move-construction. Alternatively we could have used a `shared_ptr` pattern.
   * Stylistically it should be at least as simple/elegant. Perf-wise, as of this writing, I (ygoldfel) have not
   * rigorously compared the two; but by definition move constructions should be close to optimal perf-wise, unless
   * the boost.asio guys willfully made theirs slow.
   *
   * @todo Perform a rigorous analysis of the perf and style trade-offs between move-construction-based patterns
   * versus `shared_ptr`-based ones, possibly focusing on boost.asio socket objects in particular.
   */
  asio_local_stream_socket::Peer_socket m_next_peer_socket;
}; // class Native_socket_stream_acceptor

// Free functions: in *_fwd.hpp.

// Template implementations.

template<typename Task_err>
void Native_socket_stream_acceptor::async_accept(Peer* target_peer, Task_err&& on_done_func)
{
  async_accept_impl(target_peer, On_peer_accepted_func(std::move(on_done_func)));
}

} // namespace ipc::transport