Flow-IPC 2.0.0
Flow-IPC project: Full implementation reference.
native_socket_stream_impl.cpp
/* Flow-IPC: Core
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#include "flow/error/error.hpp"
#include <boost/move/make_unique.hpp>
#include <cstddef>

namespace ipc::transport
{

// Implementations.

// General + connect-ops.

// Delegated-to ctor (note the tag arg).
Native_socket_stream::Impl::Impl(sync_io::Native_socket_stream&& sync_io_core_moved, std::nullptr_t) :
  flow::log::Log_context(sync_io_core_moved.get_logger(), Log_component::S_TRANSPORT),

  m_worker(boost::movelib::make_unique<flow::async::Single_thread_task_loop>
             (get_logger(),
              /* (Linux) OS thread name will truncate .nickname() to 15-4=11 chars here; high chance that'll include
               * something decently useful; probably not everything though; depends on nickname.
               * It's a decent attempt. */
              flow::util::ostream_op_string("Sck-", sync_io_core_moved.nickname()))),
  // Adopt the just-cted, idle sync_io::Native_socket_stream. It may be in NULL state or PEER state.
  m_sync_io(std::move(sync_io_core_moved))
{
  using flow::async::reset_this_thread_pinning;

  m_worker->start(reset_this_thread_pinning); // Don't inherit any strange core-affinity! Worker must float free.

  // We're using a boost.asio event loop, so we need to base the async-waited-on handles on our Task_engine.
#ifndef NDEBUG
  bool ok =
#endif
  m_sync_io.replace_event_wait_handles([this]() -> Asio_waitable_native_handle
                                         { return Asio_waitable_native_handle(*(m_worker->task_engine())); });
  assert(ok && "Did you break contract by passing-in a non-fresh sync_io core object to ctor?");

  /* The delegating PEER-state ctors shall deal with m_snd_sync_io_adapter/m_rcv_sync_io_adapter.
   * The NULL-state ctor does no such thing; rather a successful sync_connect() will do just that. */

  FLOW_LOG_TRACE("Socket stream [" << *this << "]: Created (NULL state).");
} // Native_socket_stream::Impl::Impl()

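/* A minimal, hypothetical sketch (names made up; not part of the original source) of the tag-parameter delegation
 * idiom the ctors here rely on: the extra std::nullptr_t argument exists only to give the private delegated-to ctor
 * a signature distinct from the public one-argument core-adopting ctor (<cstddef>/<utility> assumed):
 *
 *   struct Core {};  // Stand-in for the heavier sync_io::Native_socket_stream core.
 *
 *   class Wrapper
 *   {
 *   public:
 *     Wrapper() : Wrapper(Core(), nullptr) {}                              // Make our own core; delegate.
 *     explicit Wrapper(Core&& core) : Wrapper(std::move(core), nullptr) {} // Adopt a given core; delegate.
 *   private:
 *     Wrapper(Core&& core, std::nullptr_t) : m_core(std::move(core)) {}    // Delegated-to ctor (note the tag arg).
 *     Core m_core;
 *   };
 */
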
Native_socket_stream::Impl::Impl(flow::log::Logger* logger_ptr, util::String_view nickname_str) :
  // Create core ourselves (NULL state); then delegate to other ctor.
  Impl(sync_io::Native_socket_stream(logger_ptr, nickname_str), nullptr)
{
  // Done.
}

Native_socket_stream::Impl::Impl(flow::log::Logger* logger_ptr, util::String_view nickname_str,
                                 Native_handle&& native_peer_socket_moved) :
  // Create core ourselves (in PEER state); then delegate to other ctor.
  Impl(sync_io::Native_socket_stream(logger_ptr, nickname_str, std::move(native_peer_socket_moved)), nullptr)
{
  using flow::util::ostream_op_string;

  // Lastly, as we're in PEER state, set up send-ops and receive-ops state machines.

  const auto log_pfx = ostream_op_string("Sck-", nickname()); // Brief-ish for use in OS thread names or some such.
  m_snd_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
  m_rcv_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);

  FLOW_LOG_TRACE("Socket stream [" << *this << "]: Created (PEER state) directly from pre-opened native handle.");
} // Native_socket_stream::Impl::Impl()

Native_socket_stream::Impl::Impl(sync_io::Native_socket_stream&& sync_io_core_in_peer_state_moved) :
  // Adopt the PEER-state core given to us by user.
  Impl(std::move(sync_io_core_in_peer_state_moved), nullptr)
{
  using flow::util::ostream_op_string;

  // Lastly, as we're in PEER state, set up send-ops and receive-ops state machines.

  const auto log_pfx = ostream_op_string("Socket stream [", *this, ']');
  m_snd_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
  m_rcv_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);

  FLOW_LOG_TRACE("Socket stream [" << *this << "]: "
                 "Created (PEER state) by adopting fresh sync_io::Native_socket_stream core.");
}

Native_socket_stream::Impl::~Impl()
{
  using flow::async::Single_thread_task_loop;
  using flow::async::reset_thread_pinning;
  using flow::util::ostream_op_string;

  // We are in thread U. By contract in doc header, they must not call us from a completion handler (thread W).

  FLOW_LOG_INFO("Socket stream [" << *this << "]: Shutting down. All our "
                "internal async handlers will be canceled; and worker thread will be joined.");

  /* This (1) stop()s the Task_engine thus possibly
   * preventing any more handlers from running at all (any handler possibly running now is the last one to run); (2)
   * at that point Task_engine::run() exits, hence thread W exits; (3) joins thread W (waits for it to
   * exit); (4) returns. That's a lot, but it's non-blocking. */
  m_worker->stop();
  // Thread W is (synchronously!) no more.

  /* As we promised in doc header(s), the destruction of *this shall cause any registered completion
   * handlers (namely async_end_sending(), async_receive_*() ones)
   * to be called from some unspecified thread(s) that aren't thread U, and once
   * they've all finished, the dtor returns. So which thread should we use? Well, we always use thread W for handlers,
   * so we should use it here too, right? Answer: Well, we can't; W is finished. Should we have kept it around,
   * m_worker.post()ed a task that'd call them all synchronously (from thread W) and then satisfied a
   * unique_future/promise; then here in thread U dtor awaited for this promise to be satisfied? I (ygoldfel)
   * considered it, but that would mean first letting any other queued handlers run first as usual in boost.asio, before
   * we'd get to the user-handler-calling task. That might work out fine, but (1) the entropy involved is a bit ???;
   * and hence (2) it's stressful trying to reason about it. So I'd rather not. (On the other hand arguably that's
   * equivalent to the destructor being called a little bit later during an async gap. Still, let's see if there's a
   * clearly safer alternative.)
   *
   * Therefore, we'll just start a transient thread and take care of business. It is, after all, an unspecified thread
   * that isn't thread U, as promised. Since the thread owning various m_* resources below is finished, we can
   * access them from this new thread without issue. Perf should be fine; even if this were a heavy-weight op, we
   * will not be called often; in any case it's at least a non-blocking op.
   *
   * Annoyingly, that's not all. Consider async_receive_native_handle(F). In thread W we will invoke F() on,
   * say, successful receive. However, to avoid recursive mayhem (wherein we call user F(), user F() starts
   * another async_receive_*() in thread W, which modifies our state while still inside our internal thread-W code,
   * etc.): we always m_worker.post(F) by itself -- even though we are already in thread W. Now
   * suppose our .stop() above stops/joins thread W before that post()ed thing even ran. Then the on_done_func
   * (user handler) captured by the real-work lambda will just get thrown out; it won't be in
   * m_rcv_sync_io_adapter.m_pending_user_requests_q anymore (popped from there before getting post()ed), so it won't
   * get manually invoked below with S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER error. What to do? Well, the problem
   * is not too different conceptually from the one solved using m_pending_user_requests_q though: a queue tracks the
   * requests; it's just that in this case the queue is inside m_worker (really inside a boost.asio Task_engine), and
   * the queued items are post()ed tasks, and the queue push op is post(). So we just want to invoke those tasks,
   * as-if from thread W, first. This is equivalent to those tasks simply having executed just before the
   * dtor got called. Fortunately boost.asio Task_engine has just the API for it: restart() undoes the .stop(),
   * in that Task_engine::stopped() becomes false and allows us to call Task_engine::poll() which will
   * *synchronously* execute any post()ed/not-yet-invoked tasks. (It's like Task_engine::run(), except it doesn't block
   * waiting for async work to actually get done.)
   *
   * Anyway! Formally m_snd/rcv_sync_io_adapter requires us to do that .restart()/.poll() for the above reason.
   * So we do it. Lastly m_snd/rcv_sync_io_adapter's dtors will do the one-off thread thing
   * for their respective sets of pending completion handlers if any. */
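  /* For reference, a minimal standalone sketch of that stop()/restart()/poll() idiom, using plain boost.asio
   * (io_context here plays the role of Task_engine; the handler and names are hypothetical, not from this codebase):
   *
   *   boost::asio::io_context task_engine;
   *   boost::asio::post(task_engine, [] {});  // A queued no-op handler; nothing has run it yet.
   *   task_engine.stop();                     // As m_worker->stop() did above: no further handlers will run.
   *   task_engine.restart();                  // Clears the stopped flag; the queued handler is still pending.
   *   const auto n_ran = task_engine.poll();  // Synchronously runs the pending handler(s); returns how many ran.
   */
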

  FLOW_LOG_INFO("Socket stream [" << *this << "]: Continuing shutdown. Next we will run pending handlers from some "
                "other thread. In this user thread we will await those handlers' completion and then return.");

  Single_thread_task_loop one_thread(get_logger(), ostream_op_string("SckDeinit-", nickname()));
  one_thread.start([&]()
  {
    reset_thread_pinning(get_logger()); // Don't inherit any strange core-affinity. Float free.

    FLOW_LOG_INFO("Socket stream [" << *this << "]: "
                  "In transient finisher thread: Shall run all pending internal handlers (typically none).");

    const auto task_engine = m_worker->task_engine();
    task_engine->restart();
    const auto count = task_engine->poll();
    if (count != 0)
    {
      FLOW_LOG_INFO("Socket stream [" << *this << "]: "
                    "In transient finisher thread: Ran [" << count << "] internal handlers after all.");
    }
    task_engine->stop();

    /* When m_*_sync_io_adapter dtors auto-execute, they'll issue OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER to
     * their handlers if any. */
    FLOW_LOG_INFO("Transient finisher exiting. (Send-ops and receive-ops de-init may follow.)");
  }); // one_thread.start()
  // Here thread exits/joins synchronously. (But the adapters might run their own similar ones.)
} // Native_socket_stream::Impl::~Impl()

bool Native_socket_stream::Impl::sync_connect(const Shared_name& absolute_name, Error_code* err_code)
{
  using flow::util::ostream_op_string;

  FLOW_ERROR_EXEC_AND_THROW_ON_ERROR(bool, sync_connect, absolute_name, _1);
  // ^-- Call ourselves and return if err_code is null. If we got to the present line, err_code is not null.

  if (!m_sync_io.sync_connect(absolute_name, err_code))
  {
    return false; // Wasn't in NULL state.
  }
  // else:

  if (!*err_code)
  {
    // PEER state! Yay! Do the thing PEER-state ctor would have done.
    const auto log_pfx = ostream_op_string("Socket stream [", *this, ']');
    m_snd_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
    m_rcv_sync_io_adapter.emplace(get_logger(), log_pfx, m_worker.get(), &m_sync_io);
  }
  // else { Back in NULL state. Perhaps they'll sync_connect() again later. }

  return true;
} // Native_socket_stream::Impl::sync_connect()
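
/* A hypothetical caller-side sketch of the NULL-state -> PEER-state transition sync_connect() performs. The logger,
 * nickname, and Shared_name are placeholders (some_logger_ptr, some_absolute_name are made-up names; see the public
 * Native_socket_stream doc headers for the real contract):
 *
 *   Native_socket_stream sock(some_logger_ptr, "conn-attempt");  // NULL state.
 *   Error_code err;
 *   if (sock.sync_connect(some_absolute_name, &err) && !err)     // some_absolute_name: a Shared_name from elsewhere.
 *   {
 *     // PEER state: send-ops/receive-ops (send_blob(), async_receive_blob(), etc.) are now available.
 *   }
 *   // Returned false: it was not in NULL state. Truthy err: still NULL state; sync_connect() can be retried.
 */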

util::Process_credentials Native_socket_stream::Impl::remote_peer_process_credentials(Error_code* err_code) const
{
  return m_sync_io.remote_peer_process_credentials(err_code);
}

const std::string& Native_socket_stream::Impl::nickname() const
{
  return m_sync_io.nickname();
}

std::ostream& operator<<(std::ostream& os, const Native_socket_stream::Impl& val)
{
  return os << '[' << val.nickname() << "]@" << static_cast<const void*>(&val);
}

// Send-ops.

bool Native_socket_stream::Impl::send_blob(const util::Blob_const& blob, Error_code* err_code)
{
  return m_snd_sync_io_adapter
           ? (m_snd_sync_io_adapter->send_blob(blob, err_code), true) // It's void.
           : false; // Not in PEER state (ditto all over the place below).
}

bool Native_socket_stream::Impl::send_native_handle(Native_handle hndl, const util::Blob_const& meta_blob,
                                                    Error_code* err_code)
{
  return m_snd_sync_io_adapter
           ? (m_snd_sync_io_adapter->send_native_handle(hndl, meta_blob, err_code), true) // It's void.
           : false;
}

bool Native_socket_stream::Impl::end_sending()
{
  using flow::async::Task_asio_err;

  return async_end_sending(Task_asio_err());
}

bool Native_socket_stream::Impl::async_end_sending(flow::async::Task_asio_err&& on_done_func)
{
  return m_snd_sync_io_adapter
           ? m_snd_sync_io_adapter->async_end_sending(std::move(on_done_func))
           : false;
}

bool Native_socket_stream::Impl::auto_ping(util::Fine_duration period)
{
  return m_snd_sync_io_adapter
           ? m_snd_sync_io_adapter->auto_ping(period)
           : false;
}

size_t Native_socket_stream::Impl::send_meta_blob_max_size() const
{
  return send_blob_max_size();
}

size_t Native_socket_stream::Impl::send_blob_max_size() const
{
  return m_sync_io.send_blob_max_size();
}

// Receive-ops.

bool Native_socket_stream::Impl::async_receive_native_handle(Native_handle* target_hndl,
                                                             const util::Blob_mutable& target_meta_blob,
                                                             flow::async::Task_asio_err_sz&& on_done_func)
{
  return m_rcv_sync_io_adapter
           ? (m_rcv_sync_io_adapter->async_receive_native_handle
                (target_hndl, target_meta_blob, std::move(on_done_func)),
              true) // It's void.
           : false;
}

bool Native_socket_stream::Impl::async_receive_blob(const util::Blob_mutable& target_blob,
                                                    flow::async::Task_asio_err_sz&& on_done_func)
{
  return m_rcv_sync_io_adapter
           ? (m_rcv_sync_io_adapter->async_receive_blob(target_blob, std::move(on_done_func)), true) // It's void.
           : false;
}

bool Native_socket_stream::Impl::idle_timer_run(util::Fine_duration timeout)
{
  return m_rcv_sync_io_adapter
           ? m_rcv_sync_io_adapter->idle_timer_run(timeout)
           : false;
}

size_t Native_socket_stream::Impl::receive_meta_blob_max_size() const
{
  return receive_blob_max_size();
}

size_t Native_socket_stream::Impl::receive_blob_max_size() const
{
  return m_sync_io.receive_blob_max_size();
}

#if 0 // See the declaration in class { body }; explains why `if 0` yet still here.
sync_io::Native_socket_stream Native_socket_stream::Impl::release()
{
  using flow::util::ostream_op_string;
  using flow::async::reset_this_thread_pinning;

  FLOW_LOG_TRACE("Socket stream [" << *this << "]: Releasing idle-state object to new socket-stream core object.");

  // Keep in sync with NULL-state ctor!

  assert(m_snd_sync_io_adapter && m_rcv_sync_io_adapter && "By contract must be in PEER state.");

  /* The main thing is, we've got a PEER-state core m_sync_io; but it already is in non-as-if-just-cted state,
   * because we've executed start_*_ops() on it. However m_sync_io.release() gets us that as-if-just-cted guy
   * and makes m_sync_io as-if-default-cted.
   *
   * So we invoke that. */
  auto core = m_sync_io.release();

  /* Cool. That's done. Now we need to get the rest of *this into as-if-default-cted form.
   * We need to basically repeat what a NULL-state ctor that takes null-Logger and empty nickname would do.
   * First up is Mr. m_worker Single_thread_task_loop guy. `m_worker = make_unique<>(...)` would do it,
   * but we can't let the existing one be destroyed until m_sync_io* ops below are finished, because they
   * access things attached to m_worker->task_engine() before switching those things out. So save it. */
  auto prev_worker = std::move(m_worker);

  // Now repeat what ctor does.
  m_worker
    = boost::movelib::make_unique<flow::async::Single_thread_task_loop>
        (core.get_logger(), core.nickname());
  m_worker->start(reset_this_thread_pinning);

  // m_sync_io is as-if-just-cted, so do what ctor does regarding .replace_event_wait_handles().
#ifndef NDEBUG
  bool ok =
#endif
  m_sync_io.replace_event_wait_handles([this]() -> Asio_waitable_native_handle
                                         { return Asio_waitable_native_handle(*(m_worker->task_engine())); });
  assert(ok && "Should work if m_sync_io.release() worked as advertised.");

  /* Lastly do what (NULL-state) ctor does regarding m_*_sync_io_adapter... namely leaves them null.
   * That is we must .reset() their optional<>s. However this will run their dtors. These will basically no-op,
   * since we're in we've-done-nothing-since-entering-PEER-state state by contract -- so there will be no pending
   * handlers to execute or anything. To prepare for their dtors to run, we do what our dtor does namely
   * m_worker->stop(); except what they think of as m_worker (by reference) is prev_worker. */
  prev_worker->stop();
  // Now this should be fine.
  m_rcv_sync_io_adapter.reset();
  m_snd_sync_io_adapter.reset();

  return core;
} // Native_socket_stream::Impl::release()
#endif

} // namespace ipc::transport