Flow-IPC 2.0.0
Flow-IPC project: Full implementation reference.
timer_ev_emitter.cpp
Go to the documentation of this file.
1/* Flow-IPC: Core
2 * Copyright 2023 Akamai Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in
6 * compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in
12 * writing, software distributed under the License is
13 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14 * CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing
16 * permissions and limitations under the License. */
17
18/// @file
19
22#include <flow/error/error.hpp>
23#include <boost/move/make_unique.hpp>
24
25namespace ipc::util::sync_io
26{
27
28// Implementations.
29
30Timer_event_emitter::Timer_event_emitter(flow::log::Logger* logger_ptr, String_view nickname_str) :
31 flow::log::Log_context(logger_ptr, Log_component::S_UTIL),
32 m_nickname(nickname_str),
33 m_worker(get_logger(),
34 /* (Linux) OS thread name will truncate m_nickname to 15-5=10 chars here; high chance that'll include
35 * something decently useful; probably not everything though; depends on nickname. It's a decent attempt. */
36 std::string("TEvE-") + m_nickname)
37{
38 m_worker.start(flow::async::reset_this_thread_pinning);
39 // Don't inherit any strange core-affinity! ^-- Worker must float free.
40
41 FLOW_LOG_TRACE("Timer_event_emitter [" << *this << "]: Idle timer-emitter thread started.");
42}
43
45{
46 using flow::util::Timer;
47
48 return Timer(*(m_worker.task_engine()));
49}
50
52{
53 using boost::asio::writable_pipe;
54 using boost::asio::connect_pipe;
55 using boost::movelib::make_unique;
56
57 auto read_end = make_unique<Timer_fired_read_end>(*(m_worker.task_engine()));
58 auto write_end = make_unique<writable_pipe>(*(m_worker.task_engine()));
59 Error_code sys_err_code;
60 connect_pipe(*read_end, *write_end, sys_err_code);
61
62 if (sys_err_code)
63 {
64 // @todo Someday we should report this as an error, etc. Low-priority, as at this stage everything is kaput.
65 FLOW_LOG_FATAL("connect_pipe() failed; this should never happen. Details follow.");
66 FLOW_ERROR_SYS_ERROR_LOG_FATAL();
67 assert(false && "connect_pipe() failed; this should never happen. Details follow."); std::abort();
68 return nullptr;
69 }
70 // else
71
72 auto& read_end_ref = m_signal_pipe_readers.emplace_back(std::move(read_end));
73
74#ifndef NDEBUG
75 const auto result =
76#endif
77 m_signal_pipe_writers.insert({ read_end_ref.get(),
78 std::move(write_end) });
79 assert(result.second);
80
81 FLOW_LOG_TRACE("Timer_event_emitter [" << *this << "]: Pipe created; read-end ptr = [" << read_end_ref.get() << "].");
82
83 return read_end_ref.get();
84} // Timer_event_emitter::create_timer_signal_pipe()
85
86void Timer_event_emitter::timer_async_wait(flow::util::Timer* timer, Timer_fired_read_end* read_end)
87{
88 FLOW_LOG_TRACE("Timer_event_emitter [" << *this << "]: Starting timer async-wait; when/if it fires, "
89 "we will write to write-end corresponding to read-end ptr = [" << read_end << "].");
90
91 /* Careful: this is a user thread, but the handler below is in our worker thread.
92 * Accessing m_signal_pipe_writers inside there would be not thread-safe, as if some timer happens to fire,
93 * while they (say) create_timer_signal_pipe() for another future timer's purposes, things could explode.
94 * Save the value now; it is accurate and guaranteed valid until `*this` dies (which can only happen
95 * after thread is joined, meaning handler couldn't possibly be executing). */
96 const auto it = m_signal_pipe_writers.find(read_end);
97 assert((it != m_signal_pipe_writers.end()) && "timer_async_wait() invoked on read_end from another *this?");
98 const auto write_end = it->second.get();
99
100 timer->async_wait([this, write_end, read_end](const Error_code& async_err_code)
101 {
102 auto sys_err_code = async_err_code;
103
104 if (sys_err_code == boost::asio::error::operation_aborted)
105 {
106 // As promised we only report actual firings. Stuff is shutting down, or timer canceled. GTFO.
107 return;
108 }
109 // else
110
111 if (sys_err_code)
112 {
113 /* This decision (to simply pretend it was a success code) is borrowed from flow::async::schedule_*().
114 * assert() is another option, but use Flow wisdom. Note, e.g., snd_auto_ping_now() does the same. */
115 FLOW_ERROR_SYS_ERROR_LOG_WARNING();
116 FLOW_LOG_WARNING("Timer_event_emitter [" << *this << "]: "
117 "Timer system error; just logged; totally unexpected; pretending it fired normally.");
118 }
119
120 FLOW_LOG_TRACE("Timer_event_emitter [" << *this << "]: Timer fired. Pushing byte to pipe; "
121 "read_end ptr = [" << read_end << "]. User code should detect event soon and cause byte "
122 "to be popped.");
123
124 util::pipe_produce(get_logger(), write_end);
125 }); // timer->async_wait()
126} // Timer_event_emitter::timer_async_wait()
127
129{
130 FLOW_LOG_TRACE("Timer_event_emitter [" << *this << "]: Timer presumably fired and pushed byte to pipe; "
131 "we now pop it.");
132
133 util::pipe_consume(get_logger(), read_end);
134} // Timer_event_emitter::consume_timer_firing_signal()
135
136std::ostream& operator<<(std::ostream& os, const Timer_event_emitter& val)
137{
138 return
139 os << '[' << val.m_nickname << "]@" << static_cast<const void*>(&val);
140
141}
142
143} // namespace ipc::util::sync_io
An object of this type, used internally to implement sync_io-pattern objects that require timer event...
boost::asio::readable_pipe Timer_fired_read_end
Object representing the read end of IPC mechanism, where readable status indicates the associated tim...
const std::string m_nickname
Nickname as passed to ctor.
void consume_timer_firing_signal(Timer_fired_read_end *read_end)
Must be called after a timer_async_wait()-triggered timer fired, before invoking that method for the ...
void timer_async_wait(flow::util::Timer *timer, Timer_fired_read_end *read_end)
To be used on a timer T returned by create_timer(), this is the replacement for the usual T....
Timer_event_emitter(flow::log::Logger *logger_ptr, String_view nickname_str)
Constructs emitter, creating idle thread managing no timers.
flow::util::Timer create_timer()
Creates idle timer for use with timer_async_wait() subsequently.
boost::unordered_map< Timer_fired_read_end *, boost::movelib::unique_ptr< boost::asio::writable_pipe > > m_signal_pipe_writers
Stores the write-ends of the pipes created in create_timer_signal_pipe(), indexed by pointer to their...
flow::async::Single_thread_task_loop m_worker
The thread where (only) timer-firing events (from create_timer()-created Timers) execute.
Timer_fired_read_end * create_timer_signal_pipe()
Creates, and internally stores, an IPC mechanism instance intended for use with a given create_timer(...
std::vector< boost::movelib::unique_ptr< Timer_fired_read_end > > m_signal_pipe_readers
The readers (never null) returned by create_timer_signal_pipe().
Contains common code, as well as important explanatory documentation in the following text,...
Definition: util_fwd.hpp:209
std::ostream & operator<<(std::ostream &os, const Timer_event_emitter &val)
Prints string representation of the given Timer_event_emitter to the given ostream.
void pipe_produce(flow::log::Logger *logger_ptr, Pipe_writer *pipe)
Writes a byte to the given pipe writer.
Definition: util.cpp:67
void pipe_consume(flow::log::Logger *logger_ptr, Pipe_reader *pipe)
Reads a byte via the given pipe reader.
Definition: util.cpp:96
flow::util::String_view String_view
Short-hand for Flow's String_view.
Definition: util_fwd.hpp:115
Log_component
The flow::log::Component payload enumeration containing various log components used by Flow-IPC inter...
Definition: common.hpp:323
flow::Error_code Error_code
Short-hand for flow::Error_code which is very common.
Definition: common.hpp:298