co_ecs 0.9.0
Cobalt ECS
Loading...
Searching...
No Matches
work_stealing_queue.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <atomic>
4#include <optional>
5#include <vector>
6#include <cassert>
7
9
10namespace co_ecs::detail {
11
14template<typename T>
15class work_stealing_queue {
16public:
19 work_stealing_queue(int64_t capacity = 1024) {
20 assert((mod_2n(capacity, 2) == 0) && "Capacity must be a power of two");
21 _top.store(0, std::memory_order_relaxed);
22 _bottom.store(0, std::memory_order_relaxed);
23 _array.store(new array{capacity}, std::memory_order_relaxed);
24 _garbage.reserve(32);
25 }
26
27 work_stealing_queue(const work_stealing_queue& rhs) = delete;
28 work_stealing_queue& operator=(const work_stealing_queue& rhs) = delete;
29
30 // No move semantic, idk if needed, explicitelly disabled for now
31
32 work_stealing_queue(work_stealing_queue&& rhs) = delete;
33 work_stealing_queue& operator=(work_stealing_queue&& rhs) = delete;
34
36 ~work_stealing_queue() {
37 for(auto a : _garbage) {
38 delete a;
39 }
40 delete _array.load();
41 }
42
45 [[nodiscard]] bool empty() const noexcept {
46 auto b = _bottom.load(std::memory_order_relaxed);
47 auto t = _top.load(std::memory_order_relaxed);
48 return b <= t;
49 }
50
53 [[nodiscard]] size_t size() const noexcept {
54 auto b = _bottom.load(std::memory_order_relaxed);
55 auto t = _top.load(std::memory_order_relaxed);
56 return static_cast<size_t>(b >= t ? b - t : 0);
57 }
58
61 void push(auto&& o) {
62 auto bottom = _bottom.load(std::memory_order_relaxed);
63 auto top = _top.load(std::memory_order_acquire);
64 auto* array = _array.load(std::memory_order::relaxed);
65
66 if (array->capacity() - 1 < (bottom - top)) {
67 auto* tmp = array->grow(bottom, top);
68 _garbage.push_back(array);
69 std::swap(array, tmp);
70 _array.store(array, std::memory_order_relaxed);
71 }
72
73 array->push(bottom, std::forward<decltype(o)>(o));
74 std::atomic_thread_fence(std::memory_order_release);
75 _bottom.store(bottom + 1, std::memory_order_relaxed);
76 }
77
80 std::optional<T> steal() {
81 auto top = _top.load(std::memory_order_acquire);
82 std::atomic_thread_fence(std::memory_order_seq_cst);
83 auto bottom = _bottom.load(std::memory_order_acquire);
84
85 std::optional<T> item;
86
87 if (top < bottom) {
88 auto* array = _array.load(std::memory_order_consume);
89 item = array->pop(top);
90 if(!_top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
91 return std::nullopt;
92 }
93 }
94
95 return item;
96 }
97
100 std::optional<T> pop() {
101 auto bottom = _bottom.load(std::memory_order_relaxed) - 1;
102 auto* array = _array.load(std::memory_order_relaxed);
103 _bottom.store(bottom, std::memory_order_relaxed);
104 std::atomic_thread_fence(std::memory_order_seq_cst);
105 auto top = _top.load(std::memory_order_relaxed);
106
107 std::optional<T> item;
108
109 if (top <= bottom) {
110 item = array->pop(bottom);
111
112 if(top == bottom) {
113 // the last item just got stolen
114 if(!_top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
115 // Failed race against steal operation
116 item = std::nullopt;
117 }
118 _bottom.store(bottom + 1, std::memory_order_relaxed);
119 }
120 } else {
121 _bottom.store(bottom + 1, std::memory_order_relaxed);
122 }
123
124 return item;
125 }
126
129 [[nodiscard]] int64_t capacity() const noexcept {
130 return _array.load(std::memory_order::relaxed)->capacity();
131 }
132
133private:
134 struct array {
135 int64_t _capacity;
136 int64_t _mask;
137 std::atomic<T>* _data;
138
139 explicit array(int64_t c) :
140 _capacity(c),
141 _mask(c - 1),
142 _data{new std::atomic<T>[static_cast<size_t>(_capacity)]} {
143 }
144
145 ~array() {
146 delete [] _data;
147 }
148
149 [[nodiscard]]
150 int64_t capacity() const noexcept {
151 return _capacity;
152 }
153
154 void push(int64_t i, auto&& o) noexcept {
155 _data[i & _mask].store(std::forward<decltype(o)>(o), std::memory_order::relaxed);
156 }
157
158 T pop(int64_t i) noexcept {
159 return _data[i & _mask].load(std::memory_order::relaxed);
160 }
161
162 array* grow(int64_t bottom, int64_t top) {
163 array* ptr = new array {2 * _capacity};
164
165 for(int64_t i = top; i != bottom; ++i) {
166 ptr->push(i, pop(i));
167 }
168
169 return ptr;
170 }
171 };
172
173 std::atomic<int64_t> _top;
174 std::atomic<int64_t> _bottom;
175 std::atomic<array*> _array;
176 std::vector<array*> _garbage;
177};
178
179} // namespace co_ecs::detail
Definition component.hpp:17
constexpr auto mod_2n(auto value, auto divisor) noexcept -> decltype(auto)
Calculate the value % b=2^n.
Definition bits.hpp:10
STL namespace.