Flow-IPC 1.0.2
Flow-IPC project: Full implementation reference.
session_server_adapter.hpp
/* Flow-IPC: Sessions
 * Copyright 2023 Akamai Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy
 * of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in
 * writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing
 * permissions and limitations under the License. */

/// @file
#pragma once


namespace ipc::session::sync_io
{

// Types.

/**
 * `sync_io`-pattern counterpart to async-I/O-pattern session::Session_server types and all their SHM-aware variations
 * (at least shm::classic::Session_server and shm::arena_lend::jemalloc::Session_server). In point of fact:
 *   - Use this if and only if you desire a `sync_io`-pattern style of being informed of async events from a
 *     `Session_server` *and* `Server_session` of any kind. For example, you may find this convenient
 *     if your event loop is an old-school reactor using `poll()` or `epoll_wait()`. This affects exactly the
 *     following APIs:
 *     - `Session_server::async_accept()`.
 *   - This Session_server_adapter *adapts* a `Session_server` constructed and stored within `*this`.
 *     All APIs excluding the above -- that is to say all non-async APIs -- are to be invoked via core() accessor.
 *     - Trying to use `core()->async_accept()` leads to undefined behavior.
 *
 * @see util::sync_io doc header -- describes the general `sync_io` pattern we are following.
 * @see session::Session_server, shm::classic::Session_server, shm::arena_lend::jemalloc::Session_server.
 *
 * As is generally the case when choosing `sync_io::X` versus `X`, we recommend using `X` due to it being easier.
 * In this particular case (see below) there is no perf benefit to using `sync_io::X`, either, so the only reason
 * to use `sync_io::X` in this case would be because you've got an old-school reactor event loop with
 * a `poll()` or `epoll_wait()`, in which case the `sync_io` API may be easier to integrate.
 *
 * To use it:
 *   - Construct it explicitly. The ctor signature is exactly identical to that of session::Session_server.
 *   - Set up `sync_io` pattern using start_ops() (and if needed precede it with replace_event_wait_handles()).
 *   - Use async_accept() in similar fashion to async-I/O `Session_server` supplying a completion handler
 *     (though it will be invoked synchronously per `sync_io` pattern).
 *     - As normal, construct a blank `Server_session` to pass as the target of async_accept().
 *       The type of this object shall be Session_server_adapter::Session_obj; it shall be a concrete
 *       type of the class template Server_session_adapter.
 *   - On successful accept:
 *     - See doc header for Server_session_adapter on what to do next. Spoiler alert:
 *       Server_session_adapter::start_ops(),
 *       then Server_session_adapter::init_handlers() (mirroring a vanilla `Server_session`).
 *   - Use `core()->` for all other API needs (e.g., shm::classic::Session_server::app_shm()).
 *
 * ### Internal implementation ###
 * Normally this would not be in the public docs for this public-use class, but indulge us...
 *
 * ...by reading an equally applicable note in Client_session_adapter doc header.
 *
 * @tparam Session_server
 *         The async-I/O `Session_server` concrete type being adapted. As of this writing that would be one of
 *         at least: `session::Session_server<knobs>`, `session::shm::classic::Session_server<knobs>`,
 *         `session::shm::arena_lend::jemalloc::Session_server<knobs>`.
 */
template<typename Session_server>
class Session_server_adapter
{
public:
  // Types.

  /// Short-hand, for generic programming et al, for template parameter `Session_server`.
  using Session_server_obj = Session_server;

  /// Short-hand for object type targeted by async_accept().

  /// Useful for generic programming, the async-I/O-pattern counterpart to `*this` type.
  using Async_io_obj = Session_server_obj;
  /// You may disregard.

  // Constructors/destructor.

  /**
   * Forwards to the #Session_server_obj ctor. See Session_server ctor doc headers.
   *
   * @tparam Ctor_args
   *         See above.
   * @param ctor_args
   *        See above.
   */
  template<typename... Ctor_args>
  Session_server_adapter(Ctor_args&&... ctor_args);

  // Methods.

  /**
   * All notes from Client_session_adapter::start_ops() apply equally.
   *
   * @tparam Event_wait_func_t
   *         See above.
   * @param ev_wait_func
   *        See above.
   * @return See above.
   */
  template<typename Event_wait_func_t>
  bool start_ops(Event_wait_func_t&& ev_wait_func);

  /**
   * All notes from Client_session_adapter::replace_event_wait_handles() apply equally.
   *
   * @tparam Create_ev_wait_hndl_func
   *         See above.
   * @param create_ev_wait_hndl_func
   *        See above.
   * @return See above.
   */
  template<typename Create_ev_wait_hndl_func>
  bool replace_event_wait_handles(const Create_ev_wait_hndl_func& create_ev_wait_hndl_func);

  /**
   * Acts identically to 1-arg overload of Session_server::async_accept(), except that the completion handler
   * is invoked in the `sync_io`-pattern fashion, synchronously inside an async-wait performed by you and
   * reported via `(*on_active_ev_func)()`. Returns `false` if invoked before start_ops().
   *
   * @tparam Task_err
   *         See above.
   * @param target_session
   *        Pointer to #Session_obj which shall be assigned an almost-PEER-state (open, requires
   *        Server_session_adapter::init_handlers() to enter PEER state) as `on_done_func()`
   *        is called. Not touched on error. Recommend default-constructing a #Session_obj and passing
   *        pointer thereto here.
   * @param on_done_func
   *        See above.
   * @return `true` on successful start to async-accept; `false` if called before start_ops().
   */
  template<typename Task_err>
  bool async_accept(Session_obj* target_session, Task_err&& on_done_func);

  /**
   * Acts identically to 7-arg overload of Session_server::async_accept(), except that the completion handler
   * is invoked in the `sync_io`-pattern fashion, synchronously inside an async-wait performed by you and
   * reported via `(*on_active_ev_func)()`. Returns `false` if invoked before start_ops().
   *
   * @tparam Task_err
   *         See above.
   * @tparam N_init_channels_by_srv_req_func
   *         See above.
   * @tparam Mdt_load_func
   *         See above.
   * @param target_session
   *        See above.
   * @param init_channels_by_srv_req
   *        See above.
   * @param mdt_from_cli_or_null
   *        See above: null or pointer to `Reader` of metadata which shall be set for access on success.
   * @param init_channels_by_cli_req
   *        See above.
   * @param n_init_channels_by_srv_req_func
   *        See above.
   * @param mdt_load_func
   *        See above.
   * @param on_done_func
   *        See above.
   * @return `true` on successful start to async-accept; `false` if called before start_ops().
   */
  template<typename Task_err,
           typename N_init_channels_by_srv_req_func, typename Mdt_load_func>
  bool async_accept(Session_obj* target_session,
                    typename Session_server_obj::Channels* init_channels_by_srv_req,
                    typename Session_server_obj::Mdt_reader_ptr* mdt_from_cli_or_null,
                    typename Session_server_obj::Channels* init_channels_by_cli_req,
                    N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
                    Mdt_load_func&& mdt_load_func,
                    Task_err&& on_done_func);

  /**
   * The adapted mutable #Session_server_obj. It is safe to access any API except for `core()->async_accept()`
   * (undefined behavior); use `this->async_accept()` instead. Remember that start_ops() is required first.
   *
   * @return See above.
   */
  Session_server_obj* core();

  /**
   * The adapted immutable #Session_server_obj. Remember that start_ops() is required first.
   *
   * @return See above.
   */
  const Session_server_obj* core() const;

  /**
   * See `flow::log::Log_context`.
   * @return See above.
   */
  flow::log::Logger* get_logger() const;

  /**
   * See `flow::log::Log_context`.
   * @return See above.
   */
  const flow::log::Component& get_log_component() const;

private:
  // Methods.

  /**
   * The real handler given for `on_done_func` to `Session_server_obj::async_accept()`: it records the
   * result of that async-accept to #m_target_err_code, then signals accept_read() via
   * the IPC-pipe.
   *
   * @param err_code
   *        Result from `Session_server_obj::async_accept()`.
   */
  void accept_write(const Error_code& err_code);

  /**
   * Signaled by accept_write(), it returns the IPC-pipe to steady-state (empty, not readable), then invokes
   * the original user `on_done_func()`.
   */
  void accept_read();

  // Data.

  /// Similar to the one in Session_adapter.
  flow::util::Task_engine m_nb_task_engine;

  /// Similar to the one in Session_adapter.
  flow::util::Task_engine m_ev_hndl_task_engine_unused;

  /// Similar to the one in Session_adapter, applied to async_accept().
  util::Pipe_reader m_ready_reader;

  /// Similar to the one in Session_adapter, applied to async_accept().
  util::Pipe_writer m_ready_writer;

  /// Similar to the one in Session_adapter, applied to async_accept().
  util::sync_io::Asio_waitable_native_handle m_ev_wait_hndl;

  /// Similar to the one in Session_adapter.
  util::sync_io::Event_wait_func m_ev_wait_func;

  /// `on_done_func` from async_accept() if one is pending; otherwise `.empty()`.
  flow::async::Task_asio_err m_on_done_func_or_empty;

  /// Result given to (or about to be given to) #m_on_done_func_or_empty.
  Error_code m_target_err_code;

  /// This guy does all the work. In our dtor this will be destroyed (hence thread stopped) first-thing.
  Async_io_obj m_async_io;
}; // class Session_server_adapter

// Free functions: in *_fwd.hpp.

// Template implementations.

template<typename Session_server>
template<typename... Ctor_args>
Session_server_adapter<Session_server>::Session_server_adapter(Ctor_args&&... ctor_args) :
  m_ready_reader(m_nb_task_engine), // No handle inside but will be set-up soon below.
  m_ready_writer(m_nb_task_engine), // Ditto.
  m_ev_wait_hndl(m_ev_hndl_task_engine_unused), // This needs to be .assign()ed still.
  m_async_io(std::forward<Ctor_args>(ctor_args)...) // That had to have set up get_logger(), etc., by the way.
{
  using boost::asio::connect_pipe;

  Error_code sys_err_code;

  connect_pipe(m_ready_reader, m_ready_writer, sys_err_code);
  if (sys_err_code)
  {
    FLOW_LOG_FATAL("Session_server_adapter [" << *this << "]: Constructing: connect-pipe failed. Details follow.");
    FLOW_ERROR_SYS_ERROR_LOG_FATAL();
    assert(false && "We chose not to complicate the code given how unlikely this is, and how hosed you'd have to be.");
    std::abort();
  }

}

template<typename Session_server>
template<typename Event_wait_func_t>
bool Session_server_adapter<Session_server>::start_ops(Event_wait_func_t&& ev_wait_func)
{
  using util::Task;

  if (!m_ev_wait_func.empty())
  {
    FLOW_LOG_WARNING("Session_server_adapter [" << *this << "]: Start-ops requested, "
                     "but we are already started. Probably a user bug, but it is not for us to judge.");
    return false;
  }
  // else

  m_ev_wait_func = std::move(ev_wait_func);

  FLOW_LOG_INFO("Session_server_adapter [" << *this << "]: Start-ops requested. Done.");
  return true;

  // That's it for now. async_accept() will start an actual async-wait.
} // Session_server_adapter::start_ops()

template<typename Session_server>
template<typename Create_ev_wait_hndl_func>
bool Session_server_adapter<Session_server>::replace_event_wait_handles
       (const Create_ev_wait_hndl_func& create_ev_wait_hndl_func)
{

  if (!m_ev_wait_func.empty())
  {
    FLOW_LOG_WARNING("Session_server_adapter [" << *this << "]: Cannot replace event-wait handles after "
                     "a start-*-ops procedure has been executed. Ignoring.");
    return false;
  }
  // else

  FLOW_LOG_INFO("Session_server_adapter [" << *this << "]: "
                "Replacing event-wait handles (probably to replace underlying "
                "execution context without outside event loop's boost.asio Task_engine or similar).");

  assert(m_ev_wait_hndl.is_open());

  Native_handle saved(m_ev_wait_hndl.release());
  m_ev_wait_hndl = create_ev_wait_hndl_func();
  m_ev_wait_hndl.assign(saved);

  return true;
} // Session_server_adapter::replace_event_wait_handles()

template<typename Session_server>
template<typename Task_err>
bool Session_server_adapter<Session_server>::async_accept(Session_obj* target_session, Task_err&& on_done_func)
{
  using util::Task;

  if (!m_on_done_func_or_empty.empty())
  {
    FLOW_LOG_WARNING("Session_server_adapter [" << *this << "]: "
                     "Async-accept requested during async-accept. Ignoring.");
    return false;
  }
  // else
  m_on_done_func_or_empty = std::move(on_done_func);

  core()->async_accept(target_session->core(), // <-- ATTN! It's fine, because we don't do core()->init_handlers(). -*-
                       [this](const Error_code& err_code) { accept_write(err_code); });
  // -*- They will need to do target_session->init_handlers() (analogously to vanilla would-be core()->init_handlers()).

  m_ev_wait_func(&m_ev_wait_hndl,
                 false, // Wait for read.
                 boost::make_shared<Task>([this]() { accept_read(); }));

  return true;
} // Session_server_adapter::async_accept()

template<typename Session_server>
template<typename Task_err,
         typename N_init_channels_by_srv_req_func, typename Mdt_load_func>
bool Session_server_adapter<Session_server>::async_accept
       (Session_obj* target_session,
        typename Session_server_obj::Channels* init_channels_by_srv_req,
        typename Session_server_obj::Mdt_reader_ptr* mdt_from_cli_or_null,
        typename Session_server_obj::Channels* init_channels_by_cli_req,
        N_init_channels_by_srv_req_func&& n_init_channels_by_srv_req_func,
        Mdt_load_func&& mdt_load_func,
        Task_err&& on_done_func)
{
  using util::Task;

  if (!m_on_done_func_or_empty.empty())
  {
    FLOW_LOG_WARNING("Session_server_adapter [" << *this << "]: "
                     "Async-accept requested during async-accept. Ignoring.");
    return false;
  }
  // else
  m_on_done_func_or_empty = std::move(on_done_func);

  core()->async_accept(target_session->core(), // <-- ATTN! It's fine, because we don't do core()->init_handlers(). -*-
                       init_channels_by_srv_req, mdt_from_cli_or_null, init_channels_by_cli_req,
                       std::move(n_init_channels_by_srv_req_func), std::move(mdt_load_func),
                       [this](const Error_code& err_code) { accept_write(err_code); });
  // -*- They will need to do target_session->init_handlers() (analogously to vanilla would-be core()->init_handlers()).

  m_ev_wait_func(&m_ev_wait_hndl,
                 false, // Wait for read.
                 boost::make_shared<Task>([this]() { accept_read(); }));

  return true;
} // Session_server_adapter::async_accept()

template<typename Session_server>
void Session_server_adapter<Session_server>::accept_write(const Error_code& err_code)
{
  if (err_code == error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER)
  {
    return; // Stuff is shutting down. GTFO.
  }
  // else

  FLOW_LOG_INFO("Session_server_adapter [" << *this << "]: Async-IO core reports accept-complete event: "
                "tickling IPC-pipe to inform user.");

  m_target_err_code = err_code;

  util::pipe_produce(get_logger(), &m_ready_writer);
}

template<typename Session_server>
void Session_server_adapter<Session_server>::accept_read()
{
  FLOW_LOG_INFO("Session_server_adapter [" << *this << "]: Async-IO core accept-complete event: "
                "informed via IPC-pipe; invoking handler.");
  util::pipe_consume(get_logger(), &m_ready_reader); // They could in theory try again, if that actually failed.

  auto on_done_func = std::move(m_on_done_func_or_empty);
  m_on_done_func_or_empty.clear(); // In case move() didn't do it.

  on_done_func(m_target_err_code);
  FLOW_LOG_TRACE("Handler completed.");
}

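template<typename Session_server>
typename Session_server_adapter<Session_server>::Session_server_obj*
  Session_server_adapter<Session_server>::core()
{
  return &m_async_io;
}

template<typename Session_server>
const typename Session_server_adapter<Session_server>::Session_server_obj*
  Session_server_adapter<Session_server>::core() const
{
  return const_cast<Session_server_adapter*>(this)->core();
}

template<typename Session_server>
flow::log::Logger* Session_server_adapter<Session_server>::get_logger() const
{
  return core()->get_logger();
}

template<typename Session_server>
const flow::log::Component& Session_server_adapter<Session_server>::get_log_component() const
{
  return core()->get_log_component();
}

template<typename Session_server>
std::ostream& operator<<(std::ostream& os,
                         const Session_server_adapter<Session_server>& val)
{
  return os << "SIO@" << static_cast<const void*>(&val) << " sess_srv[" << (*(val.core())) << ']';
}

} // namespace ipc::session::sync_io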
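The accept_write() / accept_read() pair above hands a completion from the async-I/O core to the user's event loop through a pipe: the producer records the result and writes one byte, and the consumer drains that byte and runs the saved handler. The following is a minimal stand-alone sketch of that hand-off using plain POSIX `pipe()` and `poll()`, not Flow-IPC or boost.asio. The names `Ready_pipe_handoff`, `is_readable()`, and `run_handoff_demo()` are hypothetical, and the mutex is an addition of this sketch rather than something the adapter does. For simplicity the demo calls both sides from one thread, whereas in the adapter the producer side would run on the core's internal thread.

```cpp
#include <poll.h>
#include <unistd.h>

#include <cassert>
#include <functional>
#include <mutex>
#include <system_error>
#include <utility>

// Hypothetical stand-in for the adapter's pipe-based hand-off; not a Flow-IPC type.
class Ready_pipe_handoff
{
public:
  using Handler = std::function<void (const std::error_code&)>;

  Ready_pipe_handoff()
  {
    const int rc = ::pipe(m_fds);
    assert((rc == 0) && "pipe() failed.");
    static_cast<void>(rc);
  }

  ~Ready_pipe_handoff()
  {
    ::close(m_fds[0]);
    ::close(m_fds[1]);
  }

  Ready_pipe_handoff(const Ready_pipe_handoff&) = delete;
  Ready_pipe_handoff& operator=(const Ready_pipe_handoff&) = delete;

  // FD the event loop should watch for readability.
  int read_fd() const { return m_fds[0]; }

  // Event-loop side: saves the handler; refuses if one is already pending.
  bool start(Handler on_done)
  {
    if (m_on_done) { return false; }
    m_on_done = std::move(on_done);
    return true;
  }

  // Producer side: records the result, then makes the pipe readable.
  void produce(const std::error_code& result)
  {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_result = result;
    }
    const char byte = 0;
    const auto n_written = ::write(m_fds[1], &byte, 1);
    assert(n_written == 1);
    static_cast<void>(n_written);
  }

  // Event-loop side, once read_fd() is readable: drains the byte, then runs the handler.
  void consume()
  {
    char byte;
    const auto n_read = ::read(m_fds[0], &byte, 1);
    assert(n_read == 1);
    static_cast<void>(n_read);

    std::error_code result;
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      result = m_result;
    }
    // Move the handler out first, so that it may call start() again from within.
    Handler on_done = std::move(m_on_done);
    m_on_done = nullptr;
    on_done(result);
  }

private:
  int m_fds[2];
  std::mutex m_mutex;
  Handler m_on_done;
  std::error_code m_result;
};

// Non-blocking check: is there at least one byte to read on `fd`?
bool is_readable(int fd)
{
  pollfd pfd{};
  pfd.fd = fd;
  pfd.events = POLLIN;
  return (::poll(&pfd, 1, 0) == 1) && ((pfd.revents & POLLIN) != 0);
}

// Walks through one full hand-off; returns true if every step behaved as described.
bool run_handoff_demo()
{
  Ready_pipe_handoff handoff;
  const std::error_code expected = std::make_error_code(std::errc::connection_aborted);
  bool handler_saw_expected = false;

  const bool started = handoff.start([&](const std::error_code& err) { handler_saw_expected = (err == expected); });
  const bool second_refused = !handoff.start([](const std::error_code&) {});
  const bool idle_before = !is_readable(handoff.read_fd());

  handoff.produce(expected);
  const bool readable_after_produce = is_readable(handoff.read_fd());
  handoff.consume();
  const bool idle_after = !is_readable(handoff.read_fd());

  return started && second_refused && idle_before && readable_after_produce && handler_saw_expected && idle_after;
}
```

Calling `run_handoff_demo()` exercises the whole cycle, including the refusal of a second start while one is pending and the return of the pipe to its empty, non-readable steady state after the handler runs.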
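The class doc header suggests this adapter for an old-school reactor built on `poll()` or `epoll_wait()`, where the function given to start_ops() is asked to wait on a handle and later run a callback. The sketch below shows one way such a reactor might service those requests. It is a simplified model and not the library's API: `Event_wait_func` here takes a raw FD instead of an `Asio_waitable_native_handle*`, `Task_ptr` is a `std::shared_ptr` rather than the library's own pointer type, and `Poll_reactor` and `run_reactor_demo()` are hypothetical names. It also assumes each wait request is one-shot.

```cpp
#include <poll.h>
#include <unistd.h>

#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical simplifications of the library's types: a raw FD replaces the handle wrapper.
using Task = std::function<void ()>;
using Task_ptr = std::shared_ptr<Task>;
using Event_wait_func = std::function<void (int fd, bool snd_else_rcv, Task_ptr&& on_active_ev_func)>;

// Hypothetical minimal reactor: collects wait requests and services them with poll().
class Poll_reactor
{
public:
  // The function one would hand to a start_ops()-style API.
  Event_wait_func event_wait_func()
  {
    return [this](int fd, bool snd_else_rcv, Task_ptr&& on_active_ev_func)
    {
      m_waits.push_back({ fd, snd_else_rcv, std::move(on_active_ev_func) });
    };
  }

  // One loop iteration; returns the number of callbacks invoked.
  int run_once(int timeout_ms)
  {
    std::vector<pollfd> pfds;
    for (const auto& wait : m_waits)
    {
      pollfd pfd{};
      pfd.fd = wait.m_fd;
      pfd.events = wait.m_snd_else_rcv ? POLLOUT : POLLIN;
      pfds.push_back(pfd);
    }
    if (::poll(pfds.data(), pfds.size(), timeout_ms) <= 0)
    {
      return 0;
    }

    // Un-register active waits before invoking anything, since a callback may register new waits.
    std::vector<Task_ptr> active;
    std::vector<Wait> still_waiting;
    for (std::size_t idx = 0; idx != pfds.size(); ++idx)
    {
      if (pfds[idx].revents != 0)
      {
        active.push_back(std::move(m_waits[idx].m_on_active));
      }
      else
      {
        still_waiting.push_back(std::move(m_waits[idx]));
      }
    }
    m_waits = std::move(still_waiting);

    for (const auto& task : active)
    {
      (*task)();
    }
    return static_cast<int>(active.size());
  }

private:
  struct Wait
  {
    int m_fd;
    bool m_snd_else_rcv;
    Task_ptr m_on_active;
  };

  std::vector<Wait> m_waits;
};

// Registers one read-wait on a pipe and checks it fires exactly once, after a byte arrives.
bool run_reactor_demo()
{
  int fds[2];
  if (::pipe(fds) != 0)
  {
    return false;
  }

  Poll_reactor reactor;
  Event_wait_func wait_func = reactor.event_wait_func();
  int n_calls = 0;
  wait_func(fds[0], false, std::make_shared<Task>([&]() { ++n_calls; }));

  const int before = reactor.run_once(0);
  const char byte = 0;
  const bool wrote = (::write(fds[1], &byte, 1) == 1);
  const int after = reactor.run_once(1000);
  const int again = reactor.run_once(0);

  ::close(fds[0]);
  ::close(fds[1]);
  return wrote && (before == 0) && (after == 1) && (again == 0) && (n_calls == 1);
}
```

In this model the adapter's async_accept() would correspond to the single `wait_func(...)` call, and the callback it passes would play the role of accept_read().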