buffer_queue.hpp
// -----------------------------------------------------------------------------------------------------
// Copyright (c) 2006-2022, Knut Reinert & Freie Universität Berlin
// Copyright (c) 2016-2022, Knut Reinert & MPI für molekulare Genetik
// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
// -----------------------------------------------------------------------------------------------------

#pragma once

#include <algorithm>
#include <atomic>
#include <bit>
#include <cassert>
#include <cmath>
#include <concepts>
#include <mutex>
#include <seqan3/std/new>
#include <ranges>
#include <shared_mutex>
#include <span>
#include <type_traits>
#include <vector>

#include <seqan3/core/range/type_traits.hpp>
#include <seqan3/utility/container/concept.hpp>          // provides seqan3::sequence_container
#include <seqan3/utility/parallel/detail/spin_delay.hpp> // provides seqan3::detail::spin_delay

namespace seqan3::contrib
{

enum class queue_op_status : uint8_t
{
    success = 0,
    empty,
    full,
    closed
};

enum struct buffer_queue_policy : uint8_t
{
    fixed,
    dynamic
};

// Ringbuffer implementation:
// The underlying buffer has size (number of actual elements + 1). This is a trick to easily check if the queue is
// empty or full. Furthermore, the ring buffer uses 4 pointers: the actual push_back and pop_front positions as well
// as the pending push_back and pop_front positions. The latter indicate the positions that have been advanced
// concurrently by multiple threads from either end.
//
// head: position to read/extract from the queue (first inserted element) => pop_front_position
// tail: position where to write to/push new elements to the queue => push_back_position
// head_read: The actual position after x threads concurrently popped from the queue. => pending_pop_front_position
// tail_write: The actual position after x threads concurrently pushed to the queue. => pending_push_back_position
//
// [ ? ] [ 4 ] [ 3 ] [ 8 ] [ 0 ] [ x ] [ ? ]
//   ^     ^                 ^           ^
//   |     |                 |           |
// head  headRead          tail      tailWrite
//
// valid buffer between  [headRead, tail)
// currently filled      [tail, tailWrite)
// currently removed     [head, headRead)
//
// State: empty = (head == tail)
// [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ]
//                     ^
//                     |
//                head == tail
// The head is at the same position as the tail.
// This means that currently no element is in the buffer.
//
// State: full = (tail + 1 == head)
// [ 2 ] [ 4 ] [ 3 ] [ ? ] [ 8 ] [ 0 ] [ 7 ]
//                     ^     ^
//                     |     |
//                   tail   head
// The tail is one position before the head.
// This means that currently no element can be added to the buffer since it is full.
// Strategies are to either wait until some elements have been popped or to expand the capacity of the
// queue by one, inserting the element at the current tail position and moving all elements starting from head one
// position to the right.
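
// A short worked sketch (added for illustration, not part of the original design notes): with a
// requested capacity of 3 the underlying buffer holds 3 + 1 = 4 slots. Starting from
// head == tail == 0:
//
//     push a, b, c  =>  head == 0, tail == 3   (3 elements stored; the slot before head stays unused)
//     push d        =>  tail + 1 == head (mod 4), i.e. "full": wait, or grow if the policy is dynamic
//     pop           =>  head == 1, one slot becomes free again
//
// Because "empty" is head == tail and "full" is tail + 1 == head, neither state needs a separate
// element counter.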

template <std::semiregular value_t,
          sequence_container buffer_t = std::vector<value_t>,
          buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
class buffer_queue
{
public:

    using buffer_type = buffer_t;
    using value_type = typename buffer_type::value_type;
    using size_type = typename buffer_type::size_type;
    using reference = void;
    using const_reference = void;

    // Default constructor sets capacity to 1 (still empty)
    buffer_queue() : buffer_queue{0u}
    {}
    buffer_queue(buffer_queue const &) = delete;
    buffer_queue(buffer_queue &&) = delete;
    buffer_queue & operator=(buffer_queue const &) = delete;
    buffer_queue & operator=(buffer_queue &&) = delete;
    ~buffer_queue() = default;

    // you can set the initial capacity here
    explicit buffer_queue(size_type const init_capacity)
    {
        buffer.resize(init_capacity + 1);
        ring_buffer_capacity = std::bit_ceil(buffer.size());
    }

    template <std::ranges::input_range range_type>
        requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
    buffer_queue(size_type const init_capacity, range_type && r) : buffer_queue{init_capacity}
    {
        std::ranges::copy(r, std::ranges::begin(buffer));
    }
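
    // Hypothetical usage sketch (identifiers invented for illustration, assuming a C++20 compiler
    // and the headers included above):
    //
    //     seqan3::contrib::buffer_queue<int> queue{100}; // room for 100 elements before a push
    //                                                    // has to wait or grow the buffer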

    template <typename value2_t>
        requires std::convertible_to<value2_t, value_t>
    void push(value2_t && value)
    {
        detail::spin_delay delay{};

        for (;;)
        {
            auto status = try_push(std::forward<value2_t>(value));
            if (status == queue_op_status::closed)
                throw queue_op_status::closed;
            else if (status == queue_op_status::success)
                return;

            assert(status != queue_op_status::empty);
            assert(status == queue_op_status::full);
            delay.wait(); // pause and then try again.
        }
    } // throws if closed
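
    // Hypothetical producer sketch (identifiers invented for illustration): push() spins until the
    // value is enqueued and throws queue_op_status::closed if the queue was closed in the meantime.
    //
    //     void produce(seqan3::contrib::buffer_queue<int> & queue)
    //     {
    //         try
    //         {
    //             for (int i = 0; i < 1000; ++i)
    //                 queue.push(i);
    //         }
    //         catch (seqan3::contrib::queue_op_status const &) // queue was closed while pushing
    //         {}
    //     }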
145 
146  template <typename value2_t>
147  requires std::convertible_to<value2_t, value_t>
148  queue_op_status wait_push(value2_t && value)
149  {
150  detail::spin_delay delay{};
151 
152  for (;;)
153  {
154  auto status = try_push(std::forward<value2_t>(value));
155  // wait until queue is not full anymore..
156  if (status != queue_op_status::full)
157  return status;
158 
159  assert(status != queue_op_status::empty);
160  assert(status == queue_op_status::full);
161  delay.wait(); // pause and then try again.
162  }
163  }

    value_type value_pop() // throws if closed
    {
        detail::spin_delay delay{};

        value_type value{};
        for (;;)
        {
            if (!writer_waiting.load())
            {
                auto status = try_pop(value);

                if (status == queue_op_status::closed)
                    throw queue_op_status::closed;
                else if (status == queue_op_status::success)
                    return value;

                assert(status != queue_op_status::full);
                assert(status == queue_op_status::empty);
            }
            delay.wait(); // pause and then try again.
        }
    }

    queue_op_status wait_pop(value_type & value)
    {
        detail::spin_delay delay{};

        queue_op_status status;
        for (;;)
        {
            if (!writer_waiting.load())
            {
                status = try_pop(value);

                if (status == queue_op_status::closed || status == queue_op_status::success)
                    break;

                assert(status != queue_op_status::full);
                assert(status == queue_op_status::empty);
            }
            delay.wait(); // pause and then try again.
        }
        return status;
    }
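
    // Hypothetical consumer sketch (identifiers invented for illustration): wait_pop() spins until
    // a value arrives and only reports closed once the queue is both closed and drained, so the
    // usual consumer loop is:
    //
    //     void consume(seqan3::contrib::buffer_queue<int> & queue)
    //     {
    //         int value{};
    //         while (queue.wait_pop(value) != seqan3::contrib::queue_op_status::closed)
    //         {
    //             // process value ...
    //         }
    //     }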

    template <typename value2_t>
        requires std::convertible_to<value2_t, value_t>
    queue_op_status try_push(value2_t &&);

    queue_op_status try_pop(value_t &);

    void close()
    {
        if (writer_waiting.exchange(true)) // First writer that closes the queue will continue, the rest returns.
            return;

        try
        {
            std::unique_lock write_lock{mutex};
            closed_flag = true;
            writer_waiting.store(false); // reset the lock.
        }
        catch (...)
        {
            writer_waiting.store(false); // reset the lock.
            std::rethrow_exception(std::current_exception());
        }
    }

    bool is_closed() const noexcept
    {
        return closed_flag;
    }

    bool is_empty() const noexcept
    {
        std::unique_lock write_lock(mutex);
        return pop_front_position == push_back_position;
    }

    bool is_full() const noexcept
    {
        std::unique_lock write_lock(mutex);
        return is_ring_buffer_exhausted(pop_front_position, push_back_position);
    }

    size_type size() const noexcept
    {
        std::unique_lock write_lock(mutex);
        if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
        {
            return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
        }
        else
        {
            assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
            return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
        }
    }

private:

    constexpr bool is_ring_buffer_exhausted(size_type const from, size_type const to) const
    {
        assert(to <= (from + ring_buffer_capacity + 1)); // The tail cannot overwrite the head.

        return to >= from + ring_buffer_capacity;
    }

    constexpr size_type to_buffer_position(size_type const position) const
    {
        return position & (ring_buffer_capacity - 1);
    }
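
    // Worked example (added for illustration): ring_buffer_capacity is always a power of two, so
    // the modulo operation reduces to the bit mask above. With ring_buffer_capacity == 8:
    //
    //     to_buffer_position(5)  == ( 5 & 7) == 5
    //     to_buffer_position(8)  == ( 8 & 7) == 0 // wrapped around to the start of the buffer
    //     to_buffer_position(13) == (13 & 7) == 5
    //
    // Positions therefore grow monotonically while the buffer index wraps, which is what makes the
    // comparison in is_ring_buffer_exhausted() unambiguous.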

    size_type cyclic_increment(size_type position)
    {
        // invariants:
        //   - ring_buffer_capacity is a power of 2
        //   - (position % ring_buffer_capacity) is in [0, buffer.size())
        //
        // return the next greater position that fulfils the invariants
        if (to_buffer_position(++position) >= buffer.size())
            position += ring_buffer_capacity - buffer.size(); // If the position reached the first unused
                                                              // slot, skip ahead to the next cycle.
        return position;
    }
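
    // Worked example (added for illustration): with buffer.size() == 6 and ring_buffer_capacity == 8,
    // buffer indices 6 and 7 are unused, so the increment skips them:
    //
    //     cyclic_increment(4) == 5                  // index 5 is still a valid buffer slot
    //     cyclic_increment(5) == 6 + (8 - 6) == 8   // index 6 would be invalid, jump to the next cycle
    //
    // Position 8 then maps back to buffer index 0 via to_buffer_position().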

    template <typename value2_t>
        requires (std::convertible_to<value2_t, value_t>) &&
                 (buffer_policy == buffer_queue_policy::fixed)
    bool overflow(value2_t &&)
    {
        return false;
    }

    template <typename value2_t>
        requires (std::convertible_to<value2_t, value_t>) &&
                 (buffer_policy == buffer_queue_policy::dynamic)
    bool overflow(value2_t && value);

    buffer_t buffer;
    alignas(std::hardware_destructive_interference_size) std::shared_mutex mutable mutex{};
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pop_front_position{0};
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_pop_front_position{0};
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> push_back_position{0};
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_push_back_position{0};
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> ring_buffer_capacity{0};
    alignas(std::hardware_destructive_interference_size) std::atomic_bool writer_waiting{false};
    alignas(std::hardware_destructive_interference_size) bool closed_flag{false};
};

// Specifies a fixed size buffer queue.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;

// Specifies a dynamic size buffer queue (growable).
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
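
// Hypothetical end-to-end sketch (identifiers invented for illustration; assumes <thread> is
// available): one producer and one consumer sharing a fixed capacity queue. With
// buffer_queue_policy::fixed a push on a full queue spins instead of growing, so the queue also
// acts as a bounded channel.
//
//     seqan3::contrib::fixed_buffer_queue<int> queue{8};
//
//     std::thread producer{[&queue] ()
//     {
//         for (int i = 0; i < 64; ++i)
//             queue.push(i);
//         queue.close(); // signal the consumer that no more values will arrive
//     }};
//
//     std::thread consumer{[&queue] ()
//     {
//         int value{};
//         while (queue.wait_pop(value) != seqan3::contrib::queue_op_status::closed)
//         { /* use value ... */ }
//     }};
//
//     producer.join();
//     consumer.join();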

// ============================================================================
// Metafunctions
// ============================================================================

// ============================================================================
// Functions
// ============================================================================

template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires (std::convertible_to<value2_t, value_t>) &&
             (buffer_policy == buffer_queue_policy::dynamic)
inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
{
    // try to extend capacity
    std::unique_lock write_lock{mutex};

    size_type old_size = buffer.size();
    size_type ring_buffer_capacity = this->ring_buffer_capacity;
    size_type local_front = this->pop_front_position;
    size_type local_back = this->push_back_position;

    // Expects no pending pushes or pops while holding the unique lock.
    assert(local_back == this->pending_push_back_position);
    assert(local_front == this->pending_pop_front_position);

    bool valueWasAppended = false;

    // did we reach the capacity limit (another thread could have done the upgrade already)?
    // buffer is full if tail_pos + 1 == head_pos
    if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
    {
        // In case of a full queue write the value into the additional slot.
        // Note that the ring-buffer implementation uses one additional field which is not used except
        // when an overflow happens. This invariant is used to simply check for the full/empty state of the queue.
        if (old_size != 0)
        {
            auto it = std::ranges::begin(buffer) + to_buffer_position(local_back);
            *it = std::forward<value2_t>(value);
            local_back = local_front + ring_buffer_capacity;
            valueWasAppended = true;
        }

        assert(is_ring_buffer_exhausted(local_front, local_back));

        // get positions of head/tail in the current buffer sequence
        size_type front_buffer_position = to_buffer_position(local_front);
        size_type back_buffer_position = to_buffer_position(local_back);

        // increase capacity by one and move all elements from the current pop_front_position one to the right.
        buffer.resize(old_size + 1);
        ring_buffer_capacity = std::bit_ceil(buffer.size());
        std::ranges::move_backward(std::span{buffer.data() + front_buffer_position, buffer.data() + old_size},
                                   buffer.data() + buffer.size());

        // Update the pop_front and push_back positions.
        if (old_size != 0)
        {
            this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
            this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
        }
        this->ring_buffer_capacity = ring_buffer_capacity;
    }
    return valueWasAppended;
}

// ----------------------------------------------------------------------------
// Function try_pop()
// ----------------------------------------------------------------------------

/*
 * @fn ConcurrentQueue#tryPopFront
 * @headerfile <seqan/parallel.h>
 * @brief Try to dequeue a value from a queue.
 *
 * @signature bool tryPopFront(result, queue[, parallelTag]);
 *
 * @param[in,out] queue       A queue.
 * @param[out]    result      The dequeued value (if available).
 * @param[in]     parallelTag The concurrency scheme. If multiple threads dequeue values concurrently this tag must be
 *                            @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
 *                            @endlink tag can only be used if one thread calls <tt>popFront</tt> at a time.
 *                            Default is @link ParallelismTags#Parallel @endlink.
 * @return bool Returns <tt>true</tt> if a value could be dequeued and <tt>false</tt> otherwise.
 */
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
{
    // try to extract a value
    std::shared_lock read_lock{mutex};

    size_type local_pending_pop_front_position{};
    size_type next_local_pop_front_position{};
    detail::spin_delay spinDelay{};

    local_pending_pop_front_position = this->pending_pop_front_position;
    // wait for the queue to become filled
    while (true)
    {
        size_type local_push_back_position = this->push_back_position;

        assert(local_pending_pop_front_position <= local_push_back_position);

        // Check if the queue is empty
        if (local_pending_pop_front_position == local_push_back_position)
        {
            return is_closed() ? queue_op_status::closed : queue_op_status::empty;
        }

        // Get the next ring-buffer position to read from.
        next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
        // Did another thread already acquire this slot?
        // If yes, try the next position. If not, break and read from the acquired position.
        if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
                                                                   next_local_pop_front_position))
            break;

        spinDelay.wait();
    }

    // Move the value from the acquired read position.
    result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));

    // wait for pending previous reads and synchronise pop_front_position to local_pending_pop_front_position
    {
        detail::spin_delay delay{};
        size_type acquired_slot = local_pending_pop_front_position;
        while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
        {
            acquired_slot = local_pending_pop_front_position;
            delay.wait(); // add adapting delay in case of high contention.
        }
    }

    return queue_op_status::success;
}
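
// The claim/publish pattern used in try_pop() and try_push(), reduced to its core (an illustrative
// sketch added for clarity, not part of the original file): a thread first claims a slot by
// advancing the pending counter with a CAS, then waits until all earlier claimants have published
// before advancing the public counter past its own slot.
//
//     std::atomic<std::size_t> pending{0};   // like pending_pop_front_position above
//     std::atomic<std::size_t> committed{0}; // like pop_front_position above
//
//     std::size_t claim()
//     {
//         std::size_t slot = pending.load();
//         while (!pending.compare_exchange_weak(slot, slot + 1))
//         {} // on failure, slot is reloaded with the current value
//         return slot;
//     }
//
//     void publish(std::size_t slot)
//     {
//         std::size_t expected = slot;
//         while (!committed.compare_exchange_weak(expected, slot + 1))
//             expected = slot; // retry until committed == slot, i.e. it is this thread's turn
//     }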

// ----------------------------------------------------------------------------
// Function try_push()
// ----------------------------------------------------------------------------

/*
 * @fn ConcurrentQueue#appendValue
 * @headerfile <seqan/parallel.h>
 * @brief Enqueue a value to a queue.
 *
 * @signature void appendValue(queue, val[, expandTag[, parallelTag]]);
 *
 * @param[in,out] queue       A queue.
 * @param[in]     val         The value to enqueue.
 * @param[in]     expandTag   The overflow strategy. If @link OverflowStrategyTags#Generous @endlink the queue will be
 *                            automatically resized if the capacity is exceeded, otherwise the thread spinlocks until
 *                            the element can be enqueued.
 *                            Default is the @link DefaultOverflowImplicit @endlink result for the <tt>queue</tt> type.
 * @param[in]     parallelTag The concurrency scheme. If multiple threads enqueue values concurrently this tag must be
 *                            @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
 *                            @endlink tag can only be used if one thread calls <tt>appendValue</tt> at a time.
 *                            Default is @link ParallelismTags#Parallel @endlink.
 */
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires std::convertible_to<value2_t, value_t>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
{
    // try to push the value
    {
        detail::spin_delay delay{};

        std::shared_lock read_lock(mutex);

        if (is_closed())
            return queue_op_status::closed;

        // Current up-to-date position to push an element to
        size_type local_pending_push_back_position = this->pending_push_back_position;

        while (true)
        {
            // Get the next potential position to write the value to.
            size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
            size_type local_pop_front_position = this->pop_front_position;

            // Check if there are enough slots to write to.
            // If not, either wait or try to overflow if it is a dynamic queue.
            if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
                break;

            // Did another thread acquire the current pending position before this thread?
            // If yes, try again; if not, write into the acquired slot.
            if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
                                                                       next_local_push_back_position))
            {
                // The current thread acquired local_pending_push_back_position and can now write the value
                // into the proper slot of the ring buffer.
                auto it = std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
                *it = std::forward<value2_t>(value);

                // wait for pending previous writes and synchronise push_back_position to
                // local_pending_push_back_position
                {
                    detail::spin_delay delay{};
                    // the slot this thread acquired to write to
                    size_type acquired_slot = local_pending_push_back_position;
                    while (!this->push_back_position.compare_exchange_weak(acquired_slot,
                                                                           next_local_push_back_position))
                    {
                        acquired_slot = local_pending_push_back_position;
                        delay.wait();
                    }
                }
                return queue_op_status::success;
            }

            delay.wait();
        }
    }

    // if possible extend capacity and return.
    if (overflow(std::forward<value2_t>(value)))
    {
        return queue_op_status::success; // always return success, since the queue resizes and cannot be full.
    }

    // We could not extend the queue, so it must be full.
    return queue_op_status::full;
}

} // namespace seqan3::contrib