Before using the most useful aspects of Flow-IPC, we must know how to integrate with an application's event loop. (Or go back to preceding page: Prerequisites and Setup.)
At its core, Flow-IPC is meant to send messages from one process and receive them in the other process. As a rule, a given basic send API in Flow-IPC is synchronous, non-blocking, and can never return a "would-block" error.
However, by their very nature, this is not the case for receive APIs (as well as spiritually similar operations like "accept"). By definition, if I ask to receive something over an IPC channel at a given point in time, there may be no data available to receive at that moment; so the application must wait until the data have arrived and only then inform the part of the application interested in the received message. This page discusses how to accomplish this integration between a conceptual background receive operation (on the part of Flow-IPC) and the rest of your application.
The topic of event loops, asynchronicity, reactors, proactors, etc., in a C++ program is complex and expansive, and any comprehensive discussion thereof is well outside our scope. We do try to be helpful and skim the topic as needed for our purposes, but ultimately some experience and/or outside research on your part may be necessary for deep understanding.
Your application has a main thread used for logic that would make use of the results of IPC receives (and similar async[hronous] op[eration]s). It may use several such IPC-interested worker threads concurrently as well, in which case it is your responsibility to synchronize (guard against concurrency) between them. For simplicity of exposition we will exclusively discuss the one-thread scenario here; the same discussion can be generalized to multiple threads by the reader.
Let's call this thread, thread U (U for user). (Again – if you have multiple threads, presumably to make use of multiple processor cores, then the same conversation can be generalized to such threads U1, U2, ....)
Now consider a simple example Flow-IPC async-op (the same principles will apply to all others except where noted); we'll use it for further discussion of the topic. The async-op: ipc::transport::Blob_receiver being asked to receive a single message, in the form of a blob, when it becomes available. While Flow-IPC waits for an incoming blob (conceptually in the background), thread U must be usable for other tasks (e.g., sending or receiving HTTP traffic; periodic logging and reporting; who knows); but once a blob does arrive, the result must be delivered to the interested code in thread U (waking it if necessary).
The Blob_receiver's work – conceptually background-waiting for incoming traffic and reporting the result to thread U – can, in practice, occur in one of two places: either in a separate worker thread started internally by Flow-IPC (call it thread W), or in thread U itself, shared with the rest of your application's logic.
Which setup you will use is an important decision, especially in terms of how your code is structured overall. Event-loop integration comprises this decision – and further decisions downstream of it.
Flow-IPC supports both ways of structuring your application. Moreover, there are different API variations depending on which one is chosen. We shall soon discuss specifics.
Furthermore, in the rest of the Manual, for reasons of clarity and brevity, we mostly have to choose one event-loop-integration approach. We expect the reader will tailor their understanding to the other event-loop-integration approaches if and as needed. That said, let's discuss these various approaches.
Proactor pattern, with Flow-IPC internally starting threads as needed (the async-I/O pattern)
This pattern is the one used in most of this Manual, except when specifically discussing alternative event-loop-integration approaches; i.e., this is the aforementioned one event-loop-integration approach referred to just above.
Regular users of boost.asio (on track for the C++ standard) will be familiar with this pattern/approach. In it, thread U invokes a central run() method in which the entire event loop runs, until the decision is made to exit the loop (usually permanently). (run() can also be called in each of multiple threads U1, U2, ....) This method maintains an internal queue of tasks (basically, void functions); when the queue is empty it sleeps until a task is pushed onto it, which it detects and immediately executes (and pops). Once that task exits, it executes any more tasks that may have been pushed onto the queue in the meantime – whether by 1+ of those tasks themselves, or by other concurrent threads (spoiler alert: such as our thread W). Once there are no more tasks, it sleeps again.
run() is integrated with OS-level events: if some part of the application wants to do something once TCP socket X, or Unix domain socket Y, receives data, then the kernel can be told to wake up run()'s special "sleep" the moment 1+ of those sockets are readable. (Internally this is accomplished via select(), poll(), epoll_wait(), etc., which can be told to sleep until 1+ native handles have an active event.) Because of this there is not always a need to start a separate blocking thread W that would enqueue items onto the run() task-queue; instead the kernel has "special privileges" due to being integrated, at a low level, with the poll()/whatever OS call.
That note aside: the most basic operation of interest to us has the following form:
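In your loop's own terms it might be declared along the lines of this minimal sketch (the names here are illustrative, not a Flow-IPC API):

```cpp
#include <functional>

// Illustrative only: the task queue's post() operation in a proactor-style loop.
// `Task` is any callable invocable with no arguments.
using Task = std::function<void ()>;

// Enqueues `task` to be executed, exactly once, on thread U (i.e., from within run());
// never invokes it synchronously.  Safe to call from any thread, including from a task
// currently executing inside run() itself.
void post(Task&& task);
```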
This operation, invokable both from within an already-executing task on thread U and from any other thread, enqueues std::move(task) onto the task-queue, so that run() either wakes up thread U (if it is currently sleeping) and executes task() (no args), or else invokes it once the ongoing task(s) have been executed and popped. post() itself never synchronously invokes task(); but if invoked from within a task (i.e., from thread U, i.e., from run() itself), then run() may invoke it the moment the current (posting) task returns. post() is thread-safe, in that it itself takes care of any synchronization of the internally-kept task-queue.
Now, consider the aforementioned Blob_receiver receive op. Its API is as follows:
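(The following is a sketch of the API's shape; see ipc::transport::Blob_receiver in the Reference for the authoritative signature. Parameter names here are illustrative.)

```cpp
// Sketch of the Blob_receiver receive API's shape.
// `target_blob` points at memory into which the incoming blob shall be written;
// `on_done_func` is the completion handler, invoked exactly once with the outcome
// (success or a specific Error_code) and the number of bytes received.
template<typename CompletionHandler>
void async_receive_blob(const util::Blob_mutable& target_blob,
                        CompletionHandler&& on_done_func); // Handler form: void (const Error_code& err_code, size_t sz).
```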
To use it, a task in the context of an event loop in thread U might be written as follows:
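(A sketch, assuming the post() facility above, plus members m_rcv and m_buf; the class/member names App, read_next_blob(), on_blob_received() are illustrative, not Flow-IPC APIs.)

```cpp
#include <array>
#include <cstdint>
// Plus the relevant ipc::transport headers for the concrete Blob_receiver type used.

// Sketch: kick off one background receive; deliver the result to thread U via post().
class App
{
public:
  void read_next_blob(); // Start one async receive.

private:
  void on_blob_received(const flow::Error_code& err_code, size_t sz); // Runs in thread U.

  ipc::transport::Native_socket_stream m_rcv; // Any Blob_receiver will do; assume it is already in connected (PEER) state.
  std::array<uint8_t, 4096> m_buf;            // Target buffer; must outlive the outstanding receive.
};

void App::read_next_blob()
{
  m_rcv.async_receive_blob(ipc::util::Blob_mutable(m_buf.data(), m_buf.size()),
                           [this](const flow::Error_code& err_code, size_t sz)
  {
    // We are in a Flow-IPC-internal thread (thread W): do as little as possible here.
    if (err_code == ipc::transport::error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER)
    {
      return; // *this may be mid-destruction; don't even post().
    }
    // Hand the real work back to thread U.
    post([this, err_code, sz]() { on_blob_received(err_code, sz); });
  });
}
```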
The key points:
- The completion handler passed to the async-op (Blob_receiver::async_receive_blob() in this case) should perform the bulk of its work in thread U and thus must invoke post(), capturing any required args in the lambda first.
- The Error_code S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER (spiritually equivalent to boost.asio's operation_aborted) indicates the async-op is of the one-off variety, where the completion handler is guaranteed to be invoked exactly once; in particular, if the source object is destroyed before the operation completes in the background, it still invokes the completion handler but reports operation-aborted. This means the user should, usually, just no-op, as stuff (including quite possibly their own *this) is probably in the middle of destruction. In that case one might as well immediately return, rather than post() a thing that would basically do so anyway.
- Until the completion handler has executed, any resources the operation needs (such as the target buffer m_buf above) must continue to exist. In particular it's common to allocate and lambda-capture a shared_ptr to the underlying resource.
To reiterate: in most of the rest of the Manual, we use the above pattern. In particular we assume the existence of a post() method as shown above and use it regularly in example code. Note that the exact API your application uses will be spiritually identical to post(F) (where F() is the task) but specifically different. In particular/for example:
- With a vanilla boost.asio loop, it'll be post(E, F), where E is usually a boost::asio::io_context (or strand).
- With flow::async (a boost.asio helper), it'll be L.post(F), where L is a Single_thread_task_loop (1 user worker thread) or Concurrent_task_loop (2+ threads).
- Or it might be something like asio_exec_ctx(..., E, ..., F), which is a variant of boost.asio's post(E, F) with certain added options.
This pattern is referred to as the async-I/O pattern.
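For instance, with a vanilla boost.asio loop, the post(F) used in this Manual's examples might map onto something like the following sketch (g_loop and the free function post() are illustrative names, not Flow-IPC APIs):

```cpp
#include <boost/asio.hpp>
#include <utility>

// Sketch: the Manual's post(F), expressed atop a vanilla boost.asio loop.
boost::asio::io_context g_loop; // Thread U runs g_loop.run().

template<typename Task>
void post(Task&& task)
{
  boost::asio::post(g_loop, std::forward<Task>(task)); // Enqueue task to run on thread U; never runs it here.
}
```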
Why do I need to post()? Can't I just give your constructor my io_context, and then you'll place the completion handler directly onto it? In other words, why didn't we just make Blob_receiver (among others) a boost.asio I/O-object, like boost::asio::ip::tcp::socket?
We feel you. We use boost.asio ourselves internally, after all! However the first priority was to supply a more general API, so that one does not need to orient their event loop around boost.asio machinery specifically – but merely have a task-queue and a post()-like API available – rather than requiring a full boost.asio I/O-object in the style of boost::asio::ip::tcp::socket.
.Let's set aside, for the moment, the stylistic organization of the event loop. Purely in terms of performance, is there any problem with the approach outlined above, where thread(s) like thread W is/are launched by Flow-IPC in the background and then signal thread(s) like thread U when background events occur?
Yes and no. For most applications the only perf cost is that of (1) context switching between user threads U1, U2, ... versus Flow-IPC-internal threads W1, W2, ...; and (2) the signaling necessary for a thread W to inform a thread U that it should wake up. (2) is, essentially, a condition-variable ping and is rather cheap. (1) has a non-zero cost, but for most applications, on modern hardware, it is usually taken to be negligible.
However that is not an absolute truth. Some applications are extremely latency-sensitive and don't want to add even single microseconds of delay if they can be avoided. Hence there is a non-zero market for, after all, having Flow-IPC and the application share thread U (or U1, U2, ...), based on such fine-grained performance needs.
Furthermore some (most?) application event loops are written not in the proactor style but rather in the reactor style; that is, they may not feature a post()-like API, or a task-queue, at all. Instead such an application, especially an old-school networking server, might manually use a select()/poll()/epoll_wait() in thread U, which sleeps until the kernel wakes it up due to a socket event, or until a timeout expires (usually set to the remaining time on the next-soonest-to-fire timer event). Having woken up it would handle all active events and/or the applicable timer(s) and go back into the poll()/whatever.
It is possible and reasonable to modify such an event loop to integrate with the async-I/O-pattern API (which starts background worker threads W as needed) explained above, and some event-loop libraries already have that capability. It involves, basically, keeping a mutex-protected task (function pointer/function object) queue, plus a trivial IPC handle – e.g., to an unnamed pipe – which participates in the central poll()/whatever. In the thread-W-invoked handler passed to (e.g.) Blob_receiver::async_receive_blob(), one would invoke some API that locks the mutex, pushes the "true" handler (that the user wants to run in thread U) onto the task-queue, unlocks the mutex, and sends a byte to the aforementioned pipe. This wakes (if needed) the poll()/whatever; code subsequent to its return would then synchronously invoke all the tasks in the task queue (locking the mutex, of course).
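A minimal sketch of that glue, assuming a POSIX system and a poll()-based loop (the names Task_queue, post_from_any_thread(), drain_tasks(), wait_fd() are illustrative, not Flow-IPC APIs):

```cpp
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
#include <fcntl.h>   // fcntl(), O_NONBLOCK
#include <unistd.h>  // pipe(), read(), write()

// Illustrative glue for a reactor (poll()-y) loop: a mutex-protected task queue plus an
// unnamed pipe whose read end is added to the central poll()/epoll set in thread U.
class Task_queue
{
public:
  Task_queue()
  {
    ::pipe(m_pipe_fds);
    ::fcntl(m_pipe_fds[0], F_SETFL, O_NONBLOCK); // So drain_tasks() can read until empty.
  }

  int wait_fd() const { return m_pipe_fds[0]; } // Add this FD (for readability) to your poll() set.

  // Call from any thread -- e.g., from the handler Flow-IPC invokes in its internal thread W.
  void post_from_any_thread(std::function<void ()> task)
  {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_tasks.push(std::move(task));
    }
    const char wake_byte = 0;
    ::write(m_pipe_fds[1], &wake_byte, 1); // Wake thread U's poll(), if it is sleeping.
  }

  // Call in thread U, once poll() reports wait_fd() readable: runs all queued "true" handlers.
  void drain_tasks()
  {
    char buf[64];
    while (::read(m_pipe_fds[0], buf, sizeof buf) > 0) {} // Discard the wake-up bytes.
    for (;;)
    {
      std::function<void ()> task;
      {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_tasks.empty()) { return; }
        task = std::move(m_tasks.front());
        m_tasks.pop();
      }
      task(); // Execute the user's "true" handler, in thread U.
    }
  }

private:
  std::mutex m_mutex;
  std::queue<std::function<void ()>> m_tasks;
  int m_pipe_fds[2]; // [0] read end; [1] write end.
};
```

In the handler passed to (e.g.) async_receive_blob(), one would then call post_from_any_thread() with the "true" handler, exactly as described above.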
However, in that case, the possible performance deficit due to the switching between threads U and W and the required signaling may still be a concern for some applications.
Let's discuss what one might do to share thread U instead while using Flow-IPC.
Suppose we wanted to use Flow-IPC in a way resembling, as closely as possible, the proactor approach outlined above (Blob_receiver::async_receive_blob(), for example) – but keep the work in thread U; that is, avoiding the post()-onto-thread-U step in the handler passed to async_receive_blob(). This might be the best of both worlds: fewer threads/context switches; code that is similar to the above already-reasonably-simple pattern – but even simpler still. Sounds good!
The best and de-facto-standard C++ proactor event-loop API is boost.asio. The authors are not aware of any better or more general way to discuss proactor-style event loops. (flow::async makes it easier to work with boost.asio, but it is still in essence working with boost.asio.) Therefore the problem of using the proactor pattern, but with thread U alone, reduces to using boost.asio, but with thread U alone.
Reactor-style (poll()-y) pattern while sharing thread U
Lastly, as noted before, the application might be oriented around an old-school – but nevertheless proven and effective – reactor-pattern event loop summarized above. Does Flow-IPC have a way of integrating with such an event loop without starting unnecessary threads W (i.e., sharing thread U with the application)?
Such an API shall be oriented around native handles; i.e., it will provide certain internal handles (FDs in Linux and friends) and expect the application to wait for activity on those handles, among others of interest to the application, and to invoke Flow-IPC processing to then handle the relevant events and produce relevant results – such as "a message blob has been received as the user has requested; here it is" – directly in thread U. Or – for that matter – from multiple (of your) threads U1, U2, ....
Both alternate approaches described above are possible by using the following Flow-IPC alternate (to the async-I/O pattern) API: the sync_io pattern.
The sync_io pattern (alternate approach)
For every conceptual object type in Flow-IPC that conceptually performs asynchronous (long-running, blocking, not-non-blocking, annoying – take your pick) operations, there are indeed 2 mutually exclusive APIs available. So for each conceptual "thing," there are 2 similarly named classes or class templates. They are:
- The async-I/O-pattern API: "N::X", where N is a namespace, while X is a class or class template (or alias to it).
- The sync_io-pattern API: typically called "N::sync_io::X". (At any rate, it shall always live in a sub-namespace named sync_io.)
In actual fact, for perf-sensitive Xes, usually the sync_io::X is a lighter-weight core containing the nitty-gritty logic of dealing with whatever it is that an X represents (Unix domain socket, MQ, channel...). Then "plain" X – the async-I/O variety – is built around such a sync_io core. In these cases, an X can always be constructed from an std::move(sync_io::X) at low computational cost (see the sketch following the list just below). This might be counterintuitive at first, but it actually makes sense:
- The sync_io core allows for great flexibility on the user's part, but is (all else being equal) harder to use.
- A sync_io::X can be effectively leveraged in a 2+-thread situation (user thread U, worker thread W) – which is exactly what the async-I/O X does internally. If this use case is itself not sufficient for your application, then you can use the core directly yourself.
So we'd like you to use the async-I/O X when possible, and the sync_io::X when that is insufficient.
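For example, one might "upgrade" a sync_io core to its async-I/O counterpart roughly as in this sketch (Native_socket_stream is used merely as an example of an X / sync_io::X pair; whether a given class's constructor takes further arguments is documented in the Reference):

```cpp
#include <utility>

// Sketch: adopting (subsuming) a sync_io-pattern core into its async-I/O counterpart.
void upgrade_example(ipc::transport::sync_io::Native_socket_stream&& sync_sock)
{
  // The async-I/O object takes over the lighter-weight core via move semantics; this is cheap.
  ipc::transport::Native_socket_stream sock(std::move(sync_sock));
  // ...now use sock's async-I/O API (async_receive_blob(), etc.) as shown earlier...
}
```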
How does a sync_io-pattern API work? We won't restate it here, or indeed (as of this writing) feature a Manual page for it. It is an advanced topic and is fully covered in the reference documentation; namely find it here: ipc::util::sync_io doc header. Loosely speaking it supplies an API analogous to OpenSSL's SSL_read() sometimes issuing SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE.
That said, a quick note w/r/t how one might use the ipc::util::sync_io APIs to implement the (1) "sharing thread U" and (2) "reactor pattern" possibilities mentioned above: in both cases the sync_io-pattern object supplies native handles and asks you to wait for activity on them in your own loop (whether boost.asio-based or poll()-based); when such an event fires, you inform the object, which performs the resulting work and invokes your handlers synchronously, right there in thread U.
Here is a list of async-I/O-pattern APIs and their sync_io counterparts.
- ipc::transport::Blob_sender, Native_handle_sender, Blob_receiver, Native_handle_receiver ⇔ (sync_io-pattern) ipc::transport::sync_io::Blob_sender, ipc::transport::sync_io::Native_handle_sender, ipc::transport::sync_io::Blob_receiver, ipc::transport::sync_io::Native_handle_receiver.
- ipc::transport::Native_socket_stream_acceptor ⇔ (sync_io-pattern) ipc::transport::sync_io::Native_socket_stream_acceptor.
- ipc::transport::struc::Channel ⇔ (sync_io-pattern) ipc::transport::struc::sync_io::Channel.
- ipc::session Client_session-like and Server_session-like types ⇔ (sync_io-pattern) ipc::session::sync_io::Client_session_adapter (*), ipc::session::sync_io::Server_session_adapter (*).
- ipc::session Session_server-like types ⇔ (sync_io-pattern) ipc::session::sync_io::Session_server_adapter (*).
In all cases, with the following exceptions, the object type is considered perf-sensitive and therefore is indeed arranged as follows: sync_io::X is the core; X adapts such a core and can be constructed from one cheaply.
The exceptions:
- ipc::transport::Channel is excepted, because it merely bundles ..._sender and ..._receiver peer-objects in one object. It bundles either Xes or sync_io::Xes, but the same template handles both duties. It contains little logic: mainly just bundling. Hence it can pull it off without issue; hence there's no sync_io::Channel. That said, an async-I/O-bearing Channel can be trivially created from a sync_io-bearing one: auto async_ch = sync_ch.async_io_obj().
- sync_io::Native_socket_stream_acceptor is actually built around an async-I/O Native_socket_stream_acceptor (the reverse of the usual arrangement). Makes no difference to the user: the APIs work regardless of how it is implemented internally. That said, one cannot construct one from the other or vice versa.
- The Session and Session_server types are written primarily in async-I/O fashion. However, for those wishing to more easily integrate a poll()-y event loop with session-connect (client) and session-accept (server) operations, a sync_io API is available. Use Client_session_adapter to adapt any Client_session-like type – simply supply the latter type as the template arg. Ditto Server_session_adapter for any Server_session-like type. Lastly, Session_server_adapter adapts any (of the 3) Session_server-like types.
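For instance, a sketch of wrapping existing session types in their sync_io-pattern adapters (My_client_session and My_session_server stand for whatever ipc::session instantiations your application already uses – hypothetical names here):

```cpp
// Sketch: adapting already-chosen session types for a sync_io-style event loop.
using My_sync_client_session
  = ipc::session::sync_io::Client_session_adapter<My_client_session>;

using My_sync_session_server
  = ipc::session::sync_io::Session_server_adapter<My_session_server>;
```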
Lastly: for generic programming each relevant type features a couple of alias members:
- Sync_io_obj: in an X, this is sync_io::X; in a sync_io::X, this is the empty type Null_peer.
- Async_io_obj: conversely, in a sync_io::X, this is X; in an X, this is the empty type Null_peer.
E.g., Blob_stream_mq_sender<Posix_mq_handle>::Sync_io_obj is sync_io::Blob_stream_mq_sender<Posix_mq_handle>, and conversely via ::Async_io_obj.
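A sketch of how this can be used at compile time (the static_assert merely illustrates the relationship described above; the fully qualified namespaces are per the Reference):

```cpp
#include <type_traits>

// Sketch: navigating between the two API variants via the Sync_io_obj/Async_io_obj aliases.
using Mq_sender      = ipc::transport::Blob_stream_mq_sender<ipc::transport::Posix_mq_handle>;
using Mq_sender_core = Mq_sender::Sync_io_obj; // I.e., the sync_io::Blob_stream_mq_sender<...> core.

static_assert(std::is_same_v<Mq_sender_core::Async_io_obj, Mq_sender>,
              "A sync_io core's Async_io_obj names its async-I/O counterpart.");
```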
A related wrinkle: certain APIs generate – or subsume – objects of the above types. Namely:
- struc::Channel::Channel(): the struc::Channel subsumes an unstructured, pre-opened ipc::transport::Channel via move() semantics.
- Native_socket_stream_acceptor::async_accept(): the acceptor generates Native_socket_streams.
- Session::open_channel() + Session passive-opening + Session init-channels: a Session generates Channels.
- Session_server::async_accept(): a Session_server generates Server_sessions.
So which API variant does each of these generate or subsume?
- The session-server (whether async-I/O Session_server or sync_io::Session_server_adapter<Session_server>) generates an object of the same API-type. I.e., async-I/O guy generates async-I/O guys; sync_io guy generates sync_io guys. Rationale: these are not perf-critical, and we reasoned that if one went to the trouble of wanting to use a sync_io server, then they'll want it to generate sync_io sessions to integrate into the same, or similarly built, event loop.
- The other generators produce sync_io cores. I.e., the acceptor fills-out a sync_io socket-stream; the open-channel method/passive-open handler/session-async-accept/session-sync-connect each fills-out a sync_io channel object. The subsumer guy (whether async-I/O-pattern or not!) subsumes a sync_io core; i.e., a structured-channel ctor takes over a sync_io channel (which may have in fact been generated by Session::open_channel()... synergy!). Rationale: sync_io cores are lighter-weight to generate and pass around, and one can always very easily then "upgrade" them to async-I/O objects as needed – e.g., a Native_socket_stream can be constructed from a sync_io::Native_socket_stream&&.
The simplest event-loop integration approach is the proactor-pattern-with-Flow-IPC-internally-starting-threads-as-needed: the async-I/O pattern. How to use this API style is explained above. Most of the rest of the Manual uses this approach in examples and discussion; in particular we'll routinely use post() as described above.
We do this (1) for simplicity of exposition; and (2) because, in the absence of compelling objections, we feel it is the best approach to use. However such objections may, indeed, sometimes apply – namely in the presence of (1) a legacy reactor loop; and/or (2) very stringent perf restrictions sensitive even to context switching. In that case, whether merging Flow-IPC worker thread(s) into your own or integrating with a *poll()-y loop, use the alternate sync_io pattern, per the ipc::util::sync_io doc header.
We're now ready to make Flow-IPC actually do something. Let's move on to Sessions: Setting Up an IPC Context.