15class work_stealing_queue {
19 work_stealing_queue(int64_t capacity = 1024) {
20 assert((
mod_2n(capacity, 2) == 0) &&
"Capacity must be a power of two");
21 _top.store(0, std::memory_order_relaxed);
22 _bottom.store(0, std::memory_order_relaxed);
23 _array.store(
new array{capacity}, std::memory_order_relaxed);
27 work_stealing_queue(
const work_stealing_queue& rhs) =
delete;
28 work_stealing_queue& operator=(
const work_stealing_queue& rhs) =
delete;
32 work_stealing_queue(work_stealing_queue&& rhs) =
delete;
33 work_stealing_queue& operator=(work_stealing_queue&& rhs) =
delete;
36 ~work_stealing_queue() {
37 for(
auto a : _garbage) {
45 [[nodiscard]]
bool empty() const noexcept {
46 auto b = _bottom.load(std::memory_order_relaxed);
47 auto t = _top.load(std::memory_order_relaxed);
53 [[nodiscard]]
size_t size() const noexcept {
54 auto b = _bottom.load(std::memory_order_relaxed);
55 auto t = _top.load(std::memory_order_relaxed);
56 return static_cast<size_t>(b >= t ? b - t : 0);
62 auto bottom = _bottom.load(std::memory_order_relaxed);
63 auto top = _top.load(std::memory_order_acquire);
64 auto* array = _array.load(std::memory_order::relaxed);
66 if (array->capacity() - 1 < (bottom - top)) {
67 auto* tmp = array->grow(bottom, top);
68 _garbage.push_back(array);
69 std::swap(array, tmp);
70 _array.store(array, std::memory_order_relaxed);
73 array->push(bottom, std::forward<
decltype(o)>(o));
74 std::atomic_thread_fence(std::memory_order_release);
75 _bottom.store(bottom + 1, std::memory_order_relaxed);
80 std::optional<T> steal() {
81 auto top = _top.load(std::memory_order_acquire);
82 std::atomic_thread_fence(std::memory_order_seq_cst);
83 auto bottom = _bottom.load(std::memory_order_acquire);
85 std::optional<T> item;
88 auto* array = _array.load(std::memory_order_consume);
89 item = array->pop(top);
90 if(!_top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
100 std::optional<T> pop() {
101 auto bottom = _bottom.load(std::memory_order_relaxed) - 1;
102 auto* array = _array.load(std::memory_order_relaxed);
103 _bottom.store(bottom, std::memory_order_relaxed);
104 std::atomic_thread_fence(std::memory_order_seq_cst);
105 auto top = _top.load(std::memory_order_relaxed);
107 std::optional<T> item;
110 item = array->pop(bottom);
114 if(!_top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
118 _bottom.store(bottom + 1, std::memory_order_relaxed);
121 _bottom.store(bottom + 1, std::memory_order_relaxed);
129 [[nodiscard]] int64_t capacity() const noexcept {
130 return _array.load(std::memory_order::relaxed)->capacity();
137 std::atomic<T>* _data;
139 explicit array(int64_t c) :
142 _data{new
std::atomic<T>[static_cast<size_t>(_capacity)]} {
150 int64_t capacity() const noexcept {
154 void push(int64_t i,
auto&& o)
noexcept {
155 _data[i & _mask].store(std::forward<
decltype(o)>(o), std::memory_order::relaxed);
158 T pop(int64_t i)
noexcept {
159 return _data[i & _mask].load(std::memory_order::relaxed);
162 array* grow(int64_t bottom, int64_t top) {
163 array* ptr =
new array {2 * _capacity};
165 for(int64_t i = top; i != bottom; ++i) {
166 ptr->push(i, pop(i));
173 std::atomic<int64_t> _top;
174 std::atomic<int64_t> _bottom;
175 std::atomic<array*> _array;
176 std::vector<array*> _garbage;
Definition component.hpp:17
constexpr auto mod_2n(auto value, auto divisor) noexcept -> decltype(auto)
Calculate the value % b=2^n.
Definition bits.hpp:10