Flow-IPC 1.0.2
Flow-IPC project: Full implementation reference.
session_adapter.hpp
Go to the documentation of this file.
1/* Flow-IPC: Sessions
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
24#include <boost/move/make_unique.hpp>
25
27{
28
29// Types.
30
31/* @todo Technically by the established conventions of this project, there should be a ..._fwd.hpp with
32 * forward-declaration of `template<typename Session> class Session_adapter;`. Maybe we should do that. It's just...
33 * exactly two things use it (Client_session_adapter, Server_session_adapter), and it's just such pointless
34 * boiler-plate at the moment to make a sync_io/detail/sync_io_fwd.hpp just for that, when in reality they're
35 * templates and are going to include the present file anyway. Just, eh. For now anyway. It's just in most other
36 * contexts postponing doing this just leads to needing to do it later anyway, at which point it feels more painful.
37 * Here though... ehhhh.... */
38
39/**
40 * Internal-use workhorse containing common elements of Client_session_adapter and Server_session_adapter.
41 * As such it contains the machinery for:
42 * - storing (deriving from) the adapted `Session` as accessible via Session_adapter::core();
43 * - adapting async-I/O on-error and on-passive-channel-open (if enabled) events to `sync_io` pattern;
44 * - storing and invoking the central util::sync_io::Event_wait_func supplied by the user;
45 * - type aliases common to all sessions.
46 *
47 * @tparam Session
48 * See, e.g., Client_session_adapter.
49 */
50template<typename Session>
52{
53public:
54 // Types.
55
56 /// See, e.g., Client_session_adapter.
58
59 /// See, e.g., Client_session_adapter.
61 /// See, e.g., Client_session_adapter.
63
64 /// Short-hand for session-openable Channel type.
66
67 /// Short-hand for session-open metadata reader.
69
70 /// Short-hand for passive-channel-open handler.
72
73 // Constructors/destructor.
74
75 /**
76 * Compilable only when #Session_obj is a `Client_session` variant, forwards to its ctor of identical form,
77 * except that the handlers are replaced with `sync_io`-adapting ones. Do not use init_handlers() after this ctor,
78 * as that is for `Server_session`.
79 *
80 * @param logger_ptr
81 * See Client_session_mv.
82 * @param cli_app_ref
83 * See Client_session_mv.
84 * @param srv_app_ref
85 * See Client_session_mv.
86 * @param on_err_func
87 * See Client_session_mv. However in `*this` case this shall be invoked -- though still only following
88 * successful `core()->sync_connect()` as usual -- according to `sync_io` pattern, synchronously inside
89 * an async-wait performed by you and reported via `(*on_active_ev_func)()`.
90 * @param on_passive_open_channel_func
91 * See Client_session_mv. However in `*this` case this shall be invoked -- though still only following
92 * successful `core()->sync_connect()` as usual -- according to `sync_io` pattern, synchronously inside
93 * an async-wait performed by you and reported via `(*on_active_ev_func)()`.
94 * @tparam On_passive_open_channel_handler
95 * See Client_session_mv.
96 * @tparam Task_err
97 * See Client_session_mv.
98 */
99 template<typename On_passive_open_channel_handler, typename Task_err>
100 explicit Session_adapter(flow::log::Logger* logger_ptr,
101 const Client_app& cli_app_ref, const Server_app& srv_app_ref,
102 Task_err&& on_err_func,
103 On_passive_open_channel_handler&& on_passive_open_channel_func);
104
105 /**
106 * Compilable only when #Session_obj is a `Client_session` variant, forwards to its ctor of identical form,
107 * except that the handler is replaced with `sync_io`-adapting one. Do not use init_handlers() after this ctor,
108 * as that is for `Server_session`.
109 *
110 * @param logger_ptr
111 * Logger to use for logging subsequently.
112 * @param cli_app_ref
113 * See Client_session_mv.
114 * @param srv_app_ref
115 * See Client_session_mv.
116 * @param on_err_func
117 * See Client_session_mv. However in `*this` case this shall be invoked -- though still only following
118 * successful `*_connect()` as usual -- according to `sync_io` pattern, synchronously inside
119 * an async-wait performed by you and reported via `(*on_active_ev_func)()`.
120 * @tparam Task_err
121 * See Client_session_mv.
122 */
123 template<typename Task_err>
124 explicit Session_adapter(flow::log::Logger* logger_ptr,
125 const Client_app& cli_app_ref, const Server_app& srv_app_ref,
126 Task_err&& on_err_func);
127
128 /// Forwards to the #Session_obj default ctor.
130
131 /**
132 * See, e.g., Client_session_adapter.
133 *
134 * @tparam Event_wait_func_t
135 * See above.
136 * @param ev_wait_func
137 * See above.
138 * @return See above.
139 */
140 template<typename Event_wait_func_t>
141 bool start_ops(Event_wait_func_t&& ev_wait_func);
142
143 /**
144 * See, e.g., Client_session_adapter.
145 *
146 * @tparam Create_ev_wait_hndl_func
147 * See above.
148 * @param create_ev_wait_hndl_func
149 * See above.
150 * @return See above.
151 */
152 template<typename Create_ev_wait_hndl_func>
153 bool replace_event_wait_handles(const Create_ev_wait_hndl_func& create_ev_wait_hndl_func);
154
155 /**
156 * Compilable only when #Session_obj is a `Server_session` variant, forwards to its method of identical form,
157 * except that the handlers are replaced with `sync_io`-adapting ones. Do not use non-default ctor before this,
158 * as that is for `Client_session`.
159 *
160 * @tparam Task_err
161 * See Session_server.
162 * @tparam On_passive_open_channel_handler
163 * See Session_server.
164 * @param on_err_func_arg
165 * See Session_server.
166 * @param on_passive_open_channel_func_arg
167 * See Session_server.
168 * @return See Session_server; in addition returns `false`/no-ops if invoked before start_ops().
169 */
170 template<typename Task_err, typename On_passive_open_channel_handler>
171 bool init_handlers(Task_err&& on_err_func_arg, On_passive_open_channel_handler&& on_passive_open_channel_func_arg);
172
173 /**
174 * Compilable only when #Session_obj is a `Server_session` variant, forwards to its method of identical form,
175 * except that the handlers are replaced with `sync_io`-adapting ones. Do not use non-default ctor before this,
176 * as that is for `Client_session`.
177 *
178 * @tparam Task_err
179 * See Session_server.
180 * @param on_err_func_arg
181 * See Session_server.
182 * @return See Session_server; in addition returns `false`/no-ops if invoked before start_ops().
183 */
184 template<typename Task_err>
185 bool init_handlers(Task_err&& on_err_func_arg);
186
187 /**
188 * See `flow::log::Log_context`.
189 * @return See above.
190 */
191 flow::log::Logger* get_logger() const;
192
193 /**
194 * See `flow::log::Log_context`.
195 * @return See above.
196 */
197 const flow::log::Component& get_log_component() const;
198
199protected:
200 // Methods.
201
202 /**
203 * Forwards to the util::sync_io::Event_wait_func saved in start_ops().
204 *
205 * @tparam Args
206 * See above.
207 * @param args
208 * See above.
209 */
210 template<typename... Args>
211 void async_wait(Args&&... args);
212
213 /**
214 * Utility that sets up an IPC-pipe in the given peer objects as well as loading a watcher-descriptor
215 * object for the read-end. This should be called before start_ops() or replace_event_wait_handles().
216 *
217 * @param reader
218 * Pipe read end to load. It should be constructed with its intended execution context/executor but
219 * not `.is_open()`.
220 * @param writer
221 * Pipe write end to load. It should be constructed with its intended execution context/executor but
222 * not `.is_open()`.
223 * @param ev_wait_hndl
224 * Watcher for `reader`. It should be constructed with its intended execution context/executor but
225 * not `.is_open()`.
226 */
227 void init_pipe(util::Pipe_reader* reader, util::Pipe_writer* writer,
229
230 /**
231 * The adapted mutable #Session_obj.
232 * @return See above.
233 */
234 Session_obj* core();
235
236 /**
237 * The adapted mutable #Session_obj.
238 * @return See above.
239 */
240 const Session_obj* core() const;
241
242private:
243 // Types.
244
245 /// Set of result arg values from a successful passive-channel-open from a #Session_obj invoking #On_channel_func.
247 {
248 // Types.
249
250 /// Short-hand for pointer wrapper around a `*this`.
251 using Ptr = boost::movelib::unique_ptr<Channel_open_result>;
252
253 // Data.
254
255 /// Result 1/2 given about to be given to #m_on_channel_func_or_empty.
257
258 /// Result 2/2 given about to be given to #m_on_channel_func_or_empty.
260 };
261
262 /// Queue of Channel_open_result.
263 using Channel_open_result_q = std::queue<typename Channel_open_result::Ptr>;
264
265 // Methods.
266
267 /**
268 * Signaled by the function returned by on_channel_func_sio(), it returns the IPC-pipe to steady-state (empty,
269 * not readable), invokes the original user handler passed to on_channel_func_sio(), and lastly
270 * begins the next async-wait for the procedure.
271 */
272 void on_ev_channel_open();
273
274 /**
275 * Returns the proper on-error handler to set up on the underlying #Session_obj (`Client_session`:
276 * via ctor; `Server_session`: via `init_handlers()`).
277 *
278 * The resulting handler must not be invoked before start_ops() and #m_on_err_func being set;
279 * else behavior undefined.
280 *
281 * @return See above.
282 */
283 flow::async::Task_asio_err on_err_func_sio();
284
285 /**
286 * Returns the proper on-passive-channel-open handler to set up on the underlying #Session_obj (`Client_session`:
287 * via ctor; `Server_session`: via `init_handlers()` 2-arg form).
288 *
289 * The resulting handler must not be invoked before start_ops() and #m_on_channel_func_or_empty being set;
290 * else behavior undefined.
291 *
292 * @return See above.
293 */
295
296 // Data.
297
298 /**
299 * The `Task_engine` for `m_ready_*`. It is necessary to construct those pipe-end objects, but we never
300 * use that guy's `->async_*()` APIs -- only non-blocking operations, essentially leveraging boost.asio's
301 * portable transmission APIs but not its actual, um, async-I/O abilities in this case. Accordingly we
302 * never load any tasks onto #m_nb_task_engine and certainly never `.run()` (or `.poll()` or ...) it.
303 *
304 * In the `sync_io` pattern the user's outside event loop is responsible for awaiting readability/writability
305 * of a guy like #m_ready_reader_chan via our exporting of its `.native_handle()`.
306 */
307 flow::util::Task_engine m_nb_task_engine;
308
309 /**
310 * The `Task_engine` for `m_ev_wait_hndl_*`, unless it is replaced via replace_event_wait_handles().
311 *
312 * This is to fulfill the `sync_io` pattern.
313 */
314 flow::util::Task_engine m_ev_hndl_task_engine_unused;
315
316 /**
317 * Read-end of IPC-pipe used by `*this` used to detect that the error-wait has completed. The signal byte
318 * is detected in #m_ready_reader_err. There is no need to read it, as an error can occur at most once.
319 *
320 * @see #m_ready_writer_err.
321 */
323
324 /// Write-end of IPC-pipe together with #m_ready_reader_err.
326
327 /**
328 * Descriptor waitable by outside event loop async-waits -- storing the same `Native_handle` as (and thus being
329 * used to wait on events from) #m_ready_reader_err.
330 */
332
333 /**
334 * Read-end of IPC-pipe used by `*this` used to detect that a channel-open-wait has completed. The signal byte
335 * is read out of #m_ready_reader_chan, after it was written there via #m_ready_writer_chan. As explained in
336 * #m_target_channel_open_q doc header, channel-opens can occur at any time from another thread, even while
337 * `*this` is synchronously dealing with an existing one; therefore this is a rare case where more than 1 byte
338 * could be readable at a given time. Nevertheless we issue one async-wait on #m_ready_reader_chan per
339 * channel-open, in series; eventually all of them get popped, meaning all the channel-opens have been served.
340 *
341 * This is not a rare occurrence necessarily: an opposing `Session` user can perform 2+ `open_channel()` right in
342 * a row, and since creating a `Channel` with all its 2-4 fat peer objects can be relatively lengthy on this
343 * side, the 2nd/3rd/... on-channel-open firing can "step on" a preceding one.
344 *
345 * @see #m_ready_writer_chan.
346 */
348
349 /// Write-end of IPC-pipe together with #m_ready_reader_chan.
351
352 /**
353 * Descriptor waitable by outside event loop async-waits -- storing the same `Native_handle` as (and thus being
354 * used to wait on events from) #m_ready_reader_chan.
355 */
357
358 /**
359 * Function (set forever in start_ops()) through which we invoke the outside event loop's
360 * async-wait facility for descriptors/events relevant to our ops. See util::sync_io::Event_wait_func
361 * doc header for a refresher on this mechanic.
362 */
364
365 /// `on_err_func` from init_handlers(); `.empty()` until then.
366 flow::async::Task_asio_err m_on_err_func;
367
368 /**
369 * `on_passive_open_channel_func_or_empty` from init_handlers() (possibly `.empty()` if not supplied); until then
370 * `.empty()`.
371 */
373
374 /// Result given to (or about to be given to) #m_on_err_func.
376
377 /**
378 * Queue of #On_channel_func handler arg sets received from async-I/O #Session_obj -- meaning
379 * the `Session`, in unspecified background thread, informing us (which we signal via #m_ready_writer_chan)
380 * a channel has been passively open -- and not yet fed to #m_on_channel_func_or_empty.
381 *
382 * Protected by #m_target_channel_open_q_mutex.
383 *
384 * ### Rationale ###
385 * There's a queue here, which stands in contrast to (as of this writing) all other `sync_io` pattern impls
386 * in the project, including the on-error stuff in `*this`: those always deal with a single async-op at a time,
387 * and therefore no queue is necessary (just a single arg-set; usually just an `Error_code`, sometimes
388 * with a `size_t sz` though).
389 *
390 * The reason this one is different is as follows.
391 * - It's unlike, e.g., a `"Client_session_adapter::async_connect()"`, because an async-connect is triggered by the
392 * user explicitly, which triggers a single async-wait, and we disallow in our API to trigger more, until
393 * that one completes. (Confusing note: There is no such async-connect method as of this writing, because there's
394 * no `async_connect()` on Client_session_mv, which the latter's doc header justifies... while also noting it
395 * *would* exist, if that guy were network-enabled; we'll probably add this at some point. So pretend it did to
396 * grok this bullet point. Or just ignore it for now.)
397 * - It's unlike, e.g., #m_on_err_func stuff in `*this`, because while that one can indeed happen at any time
398 * from another thread -- without the user explicitly beginning the async-op (it sort of begins by itself) --
399 * it can also only happen at most once per `*this` (modulo move-assignment).
400 *
401 * At any rate: the handler we register with #m_async_io -- on_channel_func_sio() -- can be invoked
402 * from a background thread at any time. This triggers signaling #m_ready_reader_chan (via #m_ready_writer_chan),
403 * on which there is always a `sync_io`-pattern async-wait outstanding. We are informed of a completed
404 * async-wait (byte is available on #m_ready_reader_chan); we consume the byte and pop and pass the result
405 * from top of #m_target_channel_open_q. Then we start another async-wait. If there are more items in the queue
406 * that were added in the meantime, the same number of bytes have been pushed into #m_ready_writer_chan;
407 * so those async-waits are satisfied (in series) quickly, leading to the progressive emptying of this queue
408 * and passing along those results to `m_on_channel_func_or_empty()`.
409 */
411
412 /// Protects #m_target_channel_open_q, accessed from user async-wait-reporter thread; and #Session_obj worker thread.
413 mutable flow::util::Mutex_non_recursive m_target_channel_open_q_mutex;
414
415 /// This guy does all the work. In our dtor this will be destroyed (hence thread stopped) first-thing.
417}; // class Session_adapter
418
419// Template implementations.
420
421template<typename Session>
423 m_ready_reader_err(m_nb_task_engine), // No handle inside but will be set-up soon below.
424 m_ready_writer_err(m_nb_task_engine), // Ditto.
425 m_ev_wait_hndl_err(m_ev_hndl_task_engine_unused), // This needs to be .assign()ed still.
426 m_ready_reader_chan(m_nb_task_engine),
427 m_ready_writer_chan(m_nb_task_engine),
428 m_ev_wait_hndl_chan(m_ev_hndl_task_engine_unused)
429{
432}
433
434template<typename Session>
435template<typename On_passive_open_channel_handler, typename Task_err>
436Session_adapter<Session>::Session_adapter(flow::log::Logger* logger_ptr,
437 const Client_app& cli_app_ref, const Server_app& srv_app_ref,
438 Task_err&& on_err_func,
439 On_passive_open_channel_handler&& on_passive_open_channel_func) :
440 m_ready_reader_err(m_nb_task_engine), // No handle inside but will be set-up soon below.
441 m_ready_writer_err(m_nb_task_engine), // Ditto.
442 m_ev_wait_hndl_err(m_ev_hndl_task_engine_unused), // This needs to be .assign()ed still.
443 m_ready_reader_chan(m_nb_task_engine),
444 m_ready_writer_chan(m_nb_task_engine),
445 m_ev_wait_hndl_chan(m_ev_hndl_task_engine_unused),
446 m_on_err_func(std::move(on_err_func)),
447 m_on_channel_func_or_empty(std::move(on_passive_open_channel_func)),
448 m_async_io(logger_ptr, cli_app_ref, srv_app_ref, on_err_func_sio(), on_channel_func_sio())
449{
452}
453
454template<typename Session>
455template<typename Task_err>
456Session_adapter<Session>::Session_adapter(flow::log::Logger* logger_ptr,
457 const Client_app& cli_app_ref, const Server_app& srv_app_ref,
458 Task_err&& on_err_func) :
459 m_ready_reader_err(m_nb_task_engine), // No handle inside but will be set-up soon below.
460 m_ready_writer_err(m_nb_task_engine), // Ditto.
461 m_ev_wait_hndl_err(m_ev_hndl_task_engine_unused), // This needs to be .assign()ed still.
462 m_ready_reader_chan(m_nb_task_engine),
463 m_ready_writer_chan(m_nb_task_engine),
464 m_ev_wait_hndl_chan(m_ev_hndl_task_engine_unused),
465 m_on_err_func(std::move(on_err_func)),
466 m_async_io(logger_ptr, cli_app_ref, srv_app_ref, on_err_func_sio())
467{
470}
471
472template<typename Session>
473template<typename Task_err, typename On_passive_open_channel_handler>
474bool Session_adapter<Session>::init_handlers(Task_err&& on_err_func_arg,
475 On_passive_open_channel_handler&& on_passive_open_channel_func_arg)
476{
477 if (!m_on_err_func.empty())
478 {
479 FLOW_LOG_WARNING("Session_adapter [" << m_async_io << "]: init_handlers() called duplicately. Ignoring.");
480 return false;
481 }
482 // else
483 assert(m_on_channel_func_or_empty.empty());
484
485 m_on_err_func = std::move(on_err_func_arg);
486 m_on_channel_func_or_empty = std::move(on_passive_open_channel_func_arg);
487
488#ifndef NDEBUG
489 const bool ok =
490#endif
491 core()->init_handlers(on_err_func_sio(), on_channel_func_sio());
492
493 assert(ok && "We should have caught this with the above guard.");
494 return true;
495} // Session_adapter::init_handlers()
496
497template<typename Session>
498template<typename Task_err>
499bool Session_adapter<Session>::init_handlers(Task_err&& on_err_func_arg)
500{
501 if (!m_on_err_func.empty())
502 {
503 FLOW_LOG_WARNING("Session_adapter [" << m_async_io << "]: init_handlers() called duplicately. Ignoring.");
504 return false;
505 }
506 // else
507
508 m_on_err_func = std::move(on_err_func_arg);
509
510#ifndef NDEBUG
511 const bool ok =
512#endif
513 core()->init_handlers(on_err_func_sio());
514
515 assert(ok && "We should have caught this with the above guard.");
516 return true;
517} // Session_adapter::init_handlers()
518
519template<typename Session>
522{
524 using boost::asio::connect_pipe;
525
526 Error_code sys_err_code;
527
528 connect_pipe(*reader, *writer, sys_err_code);
529 if (sys_err_code)
530 {
531 FLOW_LOG_FATAL("Session_adapter [" << m_async_io << "]: Constructing: connect-pipe failed. Details follow.");
532 FLOW_ERROR_SYS_ERROR_LOG_FATAL();
533 assert(false && "We chose not to complicate the code given how unlikely this is, and how hosed you'd have to be.");
534 std::abort();
535 }
536
537 ev_wait_hndl->assign(Native_handle(reader->native_handle()));
538}
539
540template<typename Session>
541template<typename Event_wait_func_t>
542bool Session_adapter<Session>::start_ops(Event_wait_func_t&& ev_wait_func)
543{
544 using util::Task;
545
546 if (!m_ev_wait_func.empty())
547 {
548 FLOW_LOG_WARNING("Session_adapter [" << m_async_io << "]: Start-ops requested, "
549 "but we are already started. Probably a user bug, but it is not for us to judge.");
550 return false;
551 }
552 // else
553
554 m_ev_wait_func = std::move(ev_wait_func);
555
556 /* No time to waste! on_err_func_sio() and on_channel_func_sio() had to have been called already; begin waiting
557 * for those events to be indicated via our IPC-pipes as set up in those methods. */
558
559 async_wait(&m_ev_wait_hndl_err,
560 false, // Wait for read.
561 boost::make_shared<Task>([this]()
562 {
563 FLOW_LOG_INFO("Session_adapter [" << m_async_io << "]: Async-IO core on-error event: informed via IPC-pipe; "
564 "invoking handler.");
565 util::pipe_consume(get_logger(), &m_ready_reader_err); // No need really -- it's a one-time thing -- but just....
566
567 auto on_done_func = std::move(m_on_err_func);
568 m_on_err_func.clear(); // In case move() didn't do it. Might as well forget it.
569
570 on_done_func(m_target_err_code_err);
571 FLOW_LOG_TRACE("Handler completed.");
572 }));
573
574 /* Subtlety: It is tempting to guard this with `if (!m_on_channel_func_or_empty.empty()) {}` in the effort to
575 * avoid an an async-wait that will never get satisfied. However that only works in the Client_session case,
576 * when m_on_channel_func_or_empty is known from ction. Server_session still needs init_handlers() by user,
577 * and indeed that would be after start_ops(), so => bug. We could add fancier logic to account for this,
578 * but who really cares? So it'll register an extra FD in some boost.asio epoll-set. Meh. */
579 async_wait(&m_ev_wait_hndl_chan,
580 false, // Wait for read.
581 boost::make_shared<Task>([this]() { on_ev_channel_open(); }));
582
583 FLOW_LOG_INFO("Session_adapter [" << m_async_io << "]: Start-ops requested. Done.");
584 return true;
585} // Session_adapter::start_ops()
586
587template<typename Session>
589{
590 using util::Task;
591 using flow::util::Lock_guard;
592
593 // We are in user calling thread.
594
595 typename Channel_open_result::Ptr result;
596 {
597 Lock_guard<decltype(m_target_channel_open_q_mutex)> lock(m_target_channel_open_q_mutex);
598
599 FLOW_LOG_INFO("Session_adapter [" << m_async_io << "]: Async-IO core passively-opened channel event: "
600 "informed via IPC-pipe; invoking handler. Including this one "
601 "[" << m_target_channel_open_q.size() << "] are pending.");
602
603 assert((!m_target_channel_open_q.empty())
604 && "Algorithm bug? Result-queue elements and pipe signal bytes must be 1-1, so either something "
605 "failed to correctly push, or something overzealously popped.");
606
607 result = std::move(m_target_channel_open_q.front());
608 m_target_channel_open_q.pop();
609 } // Lock_guard lock(m_target_channel_open_q_mutex);
610
611 m_on_channel_func_or_empty(std::move(result->m_channel), std::move(result->m_mdt_reader_ptr));
612 FLOW_LOG_TRACE("Handler completed. Beginning next async-wait immediately. If more is/are pending "
613 "it/they'll be popped quickly due to immediately-completing async-wait(s).");
614
615 util::pipe_consume(get_logger(), &m_ready_reader_chan);
616 async_wait(&m_ev_wait_hndl_chan,
617 false, // Wait for read -- again.
618 boost::make_shared<Task>([this]() { on_ev_channel_open(); }));
619} // Session_adapter::on_ev_channel_open()
620
621template<typename Session>
622template<typename Create_ev_wait_hndl_func>
623bool Session_adapter<Session>::replace_event_wait_handles(const Create_ev_wait_hndl_func& create_ev_wait_hndl_func)
624{
626
627 if (!m_ev_wait_func.empty())
628 {
629 FLOW_LOG_WARNING("Session_adapter [" << m_async_io << "]: Cannot replace event-wait handles after "
630 "a start-*-ops procedure has been executed. Ignoring.");
631 return false;
632 }
633 // else
634
635 FLOW_LOG_INFO("Session_adapter [" << m_async_io << "]: Replacing event-wait handles (probably to replace underlying "
636 "execution context without outside event loop's boost.asio Task_engine or similar).");
637
638 assert(m_ev_wait_hndl_err.is_open());
639 assert(m_ev_wait_hndl_chan.is_open());
640
641 Native_handle saved(m_ev_wait_hndl_err.release());
642 m_ev_wait_hndl_err = create_ev_wait_hndl_func();
643 m_ev_wait_hndl_err.assign(saved);
644
645 saved = m_ev_wait_hndl_chan.release();
646 m_ev_wait_hndl_chan = create_ev_wait_hndl_func();
647 m_ev_wait_hndl_chan.assign(saved);
648
649 return true;
650} // Session_adapter::replace_event_wait_handles()
651
652template<typename Session>
654{
655 using flow::async::Task_asio_err;
656
657 // Careful in here! *this may not have been constructed yet.
658
659 return [this](const Error_code& err_code)
660 {
661 FLOW_LOG_INFO("Session_adapter [" << m_async_io << "]: Async-IO core reports on-error event: tickling IPC-pipe to "
662 "inform user.");
663
664 assert((!m_target_err_code_err)
665 && "Error handler must never fire more than once per Session! Bug in the particular Session_obj type?");
666 m_target_err_code_err = err_code;
667
668 util::pipe_produce(get_logger(), &m_ready_writer_err);
669 };
670
671 // start_ops() will set up the (1-time) async-wait for m_ready_reader_err being readable.
672} // Session_adapter::on_err_func_sio()
673
674template<typename Session>
677{
678 using flow::util::Lock_guard;
679
680 // Careful in here! *this may not have been constructed yet.
681
682 return [this](Channel_obj&& new_channel, Mdt_reader_ptr&& new_channel_mdt)
683 {
684 // We are in Client_session_mv/Server_session_mv "unspecified" worker thread (called thread W in internal docs).
685 {
686 Lock_guard<decltype(m_target_channel_open_q_mutex)> lock(m_target_channel_open_q_mutex);
687
688 FLOW_LOG_INFO("Session_adapter [" << m_async_io << "]: Async-IO core reports passively-opened channel event: "
689 "tickling IPC-pipe to inform user. This will make the # of pending such events "
690 "[" << (m_target_channel_open_q.size() + 1) << "].");
691 m_target_channel_open_q.emplace(boost::movelib::make_unique<Channel_open_result>());
692 auto& result = *(m_target_channel_open_q.back());
693 result.m_channel = std::move(new_channel);
694 result.m_mdt_reader_ptr = std::move(new_channel_mdt);
695 } // Lock_guard lock(m_target_channel_open_q_mutex);
696
697 /* By Session contract, handlers are never called concurrently with each other. Though this
698 * in POSIX is thread-safe even otherwise (w/r/t another pipe_produce() and most certainly w/r/t
699 * pipe_consume() of the other end). */
700 util::pipe_produce(get_logger(), &m_ready_writer_chan);
701 }; // return [](){}
702
703 /* start_ops() will set up the (first) async-wait for m_ready_reader_err being readable, as well as
704 * the action on that wait being satisfied; namely: to pipe_consume() that signal + begin the next wait. */
705} // Session_adapter::on_channel_func_sio()
706
707template<typename Session>
708template<typename... Args>
710{
711 m_ev_wait_func(std::forward<Args>(args)...);
712}
713
714template<typename Session>
716{
717 return &m_async_io;
718}
719
720template<typename Session>
722{
723 return const_cast<Session_adapter*>(this)->core();
724}
725
726template<typename Session>
727flow::log::Logger* Session_adapter<Session>::get_logger() const
728{
729 return core()->get_logger();
730}
731
732template<typename Session>
733const flow::log::Component& Session_adapter<Session>::get_log_component() const
734{
735 return core()->get_log_component();
736}
737
738} // namespace ipc::session::sync_io
A documentation-only concept defining the local side of an IPC conversation (session) with another en...
Definition: session.hpp:216
unspecified Channel_obj
Each successful open_channel() and on-passive-open handler firing shall yield a concrete transport::C...
Definition: session.hpp:235
shared_ptr< typename transport::struc::schema::Metadata< Mdt_payload_obj >::Reader > Mdt_reader_ptr
Ref-counted handle to a capnp-generated Reader (and the payload it accesses) through which the user s...
Definition: session.hpp:262
typename Base::Session_obj Session_obj
Short-hand, for generic programming et al, for template parameter Session.
Internal-use workhorse containing common elements of Client_session_adapter and Server_session_adapte...
flow::async::Task_asio_err m_on_err_func
on_err_func from init_handlers(); .empty() until then.
const flow::log::Component & get_log_component() const
See flow::log::Log_context.
util::Pipe_writer m_ready_writer_err
Write-end of IPC-pipe together with m_ready_reader_err.
util::sync_io::Asio_waitable_native_handle m_ev_wait_hndl_err
Descriptor waitable by outside event loop async-waits – storing the same Native_handle as (and thus b...
typename Session_obj::Channel_obj Channel_obj
Short-hand for session-openable Channel type.
typename Session_obj::Mdt_reader_ptr Mdt_reader_ptr
Short-hand for session-open metadata reader.
void init_pipe(util::Pipe_reader *reader, util::Pipe_writer *writer, util::sync_io::Asio_waitable_native_handle *ev_wait_hndl)
Utility that sets up an IPC-pipe in the given peer objects as well as loading a watcher-descriptor ob...
void on_ev_channel_open()
Signaled by the function returned by on_channel_func_sio(), it returns the IPC-pipe to steady-state (...
On_channel_func on_channel_func_sio()
Returns the proper on-passive-channel-open handler to set up on the underlying Session_obj (Client_se...
flow::async::Task_asio_err on_err_func_sio()
Returns the proper on-error handler to set up on the underlying Session_obj (Client_session: via ctor...
bool replace_event_wait_handles(const Create_ev_wait_hndl_func &create_ev_wait_hndl_func)
See, e.g., Client_session_adapter.
Error_code m_target_err_code_err
Result given to (or about to be given to) m_on_err_func.
void async_wait(Args &&... args)
Forwards to the util::sync_io::Event_wait_func saved in start_ops().
util::Pipe_reader m_ready_reader_err
Read-end of IPC-pipe used by *this used to detect that the error-wait has completed.
flow::util::Mutex_non_recursive m_target_channel_open_q_mutex
Protects m_target_channel_open_q, accessed from user async-wait-reporter thread; and Session_obj work...
Session_adapter()
Forwards to the Session_obj default ctor.
util::sync_io::Asio_waitable_native_handle m_ev_wait_hndl_chan
Descriptor waitable by outside event loop async-waits – storing the same Native_handle as (and thus b...
Session_obj * core()
The adapted mutable Session_obj.
util::Pipe_reader m_ready_reader_chan
Read-end of IPC-pipe used by *this used to detect that a channel-open-wait has completed.
Function< void(Channel_obj &&, Mdt_reader_ptr &&)> On_channel_func
Short-hand for passive-channel-open handler.
flow::log::Logger * get_logger() const
See flow::log::Log_context.
Session Session_obj
See, e.g., Client_session_adapter.
bool start_ops(Event_wait_func_t &&ev_wait_func)
See, e.g., Client_session_adapter.
Async_io_obj m_async_io
This guy does all the work. In our dtor this will be destroyed (hence thread stopped) first-thing.
util::sync_io::Event_wait_func m_ev_wait_func
Function (set forever in start_ops()) through which we invoke the outside event loop's async-wait fac...
Channel_open_result_q m_target_channel_open_q
Queue of On_channel_func handler arg sets received from async-I/O Session_obj – meaning the Session,...
std::queue< typename Channel_open_result::Ptr > Channel_open_result_q
Queue of Channel_open_result.
bool init_handlers(Task_err &&on_err_func_arg, On_passive_open_channel_handler &&on_passive_open_channel_func_arg)
Compilable only when Session_obj is a Server_session variant, forwards to its method of identical for...
util::Pipe_writer m_ready_writer_chan
Write-end of IPC-pipe together with m_ready_reader_chan.
flow::util::Task_engine m_nb_task_engine
The Task_engine for m_ready_*.
On_channel_func m_on_channel_func_or_empty
on_passive_open_channel_func_or_empty from init_handlers() (possibly .empty() if not supplied); until...
flow::util::Task_engine m_ev_hndl_task_engine_unused
The Task_engine for m_ev_wait_hndl_*, unless it is replaced via replace_event_wait_handles().
Dummy type for use as a template param to Channel when either the blobs pipe or handles pipe is disab...
Definition: channel.hpp:1000
Useful if using the sync_io pattern within a user event loop built on boost.asio (optionally with flo...
void assign(Native_handle hndl)
Loads value to be returned by native_handle().
sync_io-pattern counterparts to async-I/O-pattern object types in parent namespace ipc::session.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
Function< void(Asio_waitable_native_handle *hndl_of_interest, bool ev_of_interest_snd_else_rcv, Task_ptr &&on_active_ev_func)> Event_wait_func
In sync_io pattern, concrete type storing user-supplied function invoked by pattern-implementing ipc:...
flow::async::Task Task
Short-hand for polymorphic function (a-la std::function<>) that takes no arguments and returns nothin...
Definition: util_fwd.hpp:122
void pipe_produce(flow::log::Logger *logger_ptr, Pipe_writer *pipe)
Writes a byte to the given pipe writer.
Definition: util.cpp:67
void pipe_consume(flow::log::Logger *logger_ptr, Pipe_reader *pipe)
Reads a byte via the given pipe reader.
Definition: util.cpp:96
boost::asio::writable_pipe Pipe_writer
Short-hand for anonymous pipe write end.
Definition: util_fwd.hpp:32
boost::asio::readable_pipe Pipe_reader
Short-hand for anonymous pipe read end.
Definition: util_fwd.hpp:35
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:298
flow::Function< Signature > Function
Short-hand for polymorphic functor holder which is very common. This is essentially std::function.
Definition: common.hpp:302
An App that is used as a client in at least one client-server IPC split.
Definition: app.hpp:185
An App that is used as a server in at least one client-server IPC split.
Definition: app.hpp:206
Set of result arg values from a successful passive-channel-open from a Session_obj invoking On_channe...
Channel_obj m_channel
Result 1/2 given about to be given to m_on_channel_func_or_empty.
Mdt_reader_ptr m_mdt_reader_ptr
Result 2/2 given about to be given to m_on_channel_func_or_empty.
boost::movelib::unique_ptr< Channel_open_result > Ptr
Short-hand for pointer wrapper around a *this.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.