21#include <flow/common.hpp>
22#include <boost/array.hpp>
25static_assert(
false,
"Should not have gotten to this line; should have required Linux; this .cpp file assumes it. "
26 "Might work in other POSIX OS (e.g., macOS) but must be checked/tested.");
29#include <sys/socket.h>
41 namespace bind_ns = flow::util::bind_ns;
42 using boost::system::system_category;
44 namespace sys_err_codes = boost::system::errc;
56 logger_ptr, peer_socket_ptr, payload_hndl, bind_ns::cref(payload_blob), _1);
63 assert(peer_socket_ptr);
64 auto& peer_socket = *peer_socket_ptr;
66 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
67 FLOW_LOG_TRACE(
"Connected local peer socket wants to write from location @ [" << payload_blob.data() <<
"] "
68 "plus native handle [" << payload_hndl <<
"]. Will try to send.");
84 iovec native_buf1 = {
const_cast<void*
>(payload_blob.data()), payload_blob.size() };
109 constexpr size_t N_PAYLOAD_FDS = 1;
132 } msg_control_as_union;
134 sendmsg_hdr.msg_control = msg_control_as_union.m_buf.c_array();
135 sendmsg_hdr.msg_controllen =
sizeof(msg_control_as_union.m_buf);
136 cmsghdr*
const sendmsg_hdr_cmsg_ptr = CMSG_FIRSTHDR(&sendmsg_hdr);
137 sendmsg_hdr_cmsg_ptr->cmsg_level = SOL_SOCKET;
138 sendmsg_hdr_cmsg_ptr->cmsg_type = SCM_RIGHTS;
142 static_assert(N_PAYLOAD_FDS == 1,
"Should be only passing one native handle into sendmsg() as of this writing.");
145 const auto n_sent_or_error
146 = sendmsg(peer_socket.native_handle(), &sendmsg_hdr,
152 MSG_DONTWAIT | MSG_NOSIGNAL);
155 if (n_sent_or_error == -1)
162 const Error_code sys_err_code(errno, system_category());
163 if ((sys_err_code == sys_err_codes::operation_would_block) ||
164 (sys_err_code == sys_err_codes::resource_unavailable_try_again))
166 FLOW_LOG_TRACE(
"Write attempt indicated would-block; not an error condition. Nothing sent.");
177 *err_code = boost::asio::error::would_block;
182 assert(sys_err_code);
185 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
186 FLOW_LOG_WARNING(
"Connected local peer socket tried to write from "
187 "location @ [" << payload_blob.data() <<
"] plus native handle [" << payload_hndl <<
"]; "
188 "but an unrecoverable error occurred. Nothing sent.");
189 *err_code = sys_err_code;
194 assert (n_sent_or_error > 0);
196 FLOW_LOG_TRACE(
"sendmsg() reports the native handle [" << payload_hndl <<
"] was successfully sent; as "
197 "were [" << n_sent_or_error <<
"] of the blob's [" << payload_blob.size() <<
"] bytes.");
199 return n_sent_or_error;
209 namespace bind_ns = flow::util::bind_ns;
210 using boost::system::system_category;
212 namespace sys_err_codes = boost::system::errc;
219 using ::MSG_DONTWAIT;
224 logger_ptr, peer_socket_ptr, target_payload_hndl_ptr,
225 bind_ns::cref(target_payload_blob), _1, message_flags);
232 assert(peer_socket_ptr);
233 assert(target_payload_hndl_ptr);
235 auto& peer_socket = *peer_socket_ptr;
236 auto& target_payload_hndl = *target_payload_hndl_ptr;
238 FLOW_LOG_SET_CONTEXT(logger_ptr, Log_component::S_TRANSPORT);
239 FLOW_LOG_TRACE(
"Connected local peer socket wants to read up to [" << target_payload_blob.size() <<
"] bytes "
240 "to location @ [" << target_payload_blob.data() <<
"] "
241 "plus possibly a native handle. Will try to receive.");
243 assert(target_payload_hndl.null());
253 iovec native_buf1 = { target_payload_blob.data(), target_payload_blob.size() };
269 recvmsg_hdr.msg_flags = 0;
277 constexpr size_t N_PAYLOAD_FDS = 1;
283 } msg_control_as_union;
285 msg_control_as_union.m_buf.fill(0);
287 recvmsg_hdr.msg_control = msg_control_as_union.m_buf.c_array();
288 recvmsg_hdr.msg_controllen =
sizeof(msg_control_as_union.m_buf);
290 const auto n_rcvd_or_error
291 = recvmsg(peer_socket.native_handle(), &recvmsg_hdr,
294 MSG_DONTWAIT | message_flags);
298 if (n_rcvd_or_error == -1)
303 const Error_code sys_err_code(errno, system_category());
304 if ((sys_err_code == sys_err_codes::operation_would_block) ||
305 (sys_err_code == sys_err_codes::resource_unavailable_try_again))
309 FLOW_LOG_TRACE(
"Read attempt indicated would-block; not an error condition. Nothing received.");
311 *err_code = boost::asio::error::would_block;
316 assert(sys_err_code);
319 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
320 FLOW_LOG_WARNING(
"Connected local peer socket tried to read up to [" << target_payload_blob.size() <<
"] bytes at "
321 "location @ [" << target_payload_blob.data() <<
"] plus possibly a native handle; "
322 "but an unrecoverable error occurred. Nothing received.");
323 *err_code = sys_err_code;
328 if (n_rcvd_or_error == 0)
333 FLOW_LOG_TRACE(
"Connected local peer socket tried to read up to [" << target_payload_blob.size() <<
"] bytes at "
334 "location @ [" << target_payload_blob.data() <<
"] plus possibly a native handle; "
335 "but it returned EOF meaning orderly connection shutdown by peer. Nothing received.");
336 *err_code = boost::asio::error::eof;
340 assert(n_rcvd_or_error > 0);
345 if (recvmsg_hdr.msg_flags != 0)
347 FLOW_LOG_INFO(
"Connected local peer socket tried to read up to [" << target_payload_blob.size() <<
"] bytes at "
348 "location @ [" << target_payload_blob.data() <<
"] plus possibly a native handle; "
349 "and it returned it read [" << n_rcvd_or_error <<
"] bytes successfully but also returned raw "
350 "out-flags value [0x" << std::hex << recvmsg_hdr.msg_flags << std::dec <<
"]. "
351 "Will check for relevant flags but otherwise "
352 "ignoring if nothing bad. Logging at elevated level because it's interesting; please investigate.");
354 if ((recvmsg_hdr.msg_flags & MSG_CTRUNC) != 0)
356 FLOW_LOG_WARNING(
"Connected local peer socket tried to read up to [" << target_payload_blob.size() <<
"] bytes "
357 "at location @ [" << target_payload_blob.data() <<
"] plus possibly a native handle; "
358 "and it returned it read [" << n_rcvd_or_error <<
"] bytes successfully but also returned raw "
359 "out-flags value [0x" << recvmsg_hdr.msg_flags <<
"] which includes MSG_CTRUNC. "
360 "That flag indicates more stuff was sent as ancillary data; but we expect at most 1 native "
361 "handle. Other side sent something strange. Acting as if nothing received + error.");
370 cmsghdr*
const recvmsg_hdr_cmsg_ptr = CMSG_FIRSTHDR(&recvmsg_hdr);
371 if (recvmsg_hdr_cmsg_ptr)
374 if ((recvmsg_hdr_cmsg_ptr->cmsg_level == SOL_SOCKET) &&
375 (recvmsg_hdr_cmsg_ptr->cmsg_type == SCM_RIGHTS))
377 static_assert(N_PAYLOAD_FDS == 1,
"Should be only dealing with one native handle with recvmsg() "
378 "as of this writing.");
379 target_payload_hndl.m_native_handle
384 FLOW_LOG_WARNING(
"Connected local peer socket tried to read up to [" << target_payload_blob.size() <<
"] bytes "
385 "at location @ [" << target_payload_blob.data() <<
"] plus possibly a native handle; "
386 "and it returned it read [" << n_rcvd_or_error <<
"] bytes successfully but also "
387 "unexpected ancillary data of csmg_level|cmsg_type "
388 "[" << recvmsg_hdr_cmsg_ptr->cmsg_level <<
'|' << recvmsg_hdr_cmsg_ptr->cmsg_type <<
"]. "
389 "Acting as if nothing received + error.");
394 if (CMSG_NXTHDR(&recvmsg_hdr, recvmsg_hdr_cmsg_ptr))
398 FLOW_LOG_WARNING(
"Connected local peer socket tried to read up to [" << target_payload_blob.size() <<
"] bytes "
399 "at location @ [" << target_payload_blob.data() <<
"] plus possibly a native handle; "
400 "and it returned it read [" << n_rcvd_or_error <<
"] bytes and native handle "
401 "[" << target_payload_hndl <<
"] but also more ancillary data; but we expect at most 1 native "
402 "handle. Other side sent something strange. Acting as if nothing received + error.");
411 FLOW_LOG_TRACE(
"recvmsg() reports receipt of possible native handle [" << target_payload_hndl <<
"]; as well as "
412 "[" << n_rcvd_or_error <<
"] of the blob's [" << target_payload_blob.size() <<
"]-byte capacity.");
414 return n_rcvd_or_error;
419 using flow::util::Task_engine;
422 if (peer_socket_native_or_null.null())
431 Task_engine task_engine;
432 [[maybe_unused]]
Peer_socket sock(task_engine,
Protocol(), peer_socket_native_or_null.m_native_handle);
Opt_peer_process_credentials()
Default ctor: each value is initialized to zero or equivalent.
Opt_peer_process_credentials & operator=(const Opt_peer_process_credentials &src)
Boring copy assignment.
Additional (versus boost.asio) APIs for advanced work with local stream (Unix domain) sockets includi...
local_ns::stream_protocol Protocol
Short-hand for boost.asio Unix domain stream-socket protocol.
size_t nb_write_some_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle payload_hndl, const util::Blob_const &payload_blob, Error_code *err_code)
boost.asio extension similar to peer_socket->non_blocking(true); auto n = peer_socket->write_some(pay...
Protocol::socket Peer_socket
Short-hand for boost.asio Unix domain peer stream-socket (usually-connected-or-empty guy).
size_t nb_read_some_with_native_handle(flow::log::Logger *logger_ptr, Peer_socket *peer_socket_ptr, Native_handle *target_payload_hndl_ptr, const util::Blob_mutable &target_payload_blob, Error_code *err_code, int message_flags)
boost.asio extension similar to peer_socket->non_blocking(true); auto n = peer_socket->read_some(targ...
void release_native_peer_socket(Native_handle &&peer_socket_native_or_null)
Little utility that returns the raw Native_handle suitable for Peer_socket to the OS.
@ S_LOW_LVL_UNEXPECTED_STREAM_PAYLOAD_BEYOND_HNDL
Unable to receive incoming traffic: message contains more than: 1 blob plus 0-1 native handles.
util::Native_handle Native_handle
Convenience alias for the commonly used type util::Native_handle.
boost::asio::mutable_buffer Blob_mutable
Short-hand for an mutable blob somewhere in memory, stored as exactly a void* and a size_t.
boost::asio::const_buffer Blob_const
Short-hand for an immutable blob somewhere in memory, stored as exactly a void const * and a size_t.
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
A monolayer-thin wrapper around a native handle, a/k/a descriptor a/k/a FD.
int handle_t
The native handle type. Much logic relies on this type being light-weight (fast to copy).
handle_t m_native_handle
The native handle (possibly equal to S_NULL_HANDLE), the exact payload of this Native_handle.