Flow 1.0.0
Flow project: Full implementation reference.
Public Member Functions | Static Public Attributes | Private Attributes | List of all members
flow::async::Task_qing_thread Class Reference

Internally used building block of various concrete Concurrent_task_loop subclasses that encapsulates a thread that spawns at construction time and a dedicated-or-shared util::Task_engine (a/k/a boost.asio io_service) run()ning in that new thread. More...

#include <task_qing_thread.hpp>

Inheritance diagram for flow::async::Task_qing_thread:
[legend]
Collaboration diagram for flow::async::Task_qing_thread:
[legend]

Public Member Functions

 Task_qing_thread (flow::log::Logger *logger_ptr, util::String_view nickname, const Task_engine_ptr &task_engine, bool own_task_engine, boost::promise< void > *done_promise_else_block=0, Task &&init_func_or_empty=Task())
 Constructs object, immediately spawning new (worker) thread, memorizing a ref-counted reference to the provided util::Task_engine which may or may not be a fresh one and meant to be shared with other Task_qing_threads or exclusively by this one. More...
 
 ~Task_qing_thread ()
 stop(), followed by forgetting the Task_engine returned by task_engine(); the latter action may destroy that Task_engine synchronously. More...
 
Task_engine_ptr task_engine ()
 Returns pointer to util::Task_engine such that post()ing to it will cause the subsequent asynchronous execution of that task in a way explained in the Task_qing_thread() constructor doc header. More...
 
util::Threadraw_worker_thread ()
 Returns the util::Thread – a thin wrapper around the native OS thread handle – corresponding to the worker thread started in constructor. More...
 
void stop ()
 Blocks the calling thread until the constructor-started thread has finished; if the underlying Task_engine is not shared then first signals it to stop executing any further Tasks, thus causing the constructor-started thread to in fact finish soon and hence this method to return soon. More...
 
- Public Member Functions inherited from flow::log::Log_context
 Log_context (Logger *logger=0)
 Constructs Log_context by storing the given pointer to a Logger and a null Component. More...
 
template<typename Component_payload >
 Log_context (Logger *logger, Component_payload component_payload)
 Constructs Log_context by storing the given pointer to a Logger and a new Component storing the specified generically typed payload (an enum value). More...
 
 Log_context (const Log_context &src)
 Copy constructor that stores equal Logger* and Component values as the source. More...
 
 Log_context (Log_context &&src)
 Move constructor that makes this equal to src, while the latter becomes as-if default-constructed. More...
 
Log_contextoperator= (const Log_context &src)
 Assignment operator that behaves similarly to the copy constructor. More...
 
Log_contextoperator= (Log_context &&src)
 Move assignment operator that behaves similarly to the move constructor. More...
 
void swap (Log_context &other)
 Swaps Logger pointers and Component objects held by *this and other. More...
 
Loggerget_logger () const
 Returns the stored Logger pointer, particularly as many FLOW_LOG_*() macros expect. More...
 
const Componentget_log_component () const
 Returns reference to the stored Component object, particularly as many FLOW_LOG_*() macros expect. More...
 

Static Public Attributes

static const int S_BAD_EXIT = 1
 exit() code returned to OS in the event Task_qing_thread chooses to exit() the entire program (as of this writing, this occurs on uncaught exception by a user task). More...
 

Private Attributes

Task_engine_ptr m_task_engine
 See task_engine(). More...
 
bool m_own_task_engine
 See constructor. More...
 
boost::movelib::unique_ptr< util::Threadm_worker_thread
 Thread created in constructor. Not-a-thread after stop(); not not-a-thread before stop(). More...
 

Detailed Description

Internally used building block of various concrete Concurrent_task_loop subclasses that encapsulates a thread that spawns at construction time and a dedicated-or-shared util::Task_engine (a/k/a boost.asio io_service) run()ning in that new thread.

This class:

Design rationale

Task_qing_thread() constructor doc header contains discussion worth reading.

Definition at line 59 of file task_qing_thread.hpp.

Constructor & Destructor Documentation

◆ Task_qing_thread()

flow::async::Task_qing_thread::Task_qing_thread ( flow::log::Logger logger_ptr,
util::String_view  nickname,
const Task_engine_ptr task_engine,
bool  own_task_engine,
boost::promise< void > *  done_promise_else_block = 0,
Task &&  init_func_or_empty = Task() 
)
explicit

Constructs object, immediately spawning new (worker) thread, memorizing a ref-counted reference to the provided util::Task_engine which may or may not be a fresh one and meant to be shared with other Task_qing_threads or exclusively by this one.

The post-condition of this constructor is:

  • If done_promise_else_block is null, it's:
    • The new thread has started.
    • If init_func_or_empty is not .empty(), then: init_func_or_empty() ran first-thing in that thread and returned.
      • Note: This is a useful guarantee, if, say, you need to perform some privileged actions at thread setup; once this post-condition holds, it is safe to drop privileges.
    • The thread is ready to participate in task-posting via post(*this->task_engine(), F) and similar.
  • If done_promise_else_block is not null, it's:
    • None. However, if one executes done_promise_else_block->get_future().wait(), upon return from that statement, the above if-done_promise_else_block-is-null post-conditions will hold.
Note
!done_promise_else_block mode is typical. Just wait for construction, then post() away. The other mode is helpful when one has an N-thread pool and wants each Task_qing_thread to initialize concurrently with the others instead of serially. Then one can pass in N promises to N Task_qing_thread ctor calls, then wait on the N respective unique_futures.
When creating the util::Task_engine task_engine, it is usually a good idea for perf to pass the concurrency-hint arg value 1 (one) if either own_task_engine == true, or the overall thread pool simply will have but 1 thread. This will allow boost.asio to optimize internal locking and such.

The worker thread started by each Task_qing_thread constructor will exit upon any uncaught exception by one of the user-supplied Tasks post()ed onto it subsequently. If this occurs, the handler will exit() the entire program with a non-zero code after logging (to *logger_ptr) the exception message. (It is informally recommended that all other threads in the application do the same.)

Assuming no such uncaught exception is thrown, the thread will run until stop() or the destructor is called and returns.

Basic concept discussion: To share or not to share (a Task_engine)?

The choice of own_task_engine flag, as of this writing, does not actually affect much of *this behavior. If true, then you're saying this is the only thread to run tasks on the Task_engine (call it E). If false, it may be shared with other threads. In practice, though, as of this writing, this only controls whether stop() will perform E->stop() (which causes all E->run()s to return and hence threads to soon exit) for you (true), or you must do it for the shared E yourself (which has other objects like *this associated with it). Either way, E can be used before or after *this thread runs in whatever way one prefers, including: one can pre-queueing tasks (via post(*E, F) and such) for it to join in executing in new thread; one can inherit any not-yet-executed tasks after stop(), to execute them in some other thread/run().

That said, despite the small practical impact in this class, the decision of whether to assign one Task_qing_thread (and hence util::Thread and hence native thread) to a Task_engine in one-to-one fashion, versus sharing the latter with more Task_qing_threads, is a central one to your design. It is central to specifying the pattern of how post()ed Tasks are spread across actual threads in a pool. In particular, if it's true (not shared), then one must select a specific Task_qing_thread (and, therefore, its corresponding worker thread) before actually post()ing; otherwise it will be selected intelligently by boost.asio. On the other hand, if it's false (shared), then to guarantee two tasks FG will not execute concurrently (<= desirable if they're assigned to one async::Op) one must probably use a util::Strand. Meanwhile if it IS true (not shared), then one can simply guarantee it by posting onto the same Task_qing_thread (i.e., worker thread)... which is straightforward but theoretically worse at using available time slices across threads. It's worse that way, but on the other hand thread-to-core-pinning is arguably more predictable in terms of ultimate effect on performance when Strands aren't used. Plus it might cause thread-caching perf increases.

Very informally, and perhaps arguably, the true (do-not-share-engine) mode is the legacy way and is how certain entrenched legacy daemons do it; the false (share-engine-among-threads) is the common-sense boost.asio-leveraging way which might be the default for new applications; but it depends also on perf analysis of thread caching benefits of the true way as well.

Rationale/history

Also informally: The hairiness of forcing the user to have to make this decision, and then write potentially if-laden code that subsequently actually posts tasks, is a chief motivation for abstracting such details behind the interfaces Concurrent_task_loop and async::Op. Then the user gets to just post Tasks, optionally tagged with Ops to prevent unwanted concurrency, while the aforementioned interfaces will deal with the different ways of using Task_qing_thread. Therefore, Task_qing_thread is a detail/ class not to be used by or exposed to the user.

Historically, a certain proof of concept (PoC) started out by having "user" code deal with Task_engines directly, quickly morphing to wrap them with Task_qing_thread for ease of use. Then once this PoC desired to have knobs controlling how tasks are scheduled across threads, without having the "user" code worry about it after initial thread-pool setup, Task_qing_thread was moved from the public area into detail/, and Concurrent_task_loop was born (along with with helper type async::Op).

Logging subtlety

For convenience, as promised by at least Cross_thread_task_loop::start() doc header: If user has specified a Config::this_thread_verbosity_override() setting (to reduce or increase log volume temporarily), then we take it upon ourselves to apply this setting to the spawned thread during exactly the following times:

  • Any startup logging in the spawned thread, by *this.
  • Any shutdown logging in the spawned thread, by *this, after stop() triggers thread exit.
Parameters
logger_ptrLogger to use for subsequently logging.
nicknameBrief, human-readable nickname of the new thread pool, as of this writing for logging only.
task_engineThe util::Task_engine E such that the body of the new thread will be essentially E->run().
own_task_engineEssentially, true if you do not wish to share *task_engine with other Task_qing_threads; false if you do wish to share it with other such threads. See more detailed notes above. Also see stop().
init_func_or_emptyIf not .empty(), init_func_or_empty() shall execute first-thing in the new thread, before internal code begins the thread's participation in the *task_engine event loop (i.e., before task_engine->run()).
done_promise_else_blockIf null, ctor will block until the thread has started and is ready to participate in task_engine()->post()ing (etc.; see above text). If not null, then it will kick things off asynchronously and satisfy the promise *done_promise_else_block once the thread has started and is ready to p... you get the idea.

Definition at line 36 of file task_qing_thread.cpp.

References flow::log::beautify_chrono_logger_this_thread(), FLOW_LOG_INFO, FLOW_LOG_TRACE, flow::log::Log_context::get_logger(), m_own_task_engine, m_task_engine, m_worker_thread, and flow::async::S_ASYNC.

Here is the call graph for this function:

◆ ~Task_qing_thread()

flow::async::Task_qing_thread::~Task_qing_thread ( )

stop(), followed by forgetting the Task_engine returned by task_engine(); the latter action may destroy that Task_engine synchronously.

In particular task_engine() shall be destroyed by this destructor, unless you've saved a copy of that shared_ptr elsewhere (particularly in own_task_engine == false mode in ctor, it is likely to be saved in another Task_qing_thread).

Since stop() has the post-condition that the thread has been joined, the same post-condition holds for this destructor. It is, of course, safe to call this destructor after already having called stop().

See also
stop() which is useful when you want the thread to exit/be joined, but the underlying Task_engine must continue to exist for a bit; in particular post() on it would execute but do nothing. Then once you've ensured no more such post()s are forthcoming, and hence it's safe, "finish the job" by destroying *this.
task_engine() through which one can obtain a ref-counted util::Task_engine, for example with the idea to have another thread task_engine()->run(), thus inheriting any queued work/tasks and able to enqueue and execute future ones.

Definition at line 246 of file task_qing_thread.cpp.

References FLOW_LOG_INFO, FLOW_LOG_TRACE, and stop().

Here is the call graph for this function:

Member Function Documentation

◆ raw_worker_thread()

util::Thread * flow::async::Task_qing_thread::raw_worker_thread ( )

Returns the util::Thread – a thin wrapper around the native OS thread handle – corresponding to the worker thread started in constructor.

The intended use of this is to set thread attributes (such as processor-core affinity) in a way that won't affect/disturb the concurrently executing thread's ability to execute tasks; meaning one might grab its native ID and then set some affinity attribute, but it wouldn't (say) suspend the thread or join it. Slightly informally, then: any such steps ("such" being the informal part) lead to undefined behavior.

Returns
Pointer to Thread, not null. Guaranteed valid until destructor is invoked; guaranteed to be not-a-thread after stop() and not not-a-thread before it.

Definition at line 307 of file task_qing_thread.cpp.

References m_worker_thread.

◆ stop()

void flow::async::Task_qing_thread::stop ( )

Blocks the calling thread until the constructor-started thread has finished; if the underlying Task_engine is not shared then first signals it to stop executing any further Tasks, thus causing the constructor-started thread to in fact finish soon and hence this method to return soon.

After stop() has returned once already, stop() will immediately return. Concurrently executing stop() from 2+ different threads leads to undefined behavior.

In effect: If we own the Task_engine (own_task_engine == true in constructor), this method causes the Task_engine to stop executing tasks ASAP and then waits as long as necessary for the thread to exit; then returns. This will be fast if Tasks are well behaved (do not block).

In effect: If we share an external Task_engine (own_task_engine == false in constructor), this method simply waits for the thread to exit as long as needed. Hence the caller must trigger the shared Task_engine to exit this thread's Task_engine::run(). (In particular, Task_engine::stop() will do this.) Otherwise this method will block until then.

The key fact is that, after this returns, the Task_engine returned by task_engine() shall not have been destroyed by this method. In particular, post() on that Task_engine object will still work without undefined behavior/crashing. The post()ed function just won't actually run. (It may run on another thread but not this one, since by definition this thread has been joined.)

Definition at line 257 of file task_qing_thread.cpp.

References FLOW_LOG_INFO, FLOW_LOG_TRACE, m_own_task_engine, m_task_engine, and m_worker_thread.

Referenced by ~Task_qing_thread().

Here is the caller graph for this function:

◆ task_engine()

Task_engine_ptr flow::async::Task_qing_thread::task_engine ( )

Returns pointer to util::Task_engine such that post()ing to it will cause the subsequent asynchronous execution of that task in a way explained in the Task_qing_thread() constructor doc header.

This is the same object passed to ctor.

Do note that the user's saving a copy of this pointer can extend the life of the returned Task_engine (which is NOT at all the same as extending the life of raw_worker_thread(); also NOT at all the same as making it possible to actually execute work which requires threads).

Could technically be const, but const usage is OK to be conservative. In spirit, at least, it's not const.

Returns
See above.

Definition at line 302 of file task_qing_thread.cpp.

References m_task_engine.

Member Data Documentation

◆ m_own_task_engine

bool flow::async::Task_qing_thread::m_own_task_engine
private

See constructor.

Definition at line 268 of file task_qing_thread.hpp.

Referenced by stop(), and Task_qing_thread().

◆ m_task_engine

Task_engine_ptr flow::async::Task_qing_thread::m_task_engine
private

See task_engine().

Definition at line 265 of file task_qing_thread.hpp.

Referenced by stop(), task_engine(), and Task_qing_thread().

◆ m_worker_thread

boost::movelib::unique_ptr<util::Thread> flow::async::Task_qing_thread::m_worker_thread
private

Thread created in constructor. Not-a-thread after stop(); not not-a-thread before stop().

Definition at line 271 of file task_qing_thread.hpp.

Referenced by raw_worker_thread(), stop(), and Task_qing_thread().

◆ S_BAD_EXIT

const int flow::async::Task_qing_thread::S_BAD_EXIT = 1
static

exit() code returned to OS in the event Task_qing_thread chooses to exit() the entire program (as of this writing, this occurs on uncaught exception by a user task).

Definition at line 70 of file task_qing_thread.hpp.


The documentation for this class was generated from the following files: