Flow-IPC 1.0.2
Flow-IPC project: Full implementation reference.
|
Here we show how Flow-IPC fulfills its aims; let's take a high-level tour through the available API. (The preceding page (Manual Start) Flow-IPC: Bird's-eye View summarizes the aims of Flow-IPC.)
We provide a synopsis in ~each section below, to give a sense of what your code using that feature would look like. It's both a survey and a cheat sheet, but it is not comprehensive the way the individual subsequent pages, or the Reference, are. Please jump to a particular page to drill down on any topic of interest (Table of Contents).
No matter the particular IPC transport (local stream socket, MQ, SHM pool), an ever-present need is to establish a naming scheme and coordinate it between the participant processes. To connect one Unix domain socket, one needs to name a server socket, and the connecting process needs to know that name. An MQ requires a name, as does a SHM pool. Developing and maintaining a convention is, normally, both necessary and annoying. Instead:
There is a simple scheme that designates one side of a conversation as the session server, the other a client; the latter connects to the former. (A session server process can carry on multiple sessions with different client processes.) The user need only specify the basic facts of the IPC-participating applications (like executable names and owners, and which ones are servers, and which clients they'll accept as the opposing side). This is done via the straightforward struct
ipc::session::App + its children. The rest of ipc::session establishes internal naming conventions based off that information. Having specified those App
basics the user, opens (on each side) an equally capable ipc::session::Session that is the context for all subsequent IPC between those 2 processes.
Having obtained a Session
, the application can open transport channels (and, if desired, SHM arenas) without any naming decisions to make. In fact, in most situations, one can open the session and any desired channels at the same time (i.e., a certain number of channels are ready to use on each side, once the Session
is opened).
Describe the universe of communicating apps via simple
struct
s:using namespace ipc::session;// Describe the universe of communicating apps via simple `struct`s.// Example: 2 applications.// Basic properties for safety, authentication."/bin/app_a.exec", USER_ID_A, GROUP_ID };// Which app(s) will connect sessions (client) versus accept sessions (server)?const Client_app APP_B_AS_CLI{ APP_B };const Server_app APP_A_AS_SRV{ APP_A,{ APP_B.m_name }, // We are A; we allow only B to open sessions with us."", // Keep run-time files in /var/run by default.// Safety/auth preset. No need for `0644` and so forth.Flow-IPC module providing the broad lifecycle and shared-resource organization – via the session conc...Definition: app.cpp:27@ S_GROUP_ACCESSAllows access by resource-owning user's containing group(s) (in POSIX/Unix identified by GID) and no ...A description of an application in this ipc::session inter-process communication universe.Definition: app.hpp:78std::string m_nameBrief application name, readable to humans and unique across all other applications' names; used both...Definition: app.hpp:144An App that is used as a client in at least one client-server IPC split.Definition: app.hpp:185An App that is used as a server in at least one client-server IPC split.Definition: app.hpp:206Compile-time-configure your sessions:
// Specify zero-copy and SHared Memory (SHM) abilities if any.namespace ssn = shm::classic;// Use [ipc::]session - For non-SHM-enabled communication.// [ipc::session::]shm::* - For SHM-enabled comms (zero-copy structured messaging; explicit SHM access if desired).// [ipc::session::]shm::classic - Classic one-segment variant with simple allocation strategy by Boost.// shm::arena_lend::jemalloc - Adaptive multi-segment variant with allocation by jemalloc.// Subsequent API remains ~the same. Only your convenience alias above changes.// Specify the low-level transports used underneath in your future IPC channels.// Example: Your future channels will use POSIX MQs when possible for high perf; Unix domain sockets when you// transmit an FD (network socket, file handle, etc.).constexpr auto MQ_TYPE = schema::MqType::POSIX;constexpr bool TRANSMIT_FDS = true;// Client side:using Session = ssn::Client_session<MQ_TYPE, TRANSMIT_FDS>;// Server side:using Session_server = ssn::Session_server<MQ_TYPE, TRANSMIT_FDS>;To be instantiated typically once in a given process, an object of this type asynchronously listens f...Definition: session_server.hpp:197A documentation-only concept defining the local side of an IPC conversation (session) with another en...Definition: session.hpp:216Open session(s):
// Client side:APP_A_AS_SRV, // The opposing application (acceptor)....);Error_code err_code;session.sync_connect(&err_code); // Synchronous, non-blocking.if (!err_code) { ...`session` is ready.... } else { ...opposing server not active/listening... }// Server side:{ { APP_B_AS_CLI.m_name, APP_B_AS_CLI } }); // These are potential opposing apps.Session session;session_srv.async_accept(&session,[...](const auto& err_code) { if (!err_code) { ...`session` is ~ready.... } });// ^-- asio-style API. Non-blocking/synchronous API also available (integrate directly with poll(), epoll_...(), etc.).// NOTE: Upon opening session, capabilities of `session` on either side are **exactly the same**.// Client/server status matters only when establishing the IPC conversation;// the conversation itself once established is arbitrariy and up to you fully.flow::Error_code Error_codeShort-hand for flow::Error_code which is very common.Definition: common.hpp:298Open channel(s) in a session:
- Easiest: Have them be pre-opened, so that they're ready from the session start:
// Client side: Example: Expect 1 client-requested channel; N server-requested channels.Session::Channels my_init_channels(1);Session::Channels their_init_channels;Error_code err_code;session.sync_connect(..., &my_init_channels, ..., &their_init_channels, &err_code); // Synchronous, non-blocking.if (!err_code){// `session` is ready, but we might not even care that much because:// my_init_channels[0] = The 1 pre-opened Session::Channel_obj we requested.// their_init_channels = vector<Session::Channel_obj> = The N pre-opened channels opposing side requested.// Sessions are mostly a way to establish channels after all, and we've got ours!}// Server side: Example: Expect <cmd line arg> server-requested channels; N client-requested channels.Session::Channels my_init_channels(lexical_cast<size_t>(argv[2]));Session::Channels their_init_channels;session_srv.async_accept(&session, ..., &my_init_channels, ..., &their_init_channels, ..., ...,[...](const auto& err_code){if (!err_code){// session = Opened session.// my_init_channels = The 3 pre-opened `Session::Channel_obj`s we requested.// their_init_channels = The N pre-opened channels opposing side requested. (N=1 in this example; see above.)}});
- Max flexibility: Open them at any time. Either side can be initiator or acceptor of channel opening at any time.
// session.open_channel() is non-blocking, synchronous.// Active opener (side 1):Session::Channel_obj channel;session.open_channel(&channel); // Non-blocking, synchronous.// Active opener advanced form (allows to pass 1 structured message along with the channel-open action):auto mdt = session.mdt_builder();mdt->setCoolParameterFromMyCapnpSchema(42.42);session.open_channel(&channel, std::move(mdt));// Passive opener (side 2):session.init_handlers(..., [...](Session::Channel_obj&& channel, auto&& mdt){// `channel` is ready. Also the structured message (if desired) is immediately available:const float val = mdt->getCoolParameterFromMyCapnpSchema();std::cout << "Along with the channel-open request, other guy sent us this number: [" << val << "].");});unspecified Channel_objEach successful open_channel() and on-passive-open handler firing shall yield a concrete transport::C...Definition: session.hpp:235
Once you've got your session and/or channels, it's off to the races... namely:
A channel is a bundling of the resources required for, essentially, a bidirectional pipe capable of transmitting binary messages (blobs) and, optionally, native handles. A particular ipc::transport::Channel (as specified at compile-time via its template parameters) may, for example, consist of an outgoing POSIX MQ (message queue) handle and a similar incoming-MQ handle; or conceptually similar SHM-backed MQs (boost.ipc MQs). Alternatively it can consist of a Unix domain socket endpoint (which is bidirectional). (Other combinations are generically possible.)
If one uses the ipc::session API, one need not worry about any detail of opening these various handles and feeding them to the particular Channel
. This is shown in the Sessions synopsis above. The type of channel openable via a given session is compile-time-configured when declaring your convenience Session
alias. The Sessions synopsis above makes one example selection. You can make other choices as follows. Note that you just make these selections – the details are handled underneath. E.g., you need no knowledge of POSIX MQ API (from man mq_overview
) to take advantage of its performance characteristics.
There are two knobs to twiddle. If unsure about the first knob,
POSIX
is fine.constexpr auto MQ_TYPE = schema::MqType::POSIX; // Use POSIX MQs (as from `man mq_overview`) when possible.= schema::MqType::BIPC; // Use `boost.interprocess:message_queue`s when possible.= schema::MqType::NONE; // Do not use MQs; always use Unix domain sockets.constexpr bool TRANSMIT_FDS = true; // You need ability to send native handles (FDs). Use Unix domain sockets when needed.= false; // You don't need this ability. So Unix domain socket used only if MQ_TYPE=NONE.// Client:using Session = ssn::Client_session<MQ_TYPE, TRANSMIT_FDS>;// Server:using Session_server = ssn::Session_server<MQ_TYPE, TRANSMIT_FDS>;using Session = Session_server::Session_obj;
Alternatively a simple application or prototype may want to manually set them up without ipc::session. Though, in our humble opinions, using ipc::session is what's simple, as manual setup of a channel involves major annoyances including naming and transport-specific considerations (Unix domain sockets versus POSIX MQs versus... etc.). Perhaps the better term would have been "conventional" rather than "simple." We will briefly discuss how to manually open various channels without ipc::session a few paragraphs below.
transport::Channel
, whether via ipc::session or manually, in many (probably most) cases you will not need or want to understand the information in the rest of this section and instead skip straight to Transport (structured), below. That is, typically you'll want to transmit structured data – various fields, structures, lists thereof, etc. (+ possibly FDs) – as opposed to mere binary blobs (+ possibly FDs). To do so, one takes a freshly-opened Channel
and upgrades it to a structured channel (struc::Channel
). A good portion of the power of Flow-IPC is how it integrates with/leverages Cap'n Proto via ipc::transport::struc::Channel. However, for simpler or specialized use cases it is completely reasonable to want to operate at the lower, unstructured layer that is ipc::transport::Channel, or even the components it comprises (MQs, stream sockets at least). In that case, and/or to see how to manually open these, read on in this section.All transport APIs at this layer, as well the structured layer (see below), have an API that is as-synchronous-as-possible. In particular all send operations are non-blocking and synchronous and never return "would-block" – while guaranteeing good performance in practice. Receive APIs are asynchronous by their nature. A proactor-style API (a-la boost.asio) is provided for this and other async ops (such as connect and accept of various types).
sync_io
pattern, contrasting with the above async-I/O pattern. The sync_io
-pattern alternative for each given I/O object is similar in spirit to reactor-style (non-blocking) APIs a-la OpenSSL's. It may help you integrate with a reactor-style event loop such as one built on epoll()
or poll()
. It is also the way to strictly control the background threads in your application arising for Flow-IPC's operation: if desired you can have it operate entirely within your own threads and event loop(s). See Asynchronicity and Integrating with Your Event Loop for more discussion of this somewhat-hairy topic.sync_io
choice is available for all other parts of Flow-IPC, including: ipc::transport structured layer (struc::sync_io::Channel
versus just struc::Channel
for async-I/O alternative) and ipc::session.Lowest layer: MQs accessible via unified ipc::transport::Persistent_mq_handle concept. (This API are mutually uniform, more pleasant, and at times more powerful than the underlying APIs.) boost.asio is leveraged for Unix domain stream sockets. We use these internally, but we expose them publicly too, should you choose to work at that low level.
- Posix_mq_handle for POSIX message queues.
- Bipc_mq_handle for boost.interprocess MQs – internally SHM-based.
- We leverage boost::asio::local::stream_protocol::socket, aliased from ipc::transport::asio_local_stream_socket::Peer_socket. Namespace asio_local_stream_socket has advanced utilities (as free functions and type aliases).
Next layer: Sender/receiver concepts and their implementations. The key concepts:
- ipc::transport::Blob_sender:
- Method send_blob() to synchronously, never-would-blockingly send a binary buffer with message boundaries respected.
- Methods
*end_sending()
to send an end-of-stream graceful-close marker.- Method
auto_ping()
to enable periodic auto-pinging of opposing receiver with internal messages that are ignored except that they reset any idle timer as enabled via opposingBlob_receiver::idle_timer_run()
.- ipc::transport::Blob_receiver:
- Method async_receive_blob() to receive, with message boundaries respected, one message the opposing side sent via
send_blob()
, into a large-enough buffer supplied by caller. Receipt, or error, is indicated asynchronously via completion handler function supplied by user.- Method
idle_timer_run()
to enable an idle timer that would emit a connection-ending error, if no message (whether from a user blob or anauto_ping()
) arrives within a specific time period from the last such message or connection start.- ipc::transport::Native_handle_sender: Just like
Blob_sender
, exceptsend_blob()
is replaced by:
- Method send_native_handle() which sends: a message (blob); or a socket handle (FD); or both. User provides socket handle (if any) as essentially an
int
.- ipc::transport::Native_handle_receiver: What
Blob_receiver
is toBlob_sender
, this guy isNative_handle_sender
. Methodasync_receive_native_handle()
can receive a message, an FD, or both.- For each
ipc::transport::X
concept above,ipc::transport::sync_io::X
is thesync_io
-pattern counterpart. E.g.: ipc::transport::sync_io::Blob_receiver allows for integration of blob-receiving functionality above into anepoll
-based event loop, with only non-blocking calls and no internal background threads involved.These classes or class templates implement the above concepts:
- Unix domain (stream) sockets have exclusive FD-transmission ability; are great for blobs too.
- ipc::transport::Native_socket_stream implements all 4 concepts:
Blob_sender
,Blob_receiver
,Native_handle_sender
,Native_handle_receiver
.- MQs efficiently transmit blobs of limited size. One kernel-persistent MQ object, maintained by OS, is required for a unidirectional pipe. For bidirectional pipe, use 2 MQs arranged in mutually-facing fashion. For each one, user a sender object on one end and receiver on another.
- ipc::transport::Blob_stream_mq_sender implements Blob_sender.
- ipc::transport::Blob_stream_mq_receiver implements Blob_receiver.
The concept APIs above act the same regardless of concrete class and apply to each object once in opened (PEER) state. Each class also has non-concept aspects which depend on its specific characteristics. Most of these concern opening a connection (getting to PEER state).
A survey:
using namespace ipc::transport;namespace util = ipc::util;namespace asio_local = ipc::transport::asio_local_stream_socket::local_ns;// With MQs, you must open an MQ handle first, which is necessary can create the kernel-persistent OS MQ object.// ipc::session would figure all this out by itself; when working manually you have to do so yourself.Posix_mq_handle out_q(nullptr, util::Shared_name::ct("sharedMqNameA2B"), // Shared name known by other sideutil::OPEN_OR_CREATE, // Atomically create if needed, open if exists in system already.10, 1000, // Max # of unread messages, max size of message,0600, &err_code); // Permissions, if creation is needed; Error_code* in case of error.// Then to make yourself a Blob_sender/receiver, feed them into constructor.// With socket streams, you can either use two pre-opened FDs; or use client/server setup.// Pre-opened FDs:asio_local::Peer_socket a_raw_sock, b_raw_sock;asio_local::connect_pair(a_raw_sock, b_raw_sock); // Can also use native ::socketpair().// (b_raw_sock could be spread to another process via fork() or another Native_socket_stream and other techniques.)// Client/server: client.const auto SRV_ADDR = util::Shared_name::ct("sharedSockNameSrv");// ^-- Client and server need to agree on this, and it must not conflict. ipc::session would take care of this.Error_code err_code;sock.sync_connect(SRV_ADDR, &err_code); // Synchronous, non-blocking.// Client/server: server. A dedicated server-acceptor class exists for this.Native_socket_stream_acceptor srv(nullptr, SRV_ADDR, &err_code); // Error_code* to catch, e.g., name conflicts.srv.async_accept(&sock, [...](const auto& err_code) { ... });// Native_socket_stream adds some safety/informational features. ipc::session uses this internally.const util::Process_credentials creds = sock.remote_peer_process_credentials();std::cout << creds // Prints .process_id(), .user_id(), .group_id()<< ' ' << creds.process_invoked_as() << '\n'; // Prints the command-line argv[0] through which remote process was invoked.// Can bundle Blob_senders, Blob_receivers, etc., into `ipc::transport::Channel`s.// Convenience aliases and data-free subclasses exist to make it easier versus using an actual Channel<...>.// In this case we create a Channel with 2 bidirectional pipes, from 2 `Posix_mq_handle`s like out_q above; and// an already-opened Native_socket_stream.Mqs_socket_stream_channel<false, Posix_mq_handle> // false => async-I/O API; true => sync_io-pattern API.chan(nullptr, "two_pipe_chan_nickname", std::move(out_q), std::move(in_q), std::move(sock));// Use the blobs pipe to send:chan.send_blob(util::Blob_const(some_byte_vector.data(), some_byte_vector.size()));// Use the blobs-and-FDs pipe to receive:chan.async_receive_native_handle(&fd, util::Blob_mutable(some_target_vector.data(), some_target_vector.capacity()),[&](const auto& err_code, size_t n_rcvd){if (!err_code){some_target_vector.resize(n_rcvd);// some_target_vector.begin()..end() = received blob.// fd.m_native_handle = `int` file descriptor/raw handle.});// In this case we create a Channel with 1 bidirectional pipe, a Unix domain socket stream, for both FDs and blobs.Socket_stream_channel<false> chan(..., std::move(sock));// Can access the Native_handle_sender through the accessor:chan.hndl_snd()->send_native_handle(util::Native_handle(a_raw_int_fd), util::Blob_const(...));// Or through the Channel forwarding API:chan.send_native_handle({}, util::Blob_const(...)); // Send a blob but no FD this time.// Or create some `Channel`s with a blobs-only pipe each:Socket_stream_channel_of_blobs<false> chan(..., std::move(sock)); // <-- A Unix domain socket pipe only.chan.send_native_handle(...); // Compile error: Socket_stream_channel_of_blobs cannot transmit FDs.chan.send_blob(...); // No problem.Mqs_channel<false, Bipc_mq_handle> chan(..., std::move(out_q), std::move(in_q)); // <-- A boost.interprocess-MQ pipe only.chan.send_native_handle(...); // Compile error: MQs can certainly not transmit FDs.chan.send_blob(...); // No problem.Implements the Persistent_mq_handle concept by thinly wrapping bipc::message_queue,...Definition: bipc_mq_handle.hpp:52Implements Blob_receiver concept by using an adopted Persistent_mq_handle MQ handle to an MQ (message...Definition: blob_stream_mq_rcv.hpp:70Implements Blob_sender concept by using an adopted Persistent_mq_handle MQ handle to an MQ (message q...Definition: blob_stream_mq_snd.hpp:165A Channel with at least a blobs pipe consisting of two MQs of type Persistent_mq_handle (template arg...Definition: channel.hpp:1180A Channel with a blobs pipe consisting of 2 MQs of type Persistent_mq_handle (template arg); and a ha...Definition: channel.hpp:1278A server object that binds to a Shared_name and listens for incoming Native_socket_stream connect att...Definition: native_socket_stream_acceptor.hpp:105Implements both Native_handle_sender and Native_handle_receiver concepts by using a stream-oriented U...Definition: native_socket_stream.hpp:271Implements the Persistent_mq_handle concept by wrapping the POSIX message queue API (see man mq_overv...Definition: posix_mq_handle.hpp:63A Channel with a blobs pipe only (no handles pipe) that uses a Unix domain socket connection as the u...Definition: channel.hpp:1097A Channel with a handles pipe only (no blobs pipe) that uses a Unix domain socket connection as the u...Definition: channel.hpp:1029A process's credentials (PID, UID, GID as of this writing).Definition: process_credentials.hpp:39std::string process_invoked_as(Error_code *err_code=0) constObtains, from the OS, information as to the binary name via which process process_id() was started,...Definition: process_credentials.cpp:62Short-hand for boost.asio Unix domain socket namespace.Protocol::socket Peer_socketShort-hand for boost.asio Unix domain peer stream-socket (usually-connected-or-empty guy).Definition: asio_local_stream_socket_fwd.hpp:114Flow-IPC module providing transmission of structured messages and/or low-level blobs (and more) betwe...Definition: asio_local_stream_socket.cpp:32Flow-IPC module containing miscellaneous general-use facilities that ubiquitously used by ~all Flow-I...Definition: default_init_allocator.hpp:24const Open_or_create OPEN_OR_CREATETag value indicating an open-if-exists-else-create operation.Definition: util.cpp:30const Open_only OPEN_ONLYTag value indicating an atomic open-if-exists-else-fail operation.Definition: util.cpp:31boost::asio::mutable_buffer Blob_mutableShort-hand for an mutable blob somewhere in memory, stored as exactly a void* and a size_t.Definition: util_fwd.hpp:140boost::asio::const_buffer Blob_constShort-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.Definition: util_fwd.hpp:134A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.Definition: native_handle.hpp:63
All those subleties about different types of pipes in a Channel
bundle completely disappear when one deals with structured-data struc::Channel
s. They are a higher-layer abstraction and will leverage whatever transport::Channel
it adapts. In addition to handling capnp-encoded structured data and SHM-backed zero-copy, it also provides basics like request/response, request-method multiplexing, and a bit more. So let's get into that.
While a Channel
transports blobs and/or native handles, it is likely the Flow-IPC user will want to be able to transmit schema-based structured data, gaining the benefits of that approach including arbitrary data-structuree complexity and forward/backward-compatibility. capnp (Cap'n Proto) is the best-in-class third-party framework for schema-based structured data; ipc::transport's structured layer works by integrating with capnp.
To deal with structured data instead of mere blobs (though a schema-based structure can, of course, itself store blobs such as images), one simply constructs an ipc::transport::struc::Channel, feeding it an std::move()
d already-opened Channel. This is called upgrading an unstructured Channel
to a struc::Channel
. A key template parameter to struc::Channel
is a capnp-generated root schema class of the user's choice. This declares, at compile-time, what data structures (messages) one can transmit via that struc::Channel
(and an identically-typed counterpart struc::Channel
in the opposing process).
struc::Channel provides fundamental niceties for a structured-channel protocol:
union
in the specified root schema), I ask struc::Channel
to please invoke a particular handler F(M)
(typically given as a lambda).struc::Channel
please invoke a particular handler F(M)
.struc::Msg_out is a container-like data structure representing an out-message M'. struc::Msg_in is a receiver's read-only view into such a message M upon its receipt.
Like any Cap'n Proto user, to define your messages you will write a schema file which capnp will compile into C++ code with getters and setters. A refresher (or intro):
# A schema file defines your potential structures.# ...$Cxx.namespace("my_meta_app::capnp"); # Resulting generated classes will be in this C++ `namespace`.struct CoolMsg{# You can use absolutely any schema you would as a general capnp user... except: We add *exactly one* requirement:# Your root `struct` must contain an anonymous union (we will use this to mux/demux in-messages to C++ callbacks).description @0 :Text;# You can, if desired, have arbitrary fields (like this one) outside the union, to be present in any message.union{# The union contains your possible messages (each of which can be of any type but usually a `struct`).sumRequest @1 :SumRequest;sumResponse @2 :SumResponse;}}struct SumRequest{# Asks that receiver add up termsToAdd, multiply sum by constantFactor, and return result in# SumResponse.termsToAdd @0 :List(Int64);constantFactor @1 :Int64;}struct SumResponse{result @0 :Int64;}You have already created an opened unstructured channel (
transport::Channel raw_chan
), most likely from a session. An example of transmitting structured data follows. The basic procedure:
- Upgrade it to a structured channel (
transport::struc::Channel
).- Create a message. (
struc::Channel::create_msg()
is typically easiest. You can also construct explicitly for advanced needs.)- Use ipc::transport::struc::Channel APIs to issue and expect messages of various types. Request/response and other paradigms are supported.
Example: On one side, send a request.
using namespace ipc::transport::struc;namespace ssn = ipc::session; // In this example, use non-zero-copy (internally, heap-based) transmission.using Session = ssn::...;// ...session+channel opening...// raw_chan = opened channel from session.// Upgrade it to a struc::Channel. Specify your capnp-generated schema as template parameter.Session::Structured_channel<my_meta_app::capnp::CoolMsg> // <-- Attention.chan(nullptr, std::move(raw_chan), // Eat it! `raw_chan` becomes empty.Channel_base::S_SERIALIZE_VIA_HEAP, session.session_token());chan.start(...); // Error handler (omitted).// Create a message -- no need to worry about capnp MessageBuilder work! In our case it allocates in heap.auto msg = cool_channel.create_msg();const auto body_root = msg.body_root();// From here... back to vanilla capnp land! To wit:auto root = body_root->initSumRequest();auto list = root.initTermsToAdd(3);list.set(0, 11);list.set(1, -22);list.set(2, 33);root.setConstantFactor(-2);body_root->setDescription("(11 - 22 + 33) x -2 = -44");// .sync_request(): send request; synchronously await response.auto rsp = cool_channel.sync_request(msg);// The other side had best know how to add stuff and multiply at the end! Check the in-message.assert(rsp->body_root().getSumResponse().getResult() == int64_t(-44));Owning and wrapping a pre-connected transport::Channel peer (an endpoint of an established channel ov...Definition: channel.hpp:589Sub-module of Flow-IPC module ipc::transport providing transmission of structured messages specifical...Definition: channel.cpp:31On the other side: Expect request. Send response.
// Same deal as on the other side.chan(nullptr, std::move(raw_chan), Channel_base::S_SERIALIZE_VIA_HEAP, session.session_token());chan.start(...);chan.expect_msgs(my_meta_app::capnp::CoolMsg::Which::SUM_REQUEST, // Demux to handler on receipt of SumRequest union-choice.[...](auto&& req) // Type is shared_ptr<M>, where M is: struc::Channel<...>::Msg_in, a/k/a struc::Msg_in<...>.{// ...event loop brief detail omitted...const auto body_root = req->body_root();// Vanilla capnp work:auto root = body_root.getSumRequest();auto list = root.getTermsToAdd();int64_t sum = 0;const auto sum_func = [&](int64_t val) { sum += val; };std::for_each(list.begin(), list.end(), sum_func);sum *= root.getConstantFactor();std::cout << "Got request [" << *req << "] including description ""[" << body_root.getDescription() << "]; responding with result [" << sum << "].");// Respond.auto msg = chan.create_msg();msg.body_root()->initSumResponse().setResult(sum);msg.body_root()->setDescription("the sum you ordered, sir");msg.send(msg, req.get()); // <-- originating_msg_or_null. We are responding.// ...event loop brief detail omitted...};
Beyond that, any kind of code-snippet synopsis would be too wordy for this page; see Structured Message Transport for in-depth coverage. We'll summarize the capabilities here:
struc::sync_io::Channel
) takes an "originating message" pointer: simply supply nullptr
for unsolicited messages or a Msg_in
for responses..send()
, .async_request()
, .sync_request()
..async_request()
(response arrives asynchronously) or .sync_request()
(as in the above example)..async_request()
can be one-off or indefinite-lifetime, depending on a certain argument. In the latter case 0+ responses can arrive, and you can explicitly unregister the expectation for more responses using .undo_expect_responses()
..sync_request()
is by definition one-off: It waits for 1 response exactly..expect_msg()
or .expect_msgs()
; you'll need to supply the union-selector enumeration value (SUM_REQUEST
in the above example) to demux to that message type's specific handler..expect_msgs()
(as in the above example) if you expect an arbitrary number of such unsolicited messages. Use .undo_expect_msgs()
if desired later..expect_msg()
if you expect exactly one such unsolicited message..expect_msg*()
call) is not dropped. It is cached. If a matching .expect_msg*()
is subsequently called, its handler is immediately fed with any applicable cached message(s)..set_unexpected_response_handler()
and .set_remote_unexpected_response_handler()
can detect such events, but you can also design your code/protocol to not need them. In general response-without-request is dropped: it is not a channel-hosing condition..send()
is for notifications. For example the SumResponse
sent in the example is a notification..*sync_request()
is for requests.For Cap'n Proto enthusiasts, ipc::transport::struc::Msg_out provides a couple ways of structuring your message-filling in addition to the vanilla way shown in the example above.
Msg_out::orphanage()
lets one operate a capnp orphanage. You can build things there in bottom-up fashion and then ...adopt....()
them into the root message.Msg_out
explicitly, by giving it an existing capnp::MessageBuilder
. This allows for completely arbitrary Cap'n Proto mutation work, without having to use any orphanage.For advanced needs, you can replace some of the default machinery in .create_msg()
. For example you can obtain backing memory for out- and in-messages from a mem-pool, or (if performing non-concurrent work) reuse a single block of memory. (See Struct_builder and Struct_reader concepts.)
In the example just above, the request out-message is copied into the IPC transport (e.g., Unix domain socket) internally on one side; and copied again out of the transport on the receiver side. The in-message's backing memory is (internally) allocated in heap; as is the original out-message's. It would be faster if no copying occurred. This is easily done in Flow-IPC.
Say I prepare a message M': I construct it (struc::Msg_out
), I mutate it via its capnp-generated mutators (as the required backing memory is gradually allocated behind the scenes), and then I struc::Channel::send() it. The receiver receives it as struc::Msg_in
M subsequently and accesses its various parts via capnp-generated accessors. End-to-end zero-copy means that the backing memory allocated for M' is never copied into any low-level transport. Instead only some tiny, constant-sized handle is (internally) copied into and out of the transport – and the accessors of M are directly reading the memory in M'.
The obvious benefit of this is performance. Notably, also, it is possible to subsequently modify M' and send it (and receive it) again. The backing memory is freed once M' and all in-message views like M have been destroyed across all processes. (The Ms are made available exclusively via shared_ptr
.)
While zero-copy is optional, in general we recommend using it. Fortunately it is quite easy to enable. Internally it is accomplished by using SHM; but in terms of the public API, ~all you have to do is – when writing the code for opening your ipc::session::Session – simply choose a SHM-backed session type. That is:
"ipc::session::shm::...::Client_session"
and "ipc::session::shm::...::Session_server"
.Whichever you choose, their APIs are identical. In particular, suppose you chose Session
type S
; then S::Structured_channel<Your_capnp_schema>
is the alias for the proper SHM-backed-or-not struc::Channel
type.
At no point do you have to worry about naming a SHM pool, removing it from the file-system on cleanup (including following a crash), and so on. All is handled internally. Simply choose a SHM-backed Session
type on each side at compile-time.
Consider the structured-transmission example just above. It lacks zero-copy performance end-to-end. Here is all you have to do to make it zero-copy end-to-end. Notice the tiny change and zero mentions of any SHM details.
// ...everything same...using Session = ssn::...;// --- CHANGE TO --vusing Session = ssn::shm::classic::...;// ...everything same...chan(nullptr, std::move(raw_chan),Channel_base::S_SERIALIZE_VIA_HEAP, session.session_token()); // <-- CHANGED LINE.// --- CHANGE TO --vchan(nullptr, std::move(raw_chan),Channel_base::S_SERIALIZE_VIA_SESSION_SHM, &session); // <-- CHANGED LINE.// ...everything same...Variation: switch SHM-providers:
// As written above, when allocation is necessary internally, SHM-classic provider will use a classic one-segment SHM setup,// with a simple allocation algorithm provided by boost.interprocess.using Session = ssn::shm::classic::...;// --- CHANGE TO --vusing Session = ssn::shm::arena_lend::jemalloc::...;// Now SHM-jemalloc provider is used instead; it will internally use multiple SHM segments as needed, driven by// commercial-grade `jemalloc` allocation algorithm. Otherwise example code remains unchanged.Variation: change max lifetime of messages created via
struc::Channel::create_msg()
:// As written above, the maximum lifetime of a `Msg_out` = as long as the session remains open.// This is often clean and sufficient.Session::Structured_channel<...> chan(...,Channel_base::S_SERIALIZE_VIA_SESSION_SHM, ...); // <-- CHANGED LINE.// --- CHANGE TO --v// Now the the message can live *past* the session: until the Session_server is destroyed, meaning// your process accepting incoming sessions exits. For example, suppose your session-server accepts// (sessions) from many session-client processes of one application over its lifetime; session-client A sends// a PutCache message containing a large file's contents to session-server which memorizes it (to serve later);// then session-client A closes session and exits; the message continues to exist, and the memory-cached file// can be served to session-clients B, C, ....Session::Structured_channel<...> chan(...,Channel_base::S_SERIALIZE_VIA_APP_SHM, ...); // <-- CHANGED LINE.
In our example in particular, there is simply no difference between using heap-backed allocation and SHM-backed allocation. Just change a couple lines, and you get better performance. The messages being exchanged can be utterly huge, with zero perf penalty.
That said, for more complex use-cases, there are natural differences in what one can do when comparing heap-backed versus SHM-backed allocation. For example, the out-message can be modified after sending it, and this change will be reflected for any process(es) holding the resulting in-message(s). (Of course you'll need to arrange some kind of synchronization of access in that case.)
In any case – whether heap-backed or SHM-backed – an out-message can be sent or re-sent over any channel and even in multiple sessions, depending on the details of its backing memory allocation.
This topic is optional: One can reap zero-copy performance by exclusively keeping IPC-communicated data structures in your struc::Channel
-transmitted message schema of choice. Internally, we use our own SHM capabilities to accomplish this, invisibly to the API user. We reasonably claim that if you can keep your shared data structures in a capnp-schema-backed object, then it is best to do so. That said:
Some applications may prefer to have the two (or more) conversant processes cooperate by reading/writing directly to a non-capnp-backed data structure but rather a straight C++ struct
. Flow-IPC provides this ability out-of-the-box. This means the user need not figure out a number of very hairy topics such as how to develop and use a SHM-friendly allocator. We summarize this here.
If and only if you used a SHM-enabled ipc::session::Session implementation (as noted in the preceding section – for example ipc::session::shm::classic::Client_session), then upon opening such a session, you immediately have access to 2 SHM arenas.
Session_server
is destroyed (and thus no further session are possible to open).Each of these returns an Arena
.
Arena::construct<T>(...) returns shared_ptr<T>
which points to a newly constructed-in-SHM-arena object of type T
. The sender prepares object *x
of type T
, then calls Session::lend_object(x) which returns a tiny payload to transmit over IPC (typically, but not necessarily, via a struc::Channel
as part of a capnp-encoded message). The receiver then invokes auto x = Arena::borrow_object(), passing in that aforementioned tiny payload. This recovers *x
of type T
in the receiver where it can be read like any C/C++ structure – because it is one. The backing RAM is auto-returned to the arena once all these handles (in this example x
on each side) (and other derived shared_ptr
s in their respective shared-groups – e.g., obtained via auto x2 = x
) are destroyed. In a sense x
is part of a cross-process shared_ptr
group.
The real power comes from the possibilities for what type T
can be. T
can be any combination (recursively speaking) of: the basic scalars (int
, float
, etc.); struct
s; and STL-compliant containers. So it can be, say, a vector
of unordered_map
s mapping from basic_string
to struct S { ... }
, where ...
itself stores anything equally arbitrarily complex. The STL-compliant container types must merely specify the allocator ipc::shm::stl::Stateless_allocator<Arena>
. We've provided the allocator implementation, so you need not worry about that stuff – just use it.
You can also directly store pointers in T
, as long as you use Stateless_allocator<Arena>::Pointer<P>
(not raw P*
); though we'd informally recommend against it for the maintainability of your own algorithms. That is what the allocator concept in STL is for after all. (This advice is independent of Flow-IPC; just a general opinion.)
Further capabilities are outside our scope here; but the main point is: At a minimum:
A SHM-compatible structure being declared and an instance constructed and modified:
// Define an allocator alias for convenience.// (For plain-old-datatypes (e.g., an `int` or `bool` or `struct` thereof or combinations thereof)// there is no need to use this.)template<typename T>using Shm_allocator = Session::Allocator<T>;// Example complex structure.struct Widget{struct Node{int m_int;// Normally would be just vector<float> m_float_vec`.boost::container::vector<float, Shm_allocator<float>> m_float_vec;};// Normally would be just `string`.using String = boost::container::basic_string<char, std::char_traits<char>, Shm_allocator<char>>;bool m_flag;String m_str;// Normally would be just unordered_map<string, Node> m_str_to_node_map.boost::unordered_map<String, Node, std::hash<String>, std::equal_to<String>,Shm_allocator<std::pair<const String, Node>>m_str_to_node_map;};// Construct it directly in SHM. (Default ctor used here; you can use any ctor.)auto x = session.session_shm()->construct<Widget>();// Modifications not involving more allocation are done as-normal.x->m_flag = true;x->m_str[0] = 'X'; // Assumes, of course, that by this point: x->m_str.size() > 0.// If allocation is required, apply a thread-local Activator. Can frame all modification code in such a block.using Activator = ipc::shm::stl::Arena_activator<Session::Arena>;auto y = session.app_shm()->construct<Widget>();{Activator ctx(session.session_shm());y->m_flag = true; // `ctx` is active but has zero effect on this (harmless and zero perf impact).y->m_str.resize(128); // `ctx` is active and will cause proper arena to be used for allocation.y->m_str[0] = 'X'; // `ctx` again has no effect and carries no perf penalty for this.}RAII-style class operating a stack-like notion of a the given thread's currently active SHM-aware Are...Definition: arena_activator.hpp:41Example of trasmitting a SHM-backed native data structure to another process follows. We transmit a handle through a capnp structured message here, but it can be done using any IPC mechanism whatsoever; even (e.g.) a file.
Schema which includes a native-object-in-SHM field:
# ...using Common = import "/ipc/transport/struc/shm/schema/common.capnp";using ShmHandle = Common.ShmHandle; # It's just a blob holder really.# ...struct SomeMsg{# ...widgetHandle @7 :ShmHandle;}Owner/lender process transmitting such a message:
auto x = session.session_shm()->construct<Widget>();// ...Fill out *x as above....auto msg = cool_channel.create_msg();// Ready the storage inside the out-message.auto widget_handle_root = msg.body_root()->initSomeMsg().initWidgetHandle(); // Note `widgetHandle: ShmHandle` in schema.// Perform the lend_object() step and load the result (a little blob) into the out-message.shm::capnp_set_lent_shm_handle(&widget_handle_root, session.lend_object(x));// IPC-transmit it via struc::Channel.cool_channel.send(msg);void capnp_set_lent_shm_handle(schema::ShmHandle::Builder *shm_handle_root, const flow::util::Blob_sans_log_context &lend_result)Utility that saves the result of a Shm_session1::lend_object<T>(const shared_ptr<T>&) result into the...Definition: util.cpp:28Borrower process receiving such a message and accessing the native data structure
Widget
:flow::util::Blob_sans_log_context lend_blob;shm::capnp_get_shm_handle_to_borrow(msg->body_root().getSomeMsg().getWidgetHandle(), &lend_blob);auto x = session.borrow_object<Widget>(lend_blob);FLOW_LOG_INFO("Hey, let's read inside SHM after receiving SHM-handle: [" << x_borrowed->m_flag << "].");// We can write to it too, in most cases.x->m_str[0] = 'Y';// (Can modify with allocations too, as long as you use an Activator context again, same is owner process above.)void capnp_get_shm_handle_to_borrow(const schema::ShmHandle::Reader &shm_handle_root, flow::util::Blob_sans_log_context *arg_to_borrow)Utility that's the reverse of capnp_set_lent_shm_handle() to be invoked on the deserializing side.Definition: util.cpp:50
When one gets into this relatively advanced area, there are differences and trade-offs between the available SHM-providers. If your alias at the very top is using ssn = ipc::session::shm::classic
, then you get total read/write abilities and maximized object lifetime. That is the SHM-classic provider. Instead, if you choose SHM-jemalloc (using ssn = ipc::session::shm::arena_lend::jemalloc
), then the borrower (receiver) process can only read, and there are some subtle differences in max object lifetime. (The latter applies only if you need objects to live beyond a given Session
.) However, you get in return: superior safety characteristics; and the commercial-grade jemalloc allocation algorithm (thread-caching, fragmentation avoidance, and more).
Flow-IPC focuses on IPC safety. Safety and Permissions page gets into the topic in detail. Here we'll point out merely one specific mechanism which might be of immediate interest. It shows that – while at various lower layers permissions can be specified in fine-grained fashion – at the top layer, ipc::session, we let you decide one of 3 simple safety approaches matching your setup, and Flow-IPC will take care of the details accordingly. To wit:
Refer to the very top of Synopsis: Sessions above. When defining our simple Server_app
structure that describes the application responsible for accepting sessions (IPC conversations/contexts), we chose:
Notice the GROUP_ACCESS
choice made there. In fact we could have chosen any one of these:
S_UNRESTRICTED
:S_USER_ACCESS
:S_GROUP_ACCESS
:So: GROUP_ACCESS
means your various apps should run as different users but in the same group. USER_ACCESS
means the apps should run all as one user. UNRESTRICTED
means it does not matter.
Next let's get to the nitty-gritty of actually using this library: Prerequisites and Setup.