26 #include <glog/logging.h> 71 template <
template <
typename>
class Atom>
78 auto state =
state_.load(std::memory_order_acquire);
88 Atom<uint32_t>& spinCutoff,
89 const bool updateSpinCutoff)
noexcept {
90 const auto ret =
tryWaitForTurn(turn, spinCutoff, updateSpinCutoff);
107 class Clock = std::chrono::steady_clock,
108 class Duration =
typename Clock::duration>
111 Atom<uint32_t>& spinCutoff,
112 const bool updateSpinCutoff,
113 const std::chrono::time_point<Clock, Duration>* absTime =
115 uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
116 const uint32_t effectiveSpinCutoff =
117 updateSpinCutoff || prevThresh == 0 ?
kMaxSpins : prevThresh;
121 for (tries = 0;; ++tries) {
124 if (current_sturn == sturn) {
136 if (tries < effectiveSpinCutoff) {
142 uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift;
144 if (our_waiter_delta <= current_max_waiter_delta) {
149 new_state =
encode(current_sturn, our_waiter_delta);
150 if (state != new_state &&
151 !
state_.compare_exchange_strong(state, new_state)) {
166 if (updateSpinCutoff || prevThresh == 0) {
175 target = std::min<uint32_t>(
179 if (prevThresh == 0) {
181 spinCutoff.store(target);
186 spinCutoff.compare_exchange_weak(
187 prevThresh, prevThresh +
int(target - prevThresh) / 8);
202 max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
203 if (
state_.compare_exchange_strong(state, new_state)) {
204 if (max_waiter_delta != 0) {
249 return 1u << (turn & 31);
void completeTurn(const uint32_t turn) noexcept
Unblocks a thread running waitForTurn(turn + 1)
TryWaitResult tryWaitForTurn(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, const std::chrono::time_point< Clock, Duration > *absTime=nullptr) noexcept
uint32_t decodeCurrentSturn(uint32_t state) const noexcept
uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept
Atom< std::uint32_t > Futex
TurnSequencer(const uint32_t firstTurn=0) noexcept
void waitForTurn(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff) noexcept
—— Concurrent Priority Queue Implementation ——
The minimum spin count that we will adaptively select.
requires E e noexcept(noexcept(s.error(std::move(e))))
FutexResult futexWait(const Futex *futex, uint32_t expected, uint32_t waitMask)
std::chrono::milliseconds Duration
FutexResult futexWaitUntil(const Futex *futex, uint32_t expected, std::chrono::time_point< Clock, Duration > const &deadline, uint32_t waitMask)
uint32_t futexChannel(uint32_t turn) const noexcept
uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept
uint8_t uncompletedTurnLSB() const noexcept
bool isTurn(const uint32_t turn) const noexcept
Returns true iff a call to waitForTurn(turn, ...) won't block.
int futexWake(const Futex *futex, int count, uint32_t wakeMask)
void asm_volatile_pause()