Flow-IPC 1.0.1
Flow-IPC project: Full implementation reference.
native_socket_stream_impl.cpp
/* Flow-IPC: Core
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#include "flow/error/error.hpp"
#include <boost/move/make_unique.hpp>
#include <cstddef>

namespace ipc::transport
{

// Implementations.

// General + connect-ops.

// Delegated-to ctor (note the tag arg).
Native_socket_stream::Impl::Impl(sync_io::Native_socket_stream&& sync_io_core_moved, std::nullptr_t) :
  flow::log::Log_context(sync_io_core_moved.get_logger(), Log_component::S_TRANSPORT),

  m_worker(boost::movelib::make_unique<flow::async::Single_thread_task_loop>
             (sync_io_core_moved.get_logger(), sync_io_core_moved.nickname())),

  // Adopt the just-cted, idle sync_io::Native_socket_stream. It may be in NULL state or PEER state.
  m_sync_io(std::move(sync_io_core_moved))
{
  m_worker->start();

  // We're using a boost.asio event loop, so we need to base the async-waited-on handles on our Task_engine.
#ifndef NDEBUG
  bool ok =
#endif
    m_sync_io.replace_event_wait_handles([this]() -> Asio_waitable_native_handle
                                           { return Asio_waitable_native_handle(*(m_worker->task_engine())); });
  assert(ok && "Did you break contract by passing-in a non-fresh sync_io core object to ctor?");
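  /* (The `#ifndef NDEBUG` guard above exists only so that `ok` is not an unused variable in release builds,
   * where assert() compiles to nothing; the replace_event_wait_handles() call itself runs unconditionally,
   * with its result simply discarded in that case.) */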

  /* Delegating PEER-state ctor shall deal with m_snd_sync_io_adapter/rcv.
   * Otherwise NULL-state ctor shall do no such thing, but a successful sync_connect() will do just that. */

  FLOW_LOG_TRACE("Socket stream [" << *this << "]: Created (NULL state).");
} // Native_socket_stream::Impl::Impl()

Native_socket_stream::Impl::Impl(flow::log::Logger* logger_ptr, util::String_view nickname_str) :
  // Create core ourselves (NULL state); then delegate to other ctor.
  Impl(sync_io::Native_socket_stream(logger_ptr, nickname_str), nullptr)
{
  // Done.
}

Native_socket_stream::Impl::Impl(flow::log::Logger* logger_ptr, util::String_view nickname_str,
                                 Native_handle&& native_peer_socket_moved) :
  // Create core ourselves (in PEER state); then delegate to other ctor.
  Impl(sync_io::Native_socket_stream(logger_ptr, nickname_str, std::move(native_peer_socket_moved)), nullptr)
{
  using flow::util::ostream_op_string;

  // Lastly, as we're in PEER state, set up send-ops and receive-ops state machines.

  const auto log_pfx = ostream_op_string("Socket stream [", *this, ']');
  m_snd_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
  m_rcv_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);

  FLOW_LOG_TRACE("Socket stream [" << *this << "]: Created (PEER state) directly from pre-opened native handle.");
} // Native_socket_stream::Impl::Impl()

Native_socket_stream::Impl::Impl(sync_io::Native_socket_stream&& sync_io_core_in_peer_state_moved) :
  // Adopt the PEER-state core given to us by user.
  Impl(std::move(sync_io_core_in_peer_state_moved), nullptr)
{
  using flow::util::ostream_op_string;

  // Lastly, as we're in PEER state, set up send-ops and receive-ops state machines.

  const auto log_pfx = ostream_op_string("Socket stream [", *this, ']');
  m_snd_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
  m_rcv_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);

  FLOW_LOG_TRACE("Socket stream [" << *this << "]: "
                 "Created (PEER state) by adopting fresh sync_io::Native_socket_stream core.");
}

Native_socket_stream::Impl::~Impl()
{
  using flow::async::Single_thread_task_loop;
  using flow::util::ostream_op_string;

  // We are in thread U. By contract in doc header, they must not call us from a completion handler (thread W).

  FLOW_LOG_INFO("Socket stream [" << *this << "]: Shutting down. All our "
                "internal async handlers will be canceled; and worker thread will be joined.");

  /* This (1) stop()s the Task_engine thus possibly
   * preventing any more handlers from running at all (any handler possibly running now is the last one to run); (2)
   * at that point Task_engine::run() exits, hence thread W exits; (3) joins thread W (waits for it to
   * exit); (4) returns. That's a lot, but it's non-blocking. */
  m_worker->stop();
  // Thread W is (synchronously!) no more.

  /* As we promised in doc header(s), the destruction of *this shall cause any registered completion
   * handlers (namely async_end_sending(), async_receive_*() ones)
   * to be called from some unspecified thread(s) that aren't thread U, and once
   * they've all finished, the dtor returns. So which thread should we use? Well, we always use thread W for handlers,
   * so we should use it here too, right? Answer: Well, we can't; W is finished. Should we have kept it around,
   * m_worker.post()ed a task that'd call them all synchronously (from thread W) and then satisfied a
   * unique_future/promise; then here in thread U dtor awaited for this promise to be satisfied? I (ygoldfel)
   * considered it, but that would mean letting any other queued handlers run first as usual in boost.asio, before
   * we'd get to the user-handler-calling task. That might work out fine, but (1) the entropy involved is a bit ???;
   * and hence (2) it's stressful trying to reason about it. So I'd rather not. (On the other hand arguably that's
   * equivalent to the destructor being called a little bit later during an async gap. Still, let's see if there's a
   * clearly safer alternative.)
   *
   * Therefore, we'll just start a transient thread and take care of business. It is, after all, an unspecified thread
   * that isn't thread U, as promised. Since the thread owning various m_* resources below is finished, we can
   * access them from this new thread without issue. Perf should be fine; even if this were a heavy-weight op, we
   * will not be called often; in any case it's at least a non-blocking op.
   *
   * Annoyingly, that's not all. Consider async_receive_native_handle(F). In thread W we will invoke F() on,
   * say, successful receive. However, to avoid recursive mayhem (wherein we call user F(), user F() starts
   * another async_receive_*() in thread W, which modifies our state while still inside our internal thread-W code,
   * etc.): we always m_worker.post(F) by itself -- even though we are already in thread W. Now
   * suppose our .stop() above stops/joins thread W before that post()ed thing even ran. Then the on_done_func
   * (user handler) captured by the real-work lambda will just get thrown out; it won't be in
   * m_rcv_sync_io_adapter.m_pending_user_requests_q anymore (popped from there before getting post()ed), so it won't
   * get manually invoked below with S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER error. What to do? Well, the problem
   * is not too different conceptually from the one solved using m_pending_user_requests_q though: a queue tracks the
   * requests; it's just that in this case the queue is inside m_worker (really inside a boost.asio Task_engine), and
   * the queued items are post()ed tasks, and the queue push op is post(). So we just want to invoke those tasks,
   * as-if from thread W, first. This is equivalent to those tasks simply having executed just before the
   * dtor got called. Fortunately boost.asio Task_engine has just the API for it: restart() undoes the .stop(),
   * in that Task_engine::stopped() becomes false and allows us to call Task_engine::poll() which will
   * *synchronously* execute any post()ed/not-yet-invoked tasks. (It's like Task_engine::run(), except it doesn't block
   * waiting for async work to actually get done.)
   *
   * Anyway! Formally m_snd/rcv_sync_io_adapter requires us to do that .restart()/.poll() for the above reason.
   * So we do it. Lastly m_snd/rcv_sync_io_adapter's dtors will do the one-off thread thing
   * for their respective sets of pending completion handlers if any. */
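  /* For reference, the restart()/poll() maneuver described above, shown on a bare boost.asio io_context
   * (a standalone illustrative sketch, not code from this file; the names used are hypothetical):
   *
   *   boost::asio::io_context task_engine;
   *   boost::asio::post(task_engine, [] { ...a queued-but-never-run completion handler... });
   *   task_engine.stop();     // As with m_worker->stop() above: no further handlers will run.
   *   task_engine.restart();  // Clears the stopped state...
   *   task_engine.poll();     // ...so the queued handler now runs synchronously, right here; no blocking wait.
   *   task_engine.stop();     // Stopped again; safe to proceed with destruction.
   */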

  FLOW_LOG_INFO("Socket stream [" << *this << "]: Continuing shutdown. Next we will run pending handlers from some "
                "other thread. In this user thread we will await those handlers' completion and then return.");

  Single_thread_task_loop one_thread(get_logger(), ostream_op_string(nickname(), "-temp_deinit"));
  one_thread.start([&]()
  {
    FLOW_LOG_INFO("Socket stream [" << *this << "]: "
                  "In transient finisher thread: Shall run all pending internal handlers (typically none).");

    const auto task_engine = m_worker->task_engine();
    task_engine->restart();
    const auto count = task_engine->poll();
    if (count != 0)
    {
      FLOW_LOG_INFO("Socket stream [" << *this << "]: "
                    "In transient finisher thread: Ran [" << count << "] internal handlers after all.");
    }
    task_engine->stop();

    /* When m_*_sync_io_adapter dtors auto-execute, they'll issue OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER to
     * their handlers if any. */
    FLOW_LOG_INFO("Transient finisher exiting. (Send-ops and receive-ops de-init may follow.)");
  }); // one_thread.start()
  // Here thread exits/joins synchronously. (But the adapters might run their own similar ones.)
} // Native_socket_stream::Impl::~Impl()

bool Native_socket_stream::Impl::sync_connect(const Shared_name& absolute_name, Error_code* err_code)
{
  using flow::util::ostream_op_string;

  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, Native_socket_stream::Impl::sync_connect,
                                     flow::util::bind_ns::cref(absolute_name), _1);
  // ^-- Call ourselves and return if err_code is null. If we got to the present line, err_code is not null.

  if (!m_sync_io.sync_connect(absolute_name, err_code))
  {
    return false; // Wasn't in NULL state.
  }
  // else:

  if (!*err_code)
  {
    // PEER state! Yay! Do the thing PEER-state ctor would have done.
    const auto log_pfx = ostream_op_string("Socket stream [", *this, ']');
    m_snd_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
    m_rcv_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
  }
  // else { Back in NULL state. Perhaps they'll sync_connect() again later. }

  return true;
} // Native_socket_stream::Impl::sync_connect()
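/* A caller-side sketch of the above convention (illustrative only; `impl`, `name`, `err` are hypothetical):
 *
 *   Error_code err;
 *   if (!impl.sync_connect(name, &err)) { ...wasn't in NULL state; the call was a no-op... }
 *   else if (err) { ...connect failed; back in NULL state; sync_connect() may simply be retried later... }
 *   else { ...PEER state; the send-ops and receive-ops below are now available... }
 *
 * Passing a null Error_code* instead engages FLOW_ERROR_EXEC_AND_THROW_ON_ERROR() above, which re-invokes the
 * function with a non-null pointer and throws if the resulting code is truthy. */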

util::Process_credentials
  Native_socket_stream::Impl::remote_peer_process_credentials(Error_code* err_code) const
{
  return m_sync_io.remote_peer_process_credentials(err_code);
}

const std::string& Native_socket_stream::Impl::nickname() const
{
  return m_sync_io.nickname();
}

std::ostream& operator<<(std::ostream& os, const Native_socket_stream::Impl& val)
{
  return os << '[' << val.nickname() << "]@" << static_cast<const void*>(&val);
}

// Send-ops.

bool Native_socket_stream::Impl::send_blob(const util::Blob_const& blob, Error_code* err_code)
{
  return m_snd_sync_io_adapter
           ? (m_snd_sync_io_adapter->send_blob(blob, err_code), true) // It's void.
           : false; // Not in PEER state (ditto all over the place below).
}
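/* (Here and below: the `(call(), true)` form uses the comma operator, as the adapter's method returns void;
 * the expression's value is simply `true`, i.e., "we were in PEER state, and the operation was issued.") */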

bool Native_socket_stream::Impl::send_native_handle(Native_handle hndl, const util::Blob_const& meta_blob,
                                                    Error_code* err_code)
{
  return m_snd_sync_io_adapter
           ? (m_snd_sync_io_adapter->send_native_handle(hndl, meta_blob, err_code), true) // It's void.
           : false;
}

bool Native_socket_stream::Impl::end_sending()
{
  using flow::async::Task_asio_err;

  return async_end_sending(Task_asio_err());
}
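/* (end_sending() above is the no-notification flavor: it forwards a default-constructed -- empty --
 * Task_asio_err to async_end_sending(), the intent being that the outgoing-direction graceful close is
 * initiated without the caller being told when it completes; see the sender concept docs for exact semantics.) */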

bool Native_socket_stream::Impl::async_end_sending(flow::async::Task_asio_err&& on_done_func)
{
  return m_snd_sync_io_adapter
           ? m_snd_sync_io_adapter->async_end_sending(std::move(on_done_func))
           : false;
}

bool Native_socket_stream::Impl::auto_ping(util::Fine_duration period)
{
  return m_snd_sync_io_adapter
           ? m_snd_sync_io_adapter->auto_ping(period)
           : false;
}

size_t Native_socket_stream::Impl::send_meta_blob_max_size() const
{
  return send_blob_max_size();
}

size_t Native_socket_stream::Impl::send_blob_max_size() const
{
  return m_sync_io.send_blob_max_size();
}

// Receive-ops.

bool Native_socket_stream::Impl::async_receive_native_handle(Native_handle* target_hndl,
                                                             const util::Blob_mutable& target_meta_blob,
                                                             flow::async::Task_asio_err_sz&& on_done_func)
{
  return m_rcv_sync_io_adapter
           ? (m_rcv_sync_io_adapter->async_receive_native_handle
                (target_hndl, target_meta_blob, std::move(on_done_func)),
              true) // It's void.
           : false;
}

bool Native_socket_stream::Impl::async_receive_blob(const util::Blob_mutable& target_blob,
                                                    flow::async::Task_asio_err_sz&& on_done_func)
{
  return m_rcv_sync_io_adapter
           ? (m_rcv_sync_io_adapter->async_receive_blob(target_blob, std::move(on_done_func)), true) // It's void.
           : false;
}

bool Native_socket_stream::Impl::idle_timer_run(util::Fine_duration timeout)
{
  return m_rcv_sync_io_adapter
           ? m_rcv_sync_io_adapter->idle_timer_run(timeout)
           : false;
}

size_t Native_socket_stream::Impl::receive_meta_blob_max_size() const
{
  return receive_blob_max_size();
}

size_t Native_socket_stream::Impl::receive_blob_max_size() const
{
  return m_sync_io.receive_blob_max_size();
}

#if 0 // See the declaration in class { body }; explains why `if 0` yet still here.
sync_io::Native_socket_stream Native_socket_stream::Impl::release()
{
  using flow::util::ostream_op_string;

  FLOW_LOG_TRACE("Socket stream [" << *this << "]: Releasing idle-state object to new socket-stream core object.");

  // Keep in sync with NULL-state ctor!

  assert(m_snd_sync_io_adapter && m_rcv_sync_io_adapter && "By contract must be in PEER state.");

  /* The main thing is, we've got a PEER-state core m_sync_io; but it already is in non-as-if-just-cted state,
   * because we've executed start_*_ops() on it. However m_sync_io.release() gets us that as-if-just-cted guy
   * and makes m_sync_io as-if-default-cted.
   *
   * So we invoke that. */
  auto core = m_sync_io.release();

  /* Cool. That's done. Now we need to get the rest of *this into as-if-default-cted form.
   * We need to basically repeat what a NULL-state ctor that takes null-Logger and empty nickname would do.
   * First up is Mr. m_worker Single_thread_task_loop guy. `m_worker = make_unique<>(...)` would do it,
   * but we can't let the existing one be destroyed until m_sync_io* ops below are finished, because they
   * access things attached to m_worker->task_engine() before switching those things out. So save it. */
  auto prev_worker = std::move(m_worker);

  // Now repeat what ctor does.
  m_worker
    = boost::movelib::make_unique<flow::async::Single_thread_task_loop>
        (core.get_logger(), core.nickname());
  m_worker->start();

  // m_sync_io is as-if-just-cted, so do what ctor does regarding .replace_event_wait_handles().
#ifndef NDEBUG
  bool ok =
#endif
    m_sync_io.replace_event_wait_handles([this]() -> Asio_waitable_native_handle
                                           { return Asio_waitable_native_handle(*(m_worker->task_engine())); });
  assert(ok && "Should work if m_sync_io.release() worked as advertised.");

  /* Lastly do what (NULL-state) ctor does regarding m_*_sync_io_adapter... namely leaves them null.
   * That is we must .reset() their optional<>s. However this will run their dtors. These will basically no-op,
   * since we're in we've-done-nothing-since-entering-PEER-state state by contract -- so there will be no pending
   * handlers to execute or anything. To prepare for their dtors to run, we do what our dtor does namely
   * m_worker->stop(); except what they think of as m_worker (by reference) is prev_worker. */
  prev_worker->stop();
  // Now this should be fine.
  m_rcv_sync_io_adapter.reset();
  m_snd_sync_io_adapter.reset();

  return core;
} // Native_socket_stream::Impl::release()
#endif

} // namespace ipc::transport