Asynchronicity and Integrating with Your Event Loop

Before using the most useful aspects of Flow-IPC, we must know how to integrate with an application's event loop. (Or go back to preceding page: Prerequisites and Setup.)

At its core Flow-IPC is meant to send messages from one process and receive them in that other process. As a rule, a given basic send API in Flow-IPC is synchronous, non-blocking, and can never return a "would-block" error. That is (see the short sketch after this list):

  • It will return quickly.
  • Via return value and/or out-arg and/or lack/presence of thrown exception, it will report either total success; or failure indicating an unrecoverable error (meaning there is no point in trying the send operation later: the out-pipe at least is hosed).
    • There is no such error as "would-block." That is, "buffers full; try send again later" is not a thing in our APIs. (If you're worried about performance implications of this – don't be. However this is not the page to discuss that.)
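For example, a basic send call site might look like the following. (A minimal sketch, not verbatim API: we use the Blob_sender-concept send_blob() here; consult the Reference for the exact signatures of the concrete class you use, e.g. Native_socket_stream.)

// Sketch only.  The point: a send is a plain synchronous call; it either succeeds
// or reports an unrecoverable error.  There is no would-block outcome to retry later.
template<typename Blob_sender> // Any Blob_sender-concept impl (e.g., Native_socket_stream).
void send_one(Blob_sender& snd, const std::string& payload)
{
  ipc::Error_code err_code;
  snd.send_blob(ipc::util::Blob_const(payload.data(), payload.size()), &err_code);
  if (err_code)
  {
    // The out-pipe is hosed; no point in retrying.  Log/handle and abandon this channel.
  }
  // else: Success; nothing more to do (no "try again later" logic anywhere).
}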

However, by their very nature, this is not the case for receive APIs (as well as spiritually similar operations like "accept"). By definition, at a given point in time, if I ask to receive something over an IPC channel, there may be no data available to receive at that moment; so the application must wait until the data have arrived and only then inform the part of the application interested in the received message. This page discusses how to accomplish this integration between a conceptual background receive operation (on the part of Flow-IPC) and the rest of your application.

The topic of event loops, asynchronicity, reactors, proactors, etc., in a C++ program is complex and expansive, and any comprehensive discussion thereof is well outside our scope. We do try to be helpful and skim the topic as needed for our purposes, but ultimately some experience and/or outside research on your part may be necessary for deep understanding.

Your application's event loop versus a Flow-IPC async op

Your application has a main thread used for logic that would make use of the results of IPC receives (and similar async[hronous] op[eration]s). It may use several such IPC-interested worker threads concurrently as well, in which case it is your responsibility to synchronize (guard against concurrency) between them. For simplicity of exposition we will exclusively discuss the one-thread scenario here; the same discussion can be generalized to multiple threads by the reader.

Let's call this thread, thread U (U for user). (Again – if you have multiple threads, presumably to make use of multiple processor cores, then the same conversation can be generalized to such threads U1, U2, ....)

Now consider a simple example Flow-IPC async-op (the same principles will apply to all others except where noted); we'll use it for further discussion of the topic. The async op: ipc::transport::Blob_receiver being asked to receive a single message, in the form of a blob, when it becomes available. While Flow-IPC waits for an incoming blob (conceptually in the background), thread U must be usable for other tasks (e.g., sending or receiving HTTP traffic; periodic logging and reporting; who knows); but once a blob does arrive:

  • if thread U is sleeping, it must be woken up and given the resulting blob, which it will handle as it sees fit;
  • if it is doing something, then as soon as that work is done, it should be given the resulting blob, which it will handle as it sees fit.

The Blob_receiver's conceptual background wait for incoming traffic, and the reporting of its result to thread U, can in practice occur in one of two places:

  • A separate worker thread (call it thread W): Flow-IPC may start thread W internally and have it sleep until the kernel, or some other mechanism, wakes it up and gives it some incoming data, which it would then potentially package into a complete message blob. Now thread W would signal thread U somehow, and give it the result-blob; thread U would handle it as soon as it is free (which might be immediately, if it's sleeping, or somewhat later if it's doing something else).
  • Thread U itself: Flow-IPC may cooperate with the coder of the application by sharing thread U (or multiple threads U1, U2, ...!) with its other tasks unknown to it. Thread U would sleep until the earlier of the following occurs:
    • There are incoming data to the Blob_receiver.
    • Some other event of interest to thread U has occurred.

Which setup you will use is an important decision, especially in terms of how your code is structured overall. Event loop integration comprises this decision – and further decisions downstream of it.

Flow-IPC supports both ways of structuring your application. Moreover, there are different API variations depending on which one is chosen. We shall soon discuss specifics.

Furthermore, in the rest of the manual, for reasons of clarity and brevity, we have to mostly choose one event-loop-integration approach. We expect the reader will tailor their understanding to the other event-loop-integration approaches if and as needed. That said let's discuss these various approaches.

Async-I/O pattern (default approach): Proactor pattern with separate Flow-IPC worker thread(s)

This pattern is the one used in most of this Manual, except when specifically discussing alternative event-loop-integration approaches. I.e., this is the one event-loop-integration approach referred to just above.

Regular users of boost.asio (on track for inclusion in the C++ standard) will be familiar with this pattern/approach. In it, thread U invokes a central run() method in which the entire event loop runs, until the decision is made to exit the loop (usually permanently). (run() can also be called in each of multiple threads U1, U2, ....) This method maintains an internal queue of tasks (basically, void functions); when the queue is empty it sleeps until a task is pushed onto it, which it detects and immediately executes (and pops). Once that task exits, it executes any further tasks that may have been pushed onto the queue in the meantime – whether by 1+ of those tasks themselves, or by other concurrent threads (spoiler alert: such as our thread W). Once there are no more tasks, it sleeps again. (A minimal boost.asio sketch follows the note below.)

Note
Furthermore this sleep-until-tasks-queued-then-wake operation inside run() is integrated with OS-level events: If some part of the application wants to do something once TCP socket X, or Unix domain socket Y, receives data, then the kernel can be told to wake up the run()'s special "sleep" the moment 1+ of those sockets are readable. (Internally this is accomplished via select(), poll(), epoll_wait(), etc., which can be told to sleep until 1+ native handles have an active event.) Because of this there is not always a need to start a separate blocking thread W that would enqueue items onto the run() task-queue; instead the kernel has "special privileges" due to being integrated, at a low level, with the poll()/whatever OS-call.
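If this run()/task-queue model is unfamiliar, here is a tiny self-contained boost.asio example (plain boost.asio; no Flow-IPC involved) showing a task being enqueued from another thread yet executing in thread U:

#include <boost/asio.hpp>
#include <iostream>
#include <thread>

int main()
{
  boost::asio::io_context loop; // Thread U's task-queue + sleep/wake machinery.
  auto work = boost::asio::make_work_guard(loop); // Keep run() from returning while the queue is momentarily empty.

  // Some other thread (think: our thread W) wakes thread U by enqueueing a task.
  std::thread other_thread([&]()
  {
    boost::asio::post(loop, [&]()
    {
      std::cout << "Running in thread U, though posted from another thread.\n";
      work.reset(); // Allow run() to return once the queue drains.
    });
  });

  loop.run(); // Thread U: sleep until tasks are queued; execute them; repeat; return when no work remains.
  other_thread.join();
}

In the async-I/O pattern described below, Flow-IPC's internal handlers (running in thread W) play the role of other_thread: they hand results to thread U via post().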

That note aside: The most basic operation of interest to us has the following form:

template<typename Task> // Task = function type matching signature `void F()`.
void post(Task&& task);

This operation, invokable from within an already-executing task on thread U or from any other thread, enqueues std::move(task) onto the task-queue, so that run() either wakes up thread U (if it is currently sleeping) and executes task() (no args), or else invokes it once the ongoing task(s) have been executed and popped. post() itself never synchronously invokes task(); but if post() is invoked from within a task (i.e., from thread U, i.e., from run() itself), then run() may invoke task() the moment the current (posting) task returns.

Note
post() is thread-safe, in that it takes care of any synchronization of the internally-kept task-queue, itself.

Now, consider the aforementioned Blob_receiver receive op. Its API is as follows:

template<typename Handler> // Handler = function type matching signature `void F(const Error_code&, size_t n_sz)`.
void async_receive_blob(ipc::util::Blob_mutable target_buf, Handler&& handler);

To use it, a task in the context of an event loop in thread U might be written as follows:

class My_loop
{
  array<uint8_t, 8192> m_buf; // This must be alive until handler() runs; so we choose to keep it as a data member.
  Blob_receiver m_rcv;

  void start_receive()
  {
    // We are in thread U.
    m_rcv.async_receive_blob(ipc::util::Blob_mutable(m_buf.data(), m_buf.size()), // Target buffer starting at m_buf.begin(), length .size() == 8192.
                             [this](const Error_code& err_code, size_t n_sz)
    {
      // We are in "unspecified Flow-IPC thread" (really, thread W)!  Must post() (unless shutting down anyway).
      if (err_code == ipc::transport::error::Code::S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER)
      {
        // Stuff is shutting down; just get out.
        return;
      }
      // else

      post([this, err_code, n_sz]()
      {
        // We are in thread U.
        if (err_code)
        {
          // Fatal error; m_rcv hosed.
          // ...Handle it appropriately here....
          // And usually that's it.
          return;
        }
        // else: All good.  Result blob is at m_buf.begin(), is n_sz long.
        on_receive(n_sz);
      }); // post()
    }); // async_receive_blob()
  } // start_receive()

  void on_receive(size_t n_sz)
  {
    // We are in thread U.
    // ...Handle the received blob at m_buf.begin() of size n_sz....
    // Perhaps async-receive the next one:
    start_receive();
  }
}; // class My_loop

The key points:

  • When operating according to this pattern, the user handler is always invoked from (formally) "unspecified thread that is not thread U." In reality it is thread W in our case, but that should not matter.
  • In the proactor pattern – when there's only one thread U as opposed to concurrent workers U1, U2, ... – user tasks can safely assume they're never invoked concurrently with each other. Therefore, usually, the completion handler supplied to this Flow-IPC API (Blob_receiver::async_receive_blob() in this case) should perform the bulk of its work in thread U and thus must invoke post(), capturing any required args in the lambda first.
    • The only exception: the special-meaning Error_code S_OBJECT_SHUTDOWN_ABORTED_COMPLETION_HANDLER (spiritually equivalent to boost.asio's operation_aborted). Async-ops of this sort are of the one-off variety, meaning the completion handler is guaranteed to be invoked exactly once; in particular, if the source object is destroyed before the operation completes in the background, it still invokes the completion handler but reports this operation-aborted-style code. In that case the user should, usually, just no-op, as stuff (including quite possibly their own *this) is probably in the middle of destruction; one might as well return immediately rather than post() a task that would basically do so anyway.
  • If the async-op targets some kind of user resource (in this case a simple buffer in memory), then that resource must be available until the completion handler is invoked. Otherwise undefined behavior results.
    • Other techniques beyond maintaining a long-lived data member (like m_buf above) exist. In particular it's common to allocate and lambda-capture a shared_ptr to the underlying resource (see the sketch just below this list).
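For example, a minimal sketch of that shared_ptr technique (same Blob_receiver/Error_code/etc. as in the example above; names otherwise ours):

// Sketch: tie the target buffer's lifetime to the completion handler via shared_ptr,
// instead of keeping a long-lived data member like m_buf above.
void start_receive_shared(Blob_receiver& rcv)
{
  auto buf = std::make_shared<std::array<uint8_t, 8192>>();
  rcv.async_receive_blob(ipc::util::Blob_mutable(buf->data(), buf->size()),
                         [buf](const Error_code& err_code, size_t n_sz) // buf lives at least as long as this handler.
  {
    // As before: we are in thread W; post() the real work onto thread U, capturing buf again,
    // so the blob at buf->data() stays valid until thread U is done with it.
  });
}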

To reiterate: In most of the rest of the Manual, we use the above pattern. In particular we assume the existence of a post() method as shown above and use it regularly in example code. Note that the exact API your application uses will be spiritually identical to post(F) (where F() is the task) but specifically different. In particular/for example:

  • If you are using boost.asio, it'll be post(E, F), where E is usually a boost::asio::io_context (or strand).
  • If you are using flow::async (a boost.asio-helper), it'll be L.post(F), where L is Single_thread_task_loop (1 user worker thread) or Concurrent_task_loop (2+ threads); a short sketch follows this list.
    • Or, at a lower level, asio_exec_ctx(..., E, ..., F), a variant of boost.asio's post(E, F) with certain added options.
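For instance, a flow::async-based thread-U loop might be set up roughly as follows. (A sketch from memory; double-check the Flow library's reference for the exact constructor and start() signatures.)

// Sketch: a 1-thread proactor loop whose L.post(F) plays the role of post(F) in this Manual.
flow::async::Single_thread_task_loop loop(get_logger(), "thread-U"); // get_logger(): your flow::log::Logger* (name is ours; may be null).
loop.start();
loop.post([]()
{
  // We are in thread U (the loop's one worker thread).
});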

This pattern is referred to as the async-I/O pattern.

I'm a boost.asio user. Why do I have to post()? Can't I just give your constructor my io_context, and then you'll place the completion handler directly onto it?

In other words, why didn't we just make Blob_receiver (among others) a boost.asio I/O-object, like boost::asio::ip::tcp::socket?

We feel you. We use boost.asio ourselves internally after all! However the first priority was to supply a more general API, so that one does not need to orient their event loop around boost.asio machinery specifically – but merely have a task-queue and post()-like API available. That said:

Todo:
We may supply an alternative API wherein Flow-IPC objects can be boost.asio I/O objects themselves, similarly to boost::asio::ip::tcp::socket.

Alternate approaches: sharing thread U; reactor pattern

Let's set aside, for the moment, the stylistic organization of the event loop. Purely in terms of performance, is there any problem with the approach outlined above, where thread(s) like thread W is/are launched by Flow-IPC in the background and then signal thread(s) like thread U when background events occur?

Yes and no. For most applications the only perf cost is that of (1) context switching between user threads U1, U2, ... versus Flow-IPC internal threads W1, W2, ....; and (2) the signaling necessary for a thread W to inform thread U to wake up. (2) is, essentially, a condition-variable ping and is rather cheap. (1) has a non-zero cost but for most applications, with modern hardware, is usually taken to be negligible.

However that is not an absolute truth. Some applications are extremely latency-sensitive and don't want to add even single microseconds of delay, if they can be avoided. Hence there is a non-zero market for, after all, having Flow-IPC and the application share thread U (or U1, U2, ...), driven by such fine-grained performance needs.

Furthermore some (most?) application event loops are not written in the proactor style but rather reactor style; that is they may not feature a post()-like API, or a task-queue, at all. Instead such an application, especially an old-school networking server, might manually use a select()/poll()/epoll_wait() in thread U, which sleeps until the kernel wakes it up due to a socket event, or a timeout expires (usually set to the remaining time on the next-soonest-to-fire timer event). Having woken up it would handle all active events and/or the applicable timer(s) and go back into the poll()/whatever.

It is possible and reasonable to modify such an event loop to integrate with the async-I/O-pattern API (that starts background worker threads W as needed) explained above, and some event-loop libraries already have that capability. It involves, basically, keeping a mutex-protected task (function pointer/function object) queue, plus a trivial IPC handle – e.g., to an unnamed pipe – which participates in the central poll()/whatever. In the thread W-invoked handler passed-to (e.g.) Blob_receiver::async_receive_blob(), one would invoke some API that locks the mutex, pushes the "true" handler (that the user wants to run in thread U) onto the task-queue, unlocks the mutex, and sends a byte to the aforementioned pipe. This would wake (if needed) the poll()/whatever, code subsequent to whose return would then synchronously invoke all the tasks in the task queue (locking the mutex of course).
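A sketch of that technique (plain POSIX; all names are ours, illustrative only; error handling omitted): the handler running in Flow-IPC's thread W calls post_to_thread_u(), which enqueues the real work and pokes a pipe that thread U's poll() is watching.

#include <unistd.h>
#include <functional>
#include <mutex>
#include <queue>

std::mutex task_mutex;
std::queue<std::function<void()>> task_queue;
int wake_pipe[2]; // pipe(wake_pipe) at startup; read end is O_NONBLOCK and participates in thread U's poll() set.

// Called from thread W (e.g., inside the handler given to Blob_receiver::async_receive_blob()).
void post_to_thread_u(std::function<void()> true_handler)
{
  {
    std::lock_guard<std::mutex> lock(task_mutex);
    task_queue.push(std::move(true_handler));
  }
  const char byte = 0;
  (void)write(wake_pipe[1], &byte, 1); // Wake thread U's poll() if it is sleeping.
}

// Thread U, after poll() reports wake_pipe[0] readable:
void drain_thread_u_tasks()
{
  char bytes[64];
  while (read(wake_pipe[0], bytes, sizeof bytes) > 0) {} // Drain the pipe (non-blocking read end).
  std::queue<std::function<void()>> tasks;
  {
    std::lock_guard<std::mutex> lock(task_mutex);
    tasks.swap(task_queue);
  }
  for (; !tasks.empty(); tasks.pop()) { tasks.front()(); } // Run the "true" handlers in thread U.
}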

However, in that case, the possible performance deficit due to the switching between threads U and W and the required signaling may still be a concern for some applications.

Let's discuss what one might do to share thread U instead while using Flow-IPC.

Proactor pattern while sharing thread U

Suppose we wanted to use Flow-IPC in a way as closely resembling the proactor approach outlined above (Blob_receiver::async_receive_blob() for example) as possible – but keep the work in thread U; that is, avoiding the post()-onto-thread-U step in the handler passed to async_receive_blob(). This might be the best of both worlds: fewer threads/context switches; code that is similar to the above already-reasonably-simple pattern – but even simpler still. Sounds good!

The best and de-facto-standard C++ proactor event-loop API is boost.asio. The authors are not aware of any better or more general way to discuss proactor-style event loops. (flow::async makes it easier to work with boost.asio, but it is still in essence working with boost.asio.) Therefore the problem of using proactor pattern, but with thread U alone, reduces to using boost.asio but with thread U alone.

Reactor (poll()-y) pattern while sharing thread U

Lastly, as noted before, the application might be oriented around an old-school – but nevertheless proven and effective – reactor-pattern event loop summarized above. Does Flow-IPC have a way of integrating with such an event loop without starting unnecessary threads W (i.e., sharing thread U with the application)?

Such an API shall be oriented around native handles; i.e., it will provide certain internal handles (FDs in Linux and friends) and expect the application to wait for activity on those handles, among others of interest to the application, and to invoke Flow-IPC processing to then handle the relevant events and produce relevant results – such as "a message blob has been received as the user has requested; here it is" – directly in thread U. Or – for that matter – from multiple (of your) threads U1, U2, ....

Okay, so then... how?

Both alternate approaches described above are possible by using the following Flow-IPC alternate (to the async-I/O pattern) API: the sync_io pattern.

sync_io pattern (alternate approach)

For every conceptual object type in Flow-IPC that conceptually performs asynchronous (long-running, blocking, not-non-blocking, annoying – take your pick) operations, there are indeed 2 mutually exclusive APIs available. So for each conceptual "thing," there are 2 similarly named classes or class templates. They are:

  • The (recommended by default) async-I/O-pattern API. Let's say it's called "N::X", where N is a namespace, while X is a class or class template (or alias to it).
    • Again: this is the one we use in almost all discussion and examples in this Manual elsewhere.
  • The (alternate, advanced) sync_io-pattern API. It shall typically be called "N::sync_io::X". (At any rate, it shall always live in a sub-namespace named sync_io.)

In actual fact, for perf-sensitive Xes, usually the sync_io::X is a lighter-weight core with the nitty-gritty logic of dealing with whatever it is that an X represents (Unix domain socket, MQ, channel...). Then "plain" X – the async-I/O variety – is built around such a sync_io core. In these cases, an X can always be constructed from an std::move(sync_io::X) at low computational cost. This might be counterintuitive at first, but it actually makes sense:

  • The sync_io core allows for great flexibility on the user's part, but is (all else being equal) harder to use.
  • The async-I/O adapter of that core adds more "stuff" to make it work on top of that core; but it is easier for the user to use.
    • In fact such an async-I/O adapter is us eating our own dog food: it is itself a use case of how a sync_io::X can be effectively leveraged in a 2+-thread situation (user thread U, worker thread W). If that built-in arrangement is not sufficient for your application, then you can use the core directly yourself.

So we'd like you to use the async-I/O X when possible, falling back to sync_io::X when that proves insufficient.
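To make the "X can be constructed from std::move(sync_io::X)" point concrete, a sketch (Native_socket_stream being one such perf-sensitive X; the exact constructor arguments, e.g. a Logger pointer, vary by class, so consult the Reference):

// Sketch only; exact ctor args vary by class (see Reference).
ipc::transport::sync_io::Native_socket_stream sync_core; // Suppose this core is already in connected (PEER) state.
ipc::transport::Native_socket_stream async_x(std::move(sync_core)); // Cheap: adopts the core; adds the async-I/O (thread-W) machinery.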

How does a sync_io-pattern API work? We won't restate it here, or indeed (as of this writing) feature a Manual page for it. It is an advanced topic and is fully covered in the reference documentation. Namely find it here: ipc::util::sync_io doc header. Loosely speaking it supplies an API analogous to OpenSSL's SSL_read() sometimes issuing SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE.

That said, a quick note w/r/t how one might use the ipc::util::sync_io APIs to implement the (1) "sharing thread U" and (2) "reactor pattern" possibilities mentioned above:

Here is a list of async-I/O-pattern APIs and their sync_io counterparts.

In all cases, with the following exceptions, the object type is considered perf-sensitive and is therefore indeed arranged as follows: sync_io::X is the core; X adapts such a core and can be constructed from one cheaply.

The exceptions:

  • (#) ipc::transport::Channel is a bundling template, essentially combining ..._sender and ..._receiver peer-objects in one object. It bundles either Xes or sync_io::Xes, but the same template handles both duties. It contains little logic: mainly just bundling. Hence it can pull it off without issue. Hence there's no sync_io::Channel.
    • An async-I/O-bearing Channel can be trivially created from a sync_io-bearing one: auto async_ch = sync_ch.async_io_obj().
  • (**) Not being perf-sensitive, internally the sync_io::Native_socket_stream_acceptor is actually built around an async-I/O Native_socket_stream_acceptor. Makes no difference to the user: the APIs work regardless of how it is implemented internally. That said one cannot construct one from the other or vice versa.
  • (*) Not being perf-sensitive, all the Session and Session_server types are written primarily in async-I/O fashion. However, for those wishing to more easily integrate a poll()-y event loop with session-connect (client) and session-accept (server) operations, a sync_io API is available. Use Client_session_adapter to adapt any Client_session-like type (namely the templates listed there): simply supply the latter type as the template arg. Ditto Server_session_adapter for any Server_session-like type. Lastly Session_server_adapter adapts any (of the 3) Session_server-like types. (A type-level sketch follows this list.)
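At the type level the adaptation described in the last bullet is just a template wrap; a sketch (the underlying session types being whatever you already use):

// Sketch: naming the sync_io-pattern counterparts of your session types.
template<typename Client_session_t>
using Sync_client_session = ipc::session::sync_io::Client_session_adapter<Client_session_t>;
template<typename Server_session_t>
using Sync_server_session = ipc::session::sync_io::Server_session_adapter<Server_session_t>;
template<typename Session_server_t>
using Sync_session_server = ipc::session::sync_io::Session_server_adapter<Session_server_t>;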

Lastly: for generic programming each relevant type features a couple of alias members:

  • Sync_io_obj: In an X: this is sync_io::X. In a sync_io::X: this is empty type Null_peer.
  • Async_io_obj: Conversely in a sync_io::X: this is X. In an X: this is empty type Null_peer.

E.g., Blob_stream_mq_sender<Posix_mq_handle>::Sync_io_obj is sync_io::Blob_stream_mq_sender<Posix_mq_handle>; and conversely via ::Async_io_obj.
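A sketch of how generic code might exploit these aliases (illustrative only):

// Sketch: generic code can name "the other" API flavor via the aliases.
template<typename Peer>
void do_something(Peer& peer)
{
  // If Peer is an async-I/O-pattern X, Sync_core is sync_io::X; if Peer is itself
  // a sync_io::X, this is Null_peer (and Peer::Async_io_obj would name X instead).
  using Sync_core = typename Peer::Sync_io_obj;
  // ...e.g., declare/construct a Sync_core, or static_assert() against Null_peer, as needed....
}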


When one object generates/subsumes another object

As of this writing these are such cases:

  • struc::Channel::Channel(): The struc::Channel subsumes an unstructured, pre-opened ipc::transport::Channel via move() semantics.
  • Native_socket_stream_acceptor::async_accept(): The acceptor generates Native_socket_streams.
  • Session::open_channel() + Session passive-opening + Session init-channels: Session generates Channels.
  • Session_server::async_accept(): Session_server generates Server_sessions.

In the latter case: the generator guy (whether vanilla/async-I/O Session_server or sync_io::Session_server_adapter<Session_server>) generates an object of the same API-type. I.e., async-I/O guy generates async-I/O guys; sync_io guy generates sync_io guys. Rationale: These are not perf-critical, and we reasoned that if one went to the trouble of wanting to use a sync_io server, then they'll want it to generate sync_io sessions to integrate into the same, or similarly built, event loop.

In the former three cases: the generator guy (whether async-I/O-pattern or not!) generates sync_io cores. I.e., acceptor fills-out a sync_io socket-stream; open-channel method/passive-open handler/session-async-accept/session-sync-connect each fills-out a sync_io channel object. The subsumer guy (whether async-I/O-pattern or not!) subsumes a sync_io core. I.e., a structured-channel ctor takes over a sync_io channel (which may have in fact been generated by Session::open_channel()... synergy!). Rationale: sync_io cores are lighter-weight to generate and pass around, and one can always very easily then "upgrade" them to async-I/O objects as needed:
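(A sketch; we use Session::Channel_obj to stand for the session's bundled-channel type, and the exact open_channel() signature is covered in the Sessions chapters; consult the Reference for precise names.)

// Sketch: a session emits a sync_io-bearing channel; "upgrading" it is one cheap call.
Session::Channel_obj sync_ch; // E.g., session.open_channel(&sync_ch) fills this out (sync_io-bearing Channel).
auto async_ch = sync_ch.async_io_obj(); // Now each bundled pipe is of the async-I/O (plain X) variety.
// Alternatively a struc::Channel ctor can subsume sync_ch directly (no upgrade needed), as noted above.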

Recap

The simplest event-loop integration approach is the proactor-pattern-with-Flow-IPC-internally-starting-threads-as-needed: the async-I/O pattern. How to use this API style is explained above. Most of the rest of the Manual uses this approach in examples and discussion. In particular we'll routinely use post() as described there.

We do this (1) for simplicity of exposition; and (2) because, in the absence of compelling objections, we feel it is the best approach to use. However such objections may, indeed, sometimes apply; namely in the presence of (1) a legacy reactor loop; and/or (2) very stringent perf restrictions sensitive even to context switching. In that case, whether merging Flow-IPC worker thread(s) into your own or integrating with a *poll()-y loop, use the alternate sync_io pattern, per: ipc::util::sync_io doc header.

We're now ready to make Flow-IPC actually do something. Let's move on to Sessions: Setting Up an IPC Context.

