24#ifdef CO_ECS_WORKER_STATS
27 std::atomic<uint64_t> task_count;
28 std::atomic<uint64_t> steal_count;
29 std::atomic<uint64_t> idle_count;
32 task_count.fetch_add(1, std::memory_order::relaxed);
36 steal_count.fetch_add(1, std::memory_order::relaxed);
40 idle_count.fetch_add(1, std::memory_order::relaxed);
53 [[nodiscard]] std::size_t
id() const noexcept {
60 return *current_worker;
75 get_queue().push(task);
83 auto* next_task = get_task();
87#ifdef CO_ECS_WORKER_STATS
95#ifdef CO_ECS_WORKER_STATS
98 const worker_stats& stats() const noexcept {
106 static inline thread_local worker* current_worker;
109 current_worker =
this;
131 _thread =
thread_t([
this]() { run(); });
135 _active.store(
false, std::memory_order::relaxed);
139 if (_thread.joinable()) {
144 auto is_active() const noexcept ->
bool {
145 return _active.load(std::memory_order::relaxed);
148 [[nodiscard]] task_t* get_task() {
150 if (
auto maybe_task = get_queue().pop()) {
155 if (
worker* main_worker = &_pool.main_worker(); main_worker !=
this) {
156 if (
auto maybe_task = steal(*main_worker)) {
163 if (
worker* random_worker = _pool.random_worker(); random_worker && random_worker !=
this) {
164 if (
auto maybe_task = steal(*random_worker)) {
174 auto maybe_task =
worker.get_queue().steal();
175#ifdef CO_ECS_WORKER_STATS
183 void execute(task_t* task) {
185#ifdef CO_ECS_WORKER_STATS
193#ifdef CO_ECS_WORKER_STATS
199 detail::work_stealing_queue<task_t*>& get_queue() noexcept {
204 detail::work_stealing_queue<task_t*> _queue;
207 std::atomic<bool> _active{
true };
210#ifdef CO_ECS_WORKER_STATS
218 assert(
num_workers > 0 &&
"Number of workers should be > 0");
222 throw std::logic_error(
"Thread pool already created");
227 _workers.emplace_back(std::make_unique<worker>(*
this, 0));
228 worker::current_worker = _workers[0].get();
232 _workers.emplace_back(std::make_unique<worker>(*
this, i));
237 _workers[i]->start();
244 for (
auto i = 1; i < _workers.size(); i++) {
249 for (
auto i = 1; i < _workers.size(); i++) {
290 return *_workers.at(
id);
297 return _workers.size();
307 worker& main_worker() noexcept {
311 worker* random_worker() noexcept {
317 std::uniform_int_distribution<std::size_t> dist{ 1,
num_workers() - 1 };
318 std::default_random_engine random_engine{ std::random_device()() };
320 auto random_index = dist(random_engine);
322 return _workers[random_index].get();
326 _worker_wait_semaphore.release();
330 constexpr auto wait_time = std::chrono::milliseconds(5);
331 _worker_wait_semaphore.try_acquire_for(wait_time);
337 std::vector<std::unique_ptr<worker>> _workers;
338 std::counting_semaphore<> _worker_wait_semaphore{ 0 };
static task_t * allocate(auto &&func, task_t *parent=nullptr)
Allocates a task with the specified function and parent, placing it in a circular buffer.
Definition task.hpp:68
Represents a task that can be executed, monitored for completion, and linked to a parent task.
Definition task.hpp:10
bool is_completed() const noexcept
Checks if the task has been completed.
Definition task.hpp:33
Thread pool worker.
Definition thread_pool.hpp:22
void submit(task_t *task)
Submit a task into local workers queue.
Definition thread_pool.hpp:74
void wait(task_t *task)
Wait for task completion.
Definition thread_pool.hpp:81
std::size_t id() const noexcept
Return ID of the worker.
Definition thread_pool.hpp:53
friend class thread_pool
Definition thread_pool.hpp:104
static worker & current() noexcept
Get current thread worker.
Definition thread_pool.hpp:59
worker(thread_pool &pool, uint16_t id)
Create a thread pool worker.
Definition thread_pool.hpp:48
task_t * submit(auto &&func, task_t *parent=nullptr)
Submit a task into local workers queue.
Definition thread_pool.hpp:66
Generic thread pool implementation.
Definition thread_pool.hpp:17
static worker & current_worker() noexcept
Get current worker.
Definition thread_pool.hpp:302
thread_pool & operator=(const thread_pool &)=delete
thread_pool & operator=(thread_pool &&)=delete
std::thread thread_t
Definition thread_pool.hpp:19
task_t * submit(auto &&func, task_t *parent=nullptr)
Submit a task to a thread pool.
Definition thread_pool.hpp:276
std::size_t num_workers() const noexcept
Return the number of workers.
Definition thread_pool.hpp:296
worker & get_worker_by_id(std::size_t id) noexcept
Get worker by ID.
Definition thread_pool.hpp:289
~thread_pool()
Destroy thread pool, worker threads are notified to exit and joined.
Definition thread_pool.hpp:242
thread_pool(const thread_pool &)=delete
thread_pool(thread_pool &&)=delete
static thread_pool & get()
Get thread pool instance.
Definition thread_pool.hpp:264
void wait(task_t *task)
Wait a task to complete.
Definition thread_pool.hpp:282
thread_pool(std::size_t num_workers=std::thread::hardware_concurrency())
Construct thread pool with num_workers workers.
Definition thread_pool.hpp:217
Definition archetype.hpp:11