Flow-IPC 1.0.0
Flow-IPC project: Full implementation reference.
capnp_msg_builder.hpp
Go to the documentation of this file.
1/* Flow-IPC: Shared Memory
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19#pragma once
20
25#include "ipc/shm/shm.hpp"
26#include "ipc/transport/struc/shm/schema/detail/serialization.capnp.h"
27#include <boost/interprocess/containers/list.hpp>
28
30{
31
32// Types.
33
34/**
35 * A `capnp::MessageBuilder` used by shm::Builder: similar to a `MallocMessageBuilder`
36 * with the `GROW_HEURISTICALLY` alloc-strategy but allocating via a SHM provider (of template-arg-specific
37 * type) in SHM instead of the heap via `malloc()`.
38 *
39 * It can also be used as a #Capnp_msg_builder_interface (`capnp::MessageBuilder`) independently of the rest of
40 * ipc::transport::struc or even ::ipc, although that was not the impetus for its development.
41 *
42 * Its #Segments_in_shm type alias is `public`: shm::Reader must know/understand it in order to be able to
43 * interpret the SHM-stored data structure.
44 *
45 * Contrast this with Heap_fixed_builder_capnp_message_builder which allocates in regular heap.
46 * The `*this`-user-facing output API -- meaning the thing invoked by struc::Builder::emit_serialization() --
47 * is lend(). Cf. Heap_fixed_builder_capnp_message_builder::emit_segment_blobs().
48 * Why are they so different? Answer:
49 * - The latter is meant to emit M segments, each (some) bytes long, to all be transmitted directly over IPC.
50 * So it outputs them, to copy into the IPC transport!
51 * - We are meant to emit a *handle to a data structure storing those M segments* to be transmitted directly over
52 * IPC. The handle is transmitted; not the entire segments. So it outputs that handle! It just so happens
53 * to output it via a capnp mutator call. (It could instead emit a `flow::util::Blob` and let the caller
54 * transmit it however it wants. Why bother though? Just do it. However do see a related to-do in
55 * lend() doc header.)
56 *
57 * ### Move-ctible and move-assignable ###
58 * Please see similar section in Heap_fixed_builder_capnp_message_builder doc header; it applies
59 * very similarly to us. Spoiler alert: A move-from involves copying 200+ bytes; consider wrapping `*this`
60 * in a `unique_ptr` if moving `*this`.
61 *
62 * @tparam Shm_arena
63 * See shm::Builder doc header, same spot.
64 */
65template<typename Shm_arena>
68 public flow::log::Log_context
69{
70public:
71 // Types.
72
73 /// Short-hand for, you know.
74 using Arena = Shm_arena;
75
76 /// Short-hand for the SHM-aware allocator used in our central data structure holding the capnp serialization.
77 template<typename T>
79
80 /**
81 * For easier outside generic programming, this is the read-only-borrower counterpart to
82 * #Allocator. See also #Segments_in_shm_borrowed.
83 */
84 template<typename T>
87
88 /**
89 * The inner data structure stored in SHM representing one capnp-requested segment storing all or part of
90 * the serialization. `.capacity()` is how much was allocated which is at least what capnp-requested via
91 * allocateSegment() `virtual` API we implement. `.size()` is how many bytes of that were in fact ultimately
92 * used by capnp during the *last* serialization as capped by lend(). If `*this` is
93 * reused, then capnp may write past `.size()` (but not past `.capacity()`); lend()
94 * will then re-correct `.size()` to the true segment size used by capnp as reported by
95 * `this->getSegmentsForOutput()`.
96 *
97 * ### Choice of container type ###
98 * In the past this was, first, `std::vector<uint8_t>` (which needed `Default_init_allocator` to avoid
99 * 0-filling during `.resize()` -- see lend()); then `bipc::vector<uint8_t>` (which needed
100 * `.resize(n, default_init_t)` extension for the same reason ). Then, as intended originally, it became
101 * `flow::util::Basic_blob<>`. Why that over `vector<uint8_t>`? Answer: `Basic_blob`'s express purpose
102 * is to do just this; some of its main documented aspects (lack of zero-init, iron-clad known perf) are
103 * directly counted-upon by us. So we use it for similar reasons as using `flow::util::Blob` all over the
104 * code for such purposes -- maybe even more so.
105 *
106 * So really the only thing missing, before we could use it, was its SHM-friendly/custom-allocator support.
107 * `Blob` cannot do it. Once the latter was generalized to `Basic_blob<Allocator>` we could switch to it,
108 * leaving behind a number of rather annoying caveats of the various `vector<uint8_t>` impls
109 * (0-init especially on `.resize()`, slow destructor on large blobs, and more).
110 *
111 * For reasons stated in its doc header `Basic_blob` does not log in normal fashion (memorizing a `Logger*`
112 * via ctor) but only if supplied an optional `Logger*` in each particular call. (`Blob` is a sub-class
113 * that adds such functionality at the expense of a bit of RAM/perf, but this is impossible with a custom SHM
114 * allocator.) So that's why `get_logger()` is passed to the few APIs we call on our `Basic_blob`.
115 */
116 using Segment_in_shm = flow::util::Basic_blob<Allocator<uint8_t>>;
117
118 /**
119 * For easier outside generic programming, this is the read-only-borrower counterpart to
120 * #Segment_in_shm. See also #Segments_in_shm_borrowed.
121 */
122 using Segment_in_shm_borrowed = flow::util::Basic_blob<Borrower_allocator<uint8_t>>;
123
124 /**
125 * The outer data structured stored in SHM representing the entire list of capnp-requested segments #Segment_in_shm.
126 *
127 * ### Rationale (`bipc::` vs `std::`) ###
128 * Why `bipc::list` and not `std::list`? Answer:
129 * `std::list`, at least in gcc-8.3.0, gave a compile error fairly clearly implying `std::list` stores
130 * `Node*` instead of `Allocator<Node>::pointer`; in other words it is not compatible with SHM
131 * (which bipc docs did claim -- but that could easily have been outdated).
132 *
133 * Curiously `std::vector` did not have that problem and worked fine, as far as that went, but we prefer
134 * a linked-list here.
135 */
136 using Segments_in_shm = bipc::list<Segment_in_shm, Allocator<Segment_in_shm>>;
137
138 /**
139 * For easier outside generic programming, this is the read-only-borrower counterpart to
140 * #Segments_in_shm: identical but using #Borrower_allocator instead of #Allocator.
141 * This type shall be used with `borrow_object()` on the deserializing side when decoding
142 * the #Segments_in_shm written by a `*this`.
143 */
144 using Segments_in_shm_borrowed = bipc::list<Segment_in_shm_borrowed, Borrower_allocator<Segment_in_shm_borrowed>>;
145
146 // Constructors/destructor.
147
148 /**
149 * Constructs the message-builder, memorizing the SHM engine it shall use to construct/allocate data internally
150 * on-demand via allocateSegment() (capnp-invoked from capnp-generated mutator API as invoked by the user).
151 *
152 * @param logger_ptr
153 * Logger to use for logging subsequently.
154 * @param arena
155 * See shm::Builder ctor.
156 */
157 explicit Capnp_message_builder(flow::log::Logger* logger_ptr, Arena* arena);
158
159 /// Decrements owner-process count by 1; if current count is 1 deallocates SHM-stored data.
161
162 // Methods.
163
164 /**
165 * To be called after being done mutating underlying structured data, increments owner-process count
166 * by 1 via `shm_session->lend_object()`; and populates a capnp-`struct` field, saving the encoding of the
167 * outer SHM handle to the serialization-segment data structure #Segments_in_shm into that field.
168 *
169 * You may call this method more than once per `*this`. In particular this is necessary if sending the SHM-handle
170 * via IPC more than once -- even if one has already sent it to that same process (or another).
171 * Even if the bits populated into `*capnp_root` shall always be the same for a given `*this`, it is
172 * nevertheless required to call it repeatedly when sharing repeatedly.
173 *
174 * @todo Would be nice to provide a more-general counterpart to existing
175 * Capnp_message_builder::lend() (in addition to that one which outputs into a capnp structure),
176 * such as one that outputs a mere `Blob`. The existing one is suitable for the main use-case which is internally by
177 * shm::Builder; but Capnp_message_builder is also usable as a `capnp::MessageBuilder` directly. If a user were to
178 * indeed leverage it in that latter capacity, they may want to transmit/store the SHM-handle some other way.
179 * Note that as of this writing the direct-use-by-general-user-as-`MessageBuilder` use-case is supported "just
180 * because" it can be; nothing in particular needed it.
181 *
182 * @param capnp_root
183 * The target SHM-handle serialization root to populate as noted above.
184 * @param shm_session
185 * `Shm_session` to the opposing recipient to which we are lending.
186 */
189
190 /**
191 * Implements `MessageBuilder` API. Invoked by capnp, as the user mutates via `Builder`s. Do not invoke directly.
192 *
193 * Throws a `bad_alloc`-like exception if and only if the #Arena does so when allocating on behalf of the
194 * STL-compliant inner code of #Segments_in_shm.
195 *
196 * @note The strange capitalization (that goes against standard Flow-IPC style) is because we are implementing
197 * a capnp API.
198 *
199 * @param min_sz
200 * See `MessageBuilder` API.
201 * The allocated segment will allow for a serialization of at *least* `min_sz * sizeof(word)` bytes.
202 * The actual amount grows progressively similarly to the `MallocMessageBuilder` GROW_HEURISTICALLY
203 * strategy, starting at the same recommended first-segment size as `MallocMessageBuilder` as well.
204 * @return See `MessageBuilder` API.
205 * The ptr and size of the area for capnp to serialize-to.
206 */
207 kj::ArrayPtr<::capnp::word> allocateSegment(unsigned int min_sz) override;
208
209private:
210 // Types.
211
212 /// Short-hand for the SHM-arena activator coupled with #Allocator.
214
215 // Data.
216
217 /// See ctor.
219
220 /**
221 * Minimum size of the next segment allocated by allocateSegment. Roughly speaking the actual size will be
222 * the higher of `min_sz` or this. Its initial value (seg 1's) is a constant. Its subsequent value is
223 * the sum of sizes of the previous segments; meaning itself plus whatever allocateSegment() decided to allocate.
224 * This results in exponential growth... ish.
225 *
226 * This follows `MallocMessageBuilder` GROW_HEURISTICALLY logic, straight-up lifted from their source code.
227 */
229
230 /// Outer SHM handle to the data structured in SHM that stores the capnp-requested serialization segments.
231 typename Arena::template Handle<Segments_in_shm> m_serialization_segments;
232}; // class Capnp_message_builder
233
234// Free functions: in *_fwd.hpp.
235
236// Template implementations.
237
238template<typename Shm_arena>
240 (flow::log::Logger* logger_ptr, Arena* arena) :
241
242 flow::log::Log_context(logger_ptr, Log_component::S_TRANSPORT),
243 m_arena(arena),
244 // Borrow MallocMessageBuilder's heuristic:
245 m_segment_sz(::capnp::SUGGESTED_FIRST_SEGMENT_WORDS * sizeof(::capnp::word)),
246 // Construct the data structure holding the segments, saving a small shared_ptr handle into SHM.
247 m_serialization_segments(m_arena->template construct<Segments_in_shm>()) // Can throw.
248{
249 FLOW_LOG_TRACE("SHM builder [" << *this << "]: Created.");
250}
251
252template<typename Shm_arena>
254{
255 FLOW_LOG_TRACE("SHM builder [" << *this << "]: Destroyed. The following may SHM-dealloc the serialization, "
256 "if recipient was done with it before us, or if we hadn't done lend() yet.");
257 // m_serialization_segments Handle<> (shared_ptr<>) ref-count will decrement here (possibly to 0).
258}
259
260template<typename Shm_arena>
264{
265 using util::Blob_const;
266 using flow::util::buffers_dump_string;
267
268 assert(capnp_root);
269
270 /* Firstly read the paragraph about this method versus
271 * Heap_fixed_builder_capnp_message_builder::emit_segment_blobs() (in our class doc header).
272 * That sets up some mental context. Then come back here.
273 * Spiritually we're doing something similar here: they've got a list-of-Blobs; we've got the same;
274 * we need to adjust the latters' `.size()`s down from `capacity()` to actual space used in serialization.
275 * The differences are:
276 * - They're stored in SHM via Stateless_allocator; need to ensure thread-local active arena is m_arena.
277 * - To emit, we just emit the outer SHM handle to the whole list-o'-blobs (they emit the actual list, to be
278 * copied).
279 *
280 * Well... let's go then. */
281
282 {
283 /* As noted: activate the arena, in case the below .resize() causes allocation. (It shouldn't... we're
284 * resizing down. Better safe than sorry, plus it's more maintainable. (What if it becomes a deque<> later
285 * or something?)) */
286 Arena_activator arena_ctx(m_arena);
287
288 // All of the below is much like Heap_fixed_builder_capnp_message_builder::emit_segment_blobs() except as noted.
289
290 Segments_in_shm& blobs = *m_serialization_segments;
291 assert((!blobs.empty())
292 && "Should not be possible for serialization to be empty with our use cases. Investigate.");
293
294 const auto capnp_segs = getSegmentsForOutput();
295 assert((capnp_segs.size() == blobs.size())
296 && "Somehow our MessageBuilder created fewer or more segments than allocateSegment() was called?!");
297
298 size_t idx;
299 typename Segments_in_shm::iterator blob_it;
300 for (idx = 0, blob_it = blobs.begin(); idx != capnp_segs.size(); ++idx, ++blob_it)
301 {
302 const auto capnp_seg = capnp_segs[idx].asBytes();
303 const auto seg_sz = capnp_seg.size();
304
305 auto& blob = *blob_it;
306
307 assert((capnp_seg.begin() == &(blob.front()))
308 && "Somehow capnp-returned segments are out of order to allocateSegment() calls; or something....");
309 assert((seg_sz != 0)
310 && "capnp shouldn't be generating zero-sized segments.");
311 assert((seg_sz <= blob.capacity())
312 && "capnp somehow overflowed the area we gave it.");
313
314 /* This .resize() call is interesting (and was quite treacherous when Segment_in_shm was a vector<uint8_t>).
315 * A regular .resize(n) is uncontroversial when .size() exceeds or equals n.
316 * It just adjusts an internal m_size thing. Suppose `n <= capacity()` (always the case for us and ensured
317 * above). Suppose now though that `.size() < n`. It works fine in Blob: we wrote past .size() but not
318 * past .capacity(), and the .resize() "corrects" m_size accordingly. With vector<uint8_t>, without taking
319 * special measures (std::vector<Default_init_allocator<...>> or bipc::vector<>::resize(n, default_init))
320 * it would also catastrophically (for us) zero-fill the bytes between size() and n: If lend()
321 * is being called on a *this that has already been lend()ed -- the case in particular where an
322 * out-message is serialized, sent, modified (to require more space in an existing segment),
323 * serialized again, sent again. Then this .resize() would zero out the added new bytes in the serialization!
324 * Uncarefully-written user code might even .initX(n) (where x = List or Data, say) a field that
325 * was previously .initX(n)ed; capnp does not simply reuse the space but rather orphans the previous X
326 * and creates a new List/Data X in a later, new part in the same segment (if there's space left).
327 * Now the deserializing side will observe the X is all zeroes... WTF?!
328 *
329 * Anyway, I mention that for posterity/education and to point out the fact we might be writing past
330 * .size() temporarily, until the present method executes; and that's somewhat unusual (but legal).
331 * Segment_in_shm=Basic_blob does not have the zeroing problem. */
332 blob.resize(seg_sz,
333 flow::util::Blob::S_UNCHANGED, // Can be removed if next arg is removed.
334 get_logger()); // (TRACE-log if enabled.) Must be removed if Segment_in_shm becomes non-Blob.
335
336 FLOW_LOG_TRACE("SHM builder [" << *this << "]: "
337 "Serialization segment [" << idx << "] (0 based, of [" << capnp_segs.size() << "], 1-based): "
338 "SHM-arena buffer @[" << static_cast<const void*>(&(blob.front())) << "] "
339 "sized [" << seg_sz << "]: Serialization of segment complete.");
340 FLOW_LOG_DATA("Segment contents: "
341 "[\n" << buffers_dump_string(Blob_const(&(blob.front()), blob.size()), " ") << "].");
342 } // for (idx in [0, size()))
343 } // Arena_activator arena_ctx(m_arena);
344
345 /* And now just record the process-agnostic serialization of the handle to the whole thing. Nice and small!
346 * The rest is inside `blobs` which is wholly in SHM and needs no encoding. */
347
348 // Source blob (bits encoding handle):
349 const auto handle_serialization_blob = shm_session->template lend_object<Segments_in_shm>(m_serialization_segments);
350 // Target SHM handle (inside capnp struct). Avoid wasting internal serialization space if already init...()ed.
351 auto capnp_segment_list_in_shm = capnp_root->hasSegmentListInShm() ? capnp_root->getSegmentListInShm()
352 : capnp_root->initSegmentListInShm();
353 // Copy handle-encoding bits (only a few bytes, by Session contract) from source to target:
354 capnp_set_lent_shm_handle(&capnp_segment_list_in_shm, handle_serialization_blob);
355
356 /* Process-count in m_serialization_segments incremented ahead of transmission (this is logged), probably to 2
357 * (higher if lend() called more than 1x).
358 * Now underlying SHM-stored segments won't de dealloc-ed until the other side receives it and later indicates
359 * that process is done with them (if send succeeds) + *this is destroyed. */
360} // Capnp_message_builder::lend()
361
362template<typename Shm_arena>
363kj::ArrayPtr<::capnp::word>
365{
366 using Word = ::capnp::word;
367 using Capnp_word_buf = kj::ArrayPtr<Word>;
368 using flow::util::ceil_div;
369 using std::memset;
370 constexpr size_t WORD_SZ = sizeof(Word);
371
372 /* Background from capnp: They're saying the need the allocated space for serialization to store at least min_sz:
373 * probably they're going to store some object that needs at least this much space. So typically it's some
374 * scalar leaf thing, like 4 bytes or whatever; but it could be larger -- or even huge (e.g., a Data or List
375 * of huge size, because the user mutated it so via a ::Builder). Oh, and it has to be zeroed, as by calloc().
376 *
377 * So all we *have* to allocate is min_sz exactly in that sense. But the idea is to try to allocate more, so that
378 * capnp can efficiently shove more objects in there too without calling allocateSegment() for each one.
379 * And we're supposed to grow exponentially each time, so we keep track of the next size in m_segment_sz, same
380 * as capnp::MallocMessageBuilder internally does (check its source code). Of course, if min_sz exceeds that,
381 * then we have no choice but to allocate the larger amount min_sz. */
382
383 const size_t seg_sz
384 = std::max(size_t(min_sz), // Don't forget: in their API min_sz is in `word`s.
385 /* Seems prudent to give capnp an area that is a multiple of `word`s. Maybe required. Probably even.
386 * Exceeding it a little is okay. */
387 size_t(ceil_div(m_segment_sz, WORD_SZ)))
388 * WORD_SZ;
389
390 FLOW_LOG_TRACE("SHM builder [" << *this << "]: allocateSegment request for >=[" << min_sz << "] words; "
391 "SHM-allocing ~max(that x sizeof(word), next-size=[" << m_segment_sz << "]) = [" << seg_sz << "] "
392 "bytes.");
393
394 uint8_t* buf_ptr;
395 {
396 Arena_activator arena_ctx(m_arena);
397
398 // Go to it! This can throw (which as noted elsewhere is treated as a catastrophe a-la `new` bad_alloc for now).
399 buf_ptr = &(m_serialization_segments->emplace_back
400 (seg_sz,
401 // (TRACE-log in this ctor if enabled.) Must be removed if Segment_in_shm becomes non-Blob.
402 get_logger()).front());
403 } // Arena_activator arena_ctx(m_arena);
404
405 /* capnp requires: it must be zeroed. And Basic_blob ctor we used does *not* zero it. So memset() it.
406 * Caution! If you choose to change-over to vector<..., util::Default_init_allocator<...>> instead, then
407 * you'll need to add `std::memset(buf_ptr, 0, seg_sz)` here. */
408 memset(buf_ptr, 0, seg_sz);
409
410 // Since we are supposed to grow exponentially, increase this for next time (if any):
411 m_segment_sz += seg_sz;
412 /* @todo MallocMessageBuilder does some bounding according to some maximum. Probably we must do the same.
413 * Get back to this and follow capnp-interface reqs and/or follow what their internal logic does. */
414
415 FLOW_LOG_TRACE("SHM builder [" << *this << "]: Next-size grew exponentially to [" << m_segment_sz << "] "
416 "for next time.");
417
418 return Capnp_word_buf(reinterpret_cast<Word*>(buf_ptr),
419 reinterpret_cast<Word*>(buf_ptr + seg_sz));
420} // Capnp_message_builder::allocateSegment()
421
422template<typename Shm_arena>
423std::ostream& operator<<(std::ostream& os, const Capnp_message_builder<Shm_arena>& val)
424{
425 return os << '@' << &val;
426}
427
428} // namespace ipc::transport::struc::shm
RAII-style class operating a stack-like notion of a the given thread's currently active SHM-aware Are...
Stateless allocator usable with STL-compliant containers to store (or merely read) them directly in S...
A capnp::MessageBuilder used by shm::Builder: similar to a MallocMessageBuilder with the GROW_HEURIST...
Capnp_message_builder(flow::log::Logger *logger_ptr, Arena *arena)
Constructs the message-builder, memorizing the SHM engine it shall use to construct/allocate data int...
kj::ArrayPtr<::capnp::word > allocateSegment(unsigned int min_sz) override
Implements MessageBuilder API.
~Capnp_message_builder()
Decrements owner-process count by 1; if current count is 1 deallocates SHM-stored data.
flow::util::Basic_blob< Borrower_allocator< uint8_t > > Segment_in_shm_borrowed
For easier outside generic programming, this is the read-only-borrower counterpart to Segment_in_shm.
void lend(schema::detail::ShmTopSerialization::Builder *capnp_root, session::shm::Arena_to_shm_session_t< Arena > *shm_session)
To be called after being done mutating underlying structured data, increments owner-process count by ...
flow::util::Basic_blob< Allocator< uint8_t > > Segment_in_shm
The inner data structure stored in SHM representing one capnp-requested segment storing all or part o...
bipc::list< Segment_in_shm_borrowed, Borrower_allocator< Segment_in_shm_borrowed > > Segments_in_shm_borrowed
For easier outside generic programming, this is the read-only-borrower counterpart to Segments_in_shm...
size_t m_segment_sz
Minimum size of the next segment allocated by allocateSegment.
Arena::template Handle< Segments_in_shm > m_serialization_segments
Outer SHM handle to the data structured in SHM that stores the capnp-requested serialization segments...
bipc::list< Segment_in_shm, Allocator< Segment_in_shm > > Segments_in_shm
The outer data structured stored in SHM representing the entire list of capnp-requested segments Segm...
Small group of miscellaneous utilities to ease work with capnp (Cap'n Proto), joining its capnp names...
typename Arena_to_shm_session< Arena >::Type Arena_to_shm_session_t
Alias that, given an Arena type (with Arena::construct<T>() which allocates/constructs a T),...
Definition: shm.hpp:54
Builder< ipc::shm::classic::Pool_arena > Builder
Convenience alias: transport::struc::shm::Builder that works with boost.ipc.shm pools from ipc::shm::...
Definition: classic_fwd.hpp:36
Segregates zero-copy/SHM implementations of concepts residing in parent namespace ipc::transport::str...
void capnp_set_lent_shm_handle(schema::ShmHandle::Builder *shm_handle_root, const flow::util::Blob_sans_log_context &lend_result)
Utility that saves the result of a Shm_session1::lend_object<T>(const shared_ptr<T>&) result into the...
Definition: util.cpp:28
std::ostream & operator<<(std::ostream &os, const Capnp_message_builder< Shm_arena > &val)
Prints string representation of the given Capnp_message_builder to the given ostream.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
Definition: util_fwd.hpp:128
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
Definition: common.hpp:322