24#include <boost/thread.hpp>
25#include <boost/unordered_map.hpp>
26#include <boost/shared_ptr.hpp>
27#include <boost/weak_ptr.hpp>
141template<
typename Thread_local_state_t>
144 private boost::noncopyable
178 static_assert(!std::is_convertible_v<Thread_local_state*, log::Log_context*>,
179 "Thread_local_state_t template param type should not derive from log::Log_context, as "
180 "then set_logger() is not thread-safe; please use Log_context_mt (but be mindful of "
181 "locking-while-logging perf effects in fast-paths).");
416 template<
typename Task>
585 boost::thread_specific_ptr<Tl_context>
m_tsp;
594 template<
typename... Ctor_args>
684 boost::shared_ptr<Registry_ctl>
m_ctl;
815template<
typename Shared_state_t>
817 private boost::noncopyable
841 template<
typename... Ctor_args>
858 template<
typename Task>
916template<
typename Thread_local_state_t>
919 decltype(m_create_state_func)&& create_state_func) :
923 m_create_state_func(std::move(create_state_func)),
924 m_nickname(nickname_str),
925 m_this_thread_state_or_null(cleanup),
929 "Registry created (watched type has ID [" <<
typeid(
Thread_local_state).name() <<
"]).");
932template<
typename Thread_local_state_t>
936 const auto ctx = m_this_thread_state_or_null.m_tsp.get();
937 return ctx ? ctx->m_state :
nullptr;
940template<
typename Thread_local_state_t>
946 auto ctx = m_this_thread_state_or_null.m_tsp.get();
957 while_locked([&](
auto&&...)
960 decltype(m_create_state_func) create_state_func;
961 if (m_create_state_func.empty())
963 if constexpr(S_TL_STATE_HAS_MT_LOG_CONTEXT && std::is_constructible_v<Thread_local_state, Logger*>)
967 else if constexpr((!S_TL_STATE_HAS_MT_LOG_CONTEXT) && std::is_default_constructible_v<Thread_local_state>)
973 FLOW_LOG_FATAL(
"Chose not to supply m_create_state_func at time of needing a new Thread_local_state. "
974 "In this case you must either have <derived from Log_context_mt and supplied ctor "
975 "form T_l_s{lgr} (where lgr is a Logger*)> or <*not* derived from Log_context_mt but "
976 "supplied made T_l_s default-ctible>. Breaks contract; aborting.");
977 assert(
false &&
"Chose not to supply m_create_state_func at time of needing a new Thread_local_state. "
978 "In this case you must either have <derived from Log_context_mt and supplied ctor "
979 "form T_l_s{lgr} (where lgr is a Logger*)> or <*not* derived from Log_context_mt but "
980 "supplied made T_l_s default-ctible>. Breaks contract.");
994 create_state_func = m_create_state_func;
997 ctx =
new Tl_context;
998 ctx->m_ctl_observer = m_ctl;
999 ctx->m_state = create_state_func();
1001 m_this_thread_state_or_null.m_tsp.reset(ctx);
1006 typename decltype(Registry_ctl::m_state_per_thread)::value_type state_and_mdt{ctx->m_state, Metadata{}};
1007 auto& mdt = state_and_mdt.second;
1014 mdt.m_thread_id = this_thread::get_id();
1018 FLOW_LOG_INFO(
"Tl_registry[" << *
this <<
"]: Adding thread-local-state @[" << ctx->m_state <<
"] "
1019 "for thread ID [" << mdt.m_thread_id <<
"]/nickname [" << mdt.m_thread_nickname <<
"]; "
1020 "watched type has ID [" <<
typeid(Thread_local_state).name() <<
"]).");
1024 m_ctl->m_state_per_thread.insert(state_and_mdt);
1025 assert(result.second &&
"How did `state` ptr value get `new`ed, if another thread has not cleaned up same yet?");
1030 return ctx->m_state;
1033template<
typename Thread_local_state_t>
1039 "Registry shutting down (watched type has ID [" <<
typeid(
Thread_local_state).name() <<
"]). "
1040 "Will now delete thread-local-state for each thread that has not exited before this point.");
1041 vector<Thread_local_state*> states_to_delete;
1042 while_locked([&](
auto&&...)
1044 for (
const auto& state_and_mdt : m_ctl->m_state_per_thread)
1046 const auto state = state_and_mdt.first;
1047 const auto& mdt = state_and_mdt.second;
1048 FLOW_LOG_INFO(
"Tl_registry[" << *this <<
"]: Deleting thread-local-state @[" << state <<
"] "
1049 "for thread ID [" << mdt.m_thread_id <<
"]/nickname [" << mdt.m_thread_nickname <<
"].");
1052 states_to_delete.push_back(state);
1054 m_ctl->m_state_per_thread.clear();
1057 for (
const auto state : states_to_delete)
1074 m_this_thread_state_or_null.m_tsp.release();
1079template<
typename Thread_local_state_t>
1092 const auto shared_ptr_to_ctl = weak_ptr_to_ctl.lock();
1093 if (!shared_ptr_to_ctl)
1111 Lock lock{shared_ptr_to_ctl->m_mutex};
1112 do_delete = (shared_ptr_to_ctl->m_state_per_thread.erase(ctx->
m_state) == 1);
1129template<
typename Thread_local_state_t>
1130template<
typename Task>
1133 Lock lock{m_ctl->m_mutex};
1137template<
typename Thread_local_state_t>
1141 assert(safety_lock.owns_lock() &&
"Please call with value while_locked() passed to your task().");
1143 return m_ctl->m_state_per_thread;
1146template<
typename Thread_local_state_t>
1152template<
typename Thread_local_state_t>
1157 Log_context_mt::set_logger(logger_ptr);
1159 if constexpr(S_TL_STATE_HAS_MT_LOG_CONTEXT)
1161 while_locked([&](
auto&&...)
1163 for (
const auto& state_and_mdt : m_ctl->m_state_per_thread)
1165 const auto state = state_and_mdt.first;
1167 state->set_logger(logger_ptr);
1173template<
typename Thread_local_state_t>
1174template<
typename... Ctor_args>
1176 m_tsp(std::forward<Ctor_args>(ctor_args)...)
1181template<
typename Thread_local_state_t>
1184 return os <<
'[' << val.
nickname() <<
"]@" << &val;
1189template<
typename Shared_state_t>
1190template<
typename... Ctor_args>
1192 m_poll_flag_registry(nullptr,
"",
1195 return new Atomic_flag{
false}; }),
1196 m_shared_state(std::forward<Ctor_args>(shared_state_ctor_args)...)
1201template<
typename Shared_state_t>
1202template<
typename Task>
1205 static_assert(!(std::is_empty_v<Shared_state>),
1206 "There is no need to call while_locked(), when your Shared_state type is empty; "
1207 "doing the latter is useful when only Polled_shared_state thread-local flag arm/poll feature "
1208 "is needed; but then there's no need to lock anything.");
1211 task(&m_shared_state);
1214template<
typename Shared_state_t>
1217 return static_cast<void*
>(m_poll_flag_registry.this_thread_state());
1220template<
typename Shared_state_t>
1223 using Atomic_flag =
typename decltype(m_poll_flag_registry)::Thread_local_state;
1225 static_cast<Atomic_flag*
>(thread_poll_state)->store(
true, std::memory_order_acquire);
1250template<
typename Shared_state_t>
1253 using Atomic_flag =
typename decltype(m_poll_flag_registry)::Thread_local_state;
1258 return static_cast<Atomic_flag*
>(thread_poll_state)->exchange(
false, std::memory_order_release);
Identical to Log_context but is safe with respect to set_logger(), assignment, and swap() done concurrently ...
Interface that the user should implement, passing the implementing Logger into logging classes (Flow'...
static void set_thread_info(std::string *call_thread_nickname, util::Thread_id *call_thread_id)
Same as set_thread_info_in_msg_metadata() but targets the given two variables as opposed to a Msg_met...
Optional-use companion to Thread_local_state_registry that enables the Polled_shared_state pattern whe...
void arm_next_poll(void *thread_poll_state)
To be called from any context (typically not the targeted thread-local context in which you'll be che...
Thread_local_state_registry< std::atomic< bool > > m_poll_flag_registry
An atomic "do-something" flag per thread; usually/initially false; armed to true by arm_next_poll() a...
bool poll_armed(void *thread_poll_state)
If the given thread's poll-flag is not armed, no-ops and returns false; otherwise returns true and re...
Shared_state_t Shared_state
Short-hand for template parameter type.
void * this_thread_poll_state()
To be called from a thread-local context in which you'll be checking poll_armed(),...
void while_locked(const Task &task)
Locks the non-recursive shared-state mutex, such that no access or modification of the contents of th...
Shared_state m_shared_state
The managed Shared_state.
Mutex_non_recursive m_shared_state_mutex
Protects m_shared_state.
Polled_shared_state(Ctor_args &&... shared_state_ctor_args)
Forwards to the stored object's Shared_state ctor.
Similar to boost::thread_specific_ptr<T> but with built-in lazy-init semantics; and more importantly ...
Thread_local_state * this_thread_state()
Returns pointer to this thread's thread-local object, first constructing it via m_create_state_func i...
Thread_local_state_registry(log::Logger *logger_ptr, String_view nickname_str, decltype(m_create_state_func)&&create_state_func={})
Create empty registry.
~Thread_local_state_registry()
Deletes each Thread_local_state to have been created so far by calls to this_thread_state() from vari...
std::ostream & operator<<(std::ostream &os, const Thread_local_state_registry< Thread_local_state_t > &val)
Serializes a Thread_local_state_registry to a standard output stream.
Function< Thread_local_state *()> m_create_state_func
this_thread_state(), when needing to create a thread's local new Thread_local_state to return,...
void set_logger(log::Logger *logger_ptr)
Performs Log_context_mt::set_logger(logger_ptr); and – if S_TL_STATE_HAS_MT_LOG_CONTEXT is true – pro...
Lock::mutex_type Mutex
Short-hand for mutex type.
void while_locked(const Task &task)
Locks the non-recursive registry mutex, such that no access or modification of the (deep or shallow) ...
boost::unordered_map< Thread_local_state *, Metadata > State_per_thread_map
Return type of state_per_thread().
const std::string & nickname() const
Returns nickname, a brief string suitable for logging.
const State_per_thread_map & state_per_thread(const Lock &safety_lock) const
Returns reference to immutable container holding info for each thread in which this_thread_state() ha...
Thread_local_state * this_thread_state_or_null()
Returns pointer to this thread's thread-local object, if it has been created via an earlier this_thre...
Tsp_wrapper m_this_thread_state_or_null
In a given thread T, m_this_thread_state_or_null.get() is null if this_thread_state() has not yet bee...
static void cleanup(Tl_context *ctx)
Called by thread_specific_ptr for a given thread's m_this_thread_state_or_null.m_tsp....
boost::shared_ptr< Registry_ctl > m_ctl
The non-thread-local state. See Registry_ctl docs. It is held by shared_ptr only so that weak_ptr observers (Tl_context::m_ctl_observer) can be formed from it.
static constexpr bool S_TL_STATE_HAS_MT_LOG_CONTEXT
true if and only if Thread_local_state is a public sub-class of log::Log_context_mt which has implica...
Thread_local_state_t Thread_local_state
Short-hand for template parameter type. See our class doc header for requirements.
Lock_guard< Mutex_non_recursive > Lock
Short-hand for mutex lock; made public for use in while_locked() and state_per_thread().
const std::string m_nickname
See nickname().
#define FLOW_LOG_INFO(ARG_stream_fragment)
Logs an INFO message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
#define FLOW_LOG_FATAL(ARG_stream_fragment)
Logs a FATAL message into flow::log::Logger *get_logger() with flow::log::Component get_log_component...
Function< void()> Task
Short-hand for a task that can be posted for execution by a Concurrent_task_loop or flow::util::Task_...
Flow module containing miscellaneous general-use facilities that don't fit into any other Flow module...
Thread::id Thread_id
Short-hand for an OS-provided ID of a util::Thread.
boost::unique_lock< Mutex > Lock_guard
Short-hand for advanced-capability RAII lock guard for any mutex, ensuring exclusive ownership of tha...
boost::mutex Mutex_non_recursive
Short-hand for non-reentrant, exclusive mutex. ("Reentrant" = one can lock an already-locked-in-that-...
Basic_string_view< char > String_view
Commonly used char-based Basic_string_view. See its doc header.
Flow_log_component
The flow::log::Component payload enumeration comprising various log components used by Flow's own int...
The entirety of the cross-thread registry state, in a struct so as to be able to wrap it in a shared_...
Mutex m_mutex
Protects the Registry_ctl (or m_state_per_thread; same difference).
State_per_thread_map m_state_per_thread
Registry containing each Thread_local_state, one per distinct thread to have created one via this_thr...
The actual user Thread_local_state stored per thread as lazily-created in this_thread_state(); plus a...
boost::weak_ptr< Registry_ctl > m_ctl_observer
Observer of the owning registry's m_ctl (which may or may not still exist). See Tl_context doc header for explanation.
Thread_local_state * m_state
The main user state.
Simply wraps a boost::thread_specific_ptr<Tl_context>, adding absolutely no data or algorithms,...
Tsp_wrapper(const Tsp_wrapper &src)=delete
Forbid copy.
boost::thread_specific_ptr< Tl_context > m_tsp
What we wrap and forward-to-and-fro.
Tsp_wrapper & operator=(const Tsp_wrapper &src)=delete
Forbid copy.
Tsp_wrapper(Ctor_args &&... ctor_args)
Constructs payload.