proxygen
folly::detail::TurnSequencer< Atom > Struct Template Reference

#include <TurnSequencer.h>

Public Types

enum  TryWaitResult { TryWaitResult::SUCCESS, TryWaitResult::PAST, TryWaitResult::TIMEDOUT }
 

Public Member Functions

 TurnSequencer (const uint32_t firstTurn=0) noexcept
 
bool isTurn (const uint32_t turn) const noexcept
 Returns true iff a call to waitForTurn(turn, ...) won't block.
 
void waitForTurn (const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff) noexcept
 
template<class Clock = std::chrono::steady_clock, class Duration = typename Clock::duration>
TryWaitResult tryWaitForTurn (const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, const std::chrono::time_point< Clock, Duration > *absTime=nullptr) noexcept
 
void completeTurn (const uint32_t turn) noexcept
 Unblocks a thread running waitForTurn(turn + 1).
 
uint8_t uncompletedTurnLSB () const noexcept
 

Private Types

enum  : uint32_t { kTurnShift = 6, kWaitersMask = (1 << kTurnShift) - 1, kMinSpins = 20, kMaxSpins = 2000 }
 

Private Member Functions

uint32_t futexChannel (uint32_t turn) const noexcept
 
uint32_t decodeCurrentSturn (uint32_t state) const noexcept
 
uint32_t decodeMaxWaitersDelta (uint32_t state) const noexcept
 
uint32_t encode (uint32_t currentSturn, uint32_t maxWaiterD) const noexcept
 

Private Attributes

Futex< Atom > state_
 

Detailed Description

template<template< typename > class Atom>
struct folly::detail::TurnSequencer< Atom >

A TurnSequencer allows threads to order their execution according to a monotonically increasing (with wraparound) "turn" value. The two operations provided are to wait for turn T, and to move to the next turn. Every thread that is waiting for T must have arrived before that turn is marked completed (for MPMCQueue only one thread waits for any particular turn, so this is trivially true).
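
The wait/complete contract can be illustrated with a deliberately simple stand-in. The sketch below is not folly's implementation: it replaces the futex, the packed state word and the adaptive spinning with a std::mutex and a std::condition_variable, and the names ToyTurnSequencer and runInTurnOrder are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for TurnSequencer: same wait/complete contract,
// but built on a mutex and a condition variable instead of a futex.
class ToyTurnSequencer {
 public:
  explicit ToyTurnSequencer(uint32_t firstTurn = 0) : current_(firstTurn) {}

  // Blocks until `turn` is the current turn.
  void waitForTurn(uint32_t turn) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [&] { return current_ == turn; });
  }

  // Marks `turn` complete so that the waiter for turn + 1 can proceed.
  void completeTurn(uint32_t turn) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      assert(current_ == turn);  // mirrors the DCHECK in the real code
      current_ = turn + 1;       // unsigned, so this wraps around
    }
    cv_.notify_all();  // no per-turn channels here, so wake everyone
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  uint32_t current_;
};

// Hypothetical helper: thread i takes turn i and appends i to the result.
// Threads are started in reverse order, yet the output comes out sorted.
std::vector<uint32_t> runInTurnOrder(uint32_t numThreads) {
  ToyTurnSequencer seq;
  std::vector<uint32_t> out;
  std::vector<std::thread> threads;
  for (uint32_t i = numThreads; i-- > 0;) {
    threads.emplace_back([&seq, &out, i] {
      seq.waitForTurn(i);
      out.push_back(i);  // only the holder of turn i runs this line
      seq.completeTurn(i);
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  return out;
}
```

Like the real class, the toy version assumes every waiter arrives before its turn is completed; a thread that asks for a turn that has already passed would wait forever here, since there is no equivalent of TryWaitResult::PAST. It also wakes every waiter on each completion, which is the cost the per-turn futex channels are meant to avoid.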

TurnSequencer's state_ holds 26 bits of the current turn (shifted left by 6), along with a 6-bit saturating value that records the maximum waiter minus the current turn. Wraparound of the turn space is expected and handled. This layout allows us to atomically adjust the number of outstanding waiters when we perform a FUTEX_WAKE operation. Compare this strategy to sem_t's separate num_waiters field, which isn't decremented until after the waiting thread gets scheduled; in the meantime more enqueues may occur and issue pointless FUTEX_WAKE calls.
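
A minimal sketch of the state word layout, assuming the helpers below (encodeState, decodeSturn and decodeDelta are hypothetical names, not the private members documented further down). The turn is stored pre-shifted (an "sturn"), and the waiter delta saturates at 63 instead of spilling into the turn bits.

```cpp
#include <algorithm>
#include <cstdint>

// Assumed layout: upper 26 bits = turn (already shifted left by 6),
// low 6 bits = saturating "max waiter minus current turn".
constexpr uint32_t kTurnShift = 6;
constexpr uint32_t kWaitersMask = (1u << kTurnShift) - 1;  // 63

// Packs a shifted turn (low 6 bits must be zero) with a waiter delta,
// clamping the delta to 63 so it never overwrites turn bits.
constexpr uint32_t encodeState(uint32_t currentSturn, uint32_t maxWaiterDelta) {
  return currentSturn | std::min(kWaitersMask, maxWaiterDelta);
}

// Recovers the shifted turn by clearing the waiter bits.
constexpr uint32_t decodeSturn(uint32_t state) { return state & ~kWaitersMask; }

// Recovers the (possibly saturated) waiter delta.
constexpr uint32_t decodeDelta(uint32_t state) { return state & kWaitersMask; }
```

Because both fields live in one 32-bit word, a single compare-and-swap can advance the turn and shrink the waiter delta together.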

TurnSequencer uses futex() directly. It is optimized for the case where the highest awaited turn is at most 32 turns ahead of the current turn. We use the FUTEX_WAIT_BITSET variant, which lets us embed 32 separate wakeup channels in a single futex. See http://locklessinc.com/articles/futex_cheat_sheet for a description.
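
The channel selection can be sketched as a pure function. futexChannelFor and wakeReaches are hypothetical names; the bit arithmetic matches the futexChannel() listing further down, and the overlap test reflects how FUTEX_WAKE_BITSET matches waiters (a waiter is woken only if its bitset shares at least one bit with the wake bitset).

```cpp
#include <cstdint>

// One of 32 bits per turn, as in futexChannel(): turns that differ by a
// multiple of 32 land on the same channel.
constexpr uint32_t futexChannelFor(uint32_t turn) {
  return 1u << (turn & 31);
}

// Hypothetical helper: does a wake aimed at `wakeTurn` reach a thread
// waiting on `waitTurn`? The kernel requires the two masks to overlap.
constexpr bool wakeReaches(uint32_t wakeTurn, uint32_t waitTurn) {
  return (futexChannelFor(wakeTurn) & futexChannelFor(waitTurn)) != 0;
}
```

A wake for turn 3 therefore also reaches a waiter for turn 35, which is why such early-woken waiters must re-check the turn and re-register their delta.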

We only need to keep exact track of the delta between the current turn and the maximum waiter for the 32 turns that follow the current one, because waiters at turn t+32 will be awoken at turn t. At that point they can then adjust the delta using the higher base. Since we need to encode waiter deltas of 0 to 32 inclusive, we use 6 bits. We actually store waiter deltas up to 63, since that might reduce the number of CAS operations a tiny bit.
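
The delta arithmetic can be sketched as follows, assuming 32-bit unsigned turns. waiterDelta and storedDelta are hypothetical helpers; the subtraction and shift mirror the our_waiter_delta computation in tryWaitForTurn(), and the unsigned subtraction is what makes wraparound harmless.

```cpp
#include <algorithm>
#include <cstdint>

constexpr uint32_t kTurnShift = 6;
constexpr uint32_t kWaitersMask = (1u << kTurnShift) - 1;  // 63

// Turns ahead of the current turn, computed on shifted turns with wrapping
// unsigned subtraction, then shifted back down. Only the low 26 bits of
// each turn survive the shift, so the result is modulo 2^26.
constexpr uint32_t waiterDelta(uint32_t currentTurn, uint32_t waitTurn) {
  return ((waitTurn << kTurnShift) - (currentTurn << kTurnShift)) >> kTurnShift;
}

// The value that fits in the 6-bit field: exact for 0..63, saturated above.
constexpr uint32_t storedDelta(uint32_t currentTurn, uint32_t waitTurn) {
  return std::min(kWaitersMask, waiterDelta(currentTurn, waitTurn));
}
```

For example, a waiter for turn 1 while the current turn is 0xFFFFFFFF (just before wraparound) gets a delta of 2, not a huge number.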

To avoid some futex() calls entirely, TurnSequencer uses an adaptive spin cutoff before waiting. The overheads (and convergence rate) of separately tracking the spin cutoff for each TurnSequencer would be prohibitive, so the actual storage is passed in as a parameter and updated atomically. This also lets the caller use different adaptive cutoffs for different operations (read versus write, for example). To avoid contention, the spin cutoff is only updated when requested by the caller.
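
The update rule for the shared cutoff can be written as a pure function. nextSpinCutoff is a hypothetical name; the constants and arithmetic follow the tryWaitForTurn() listing below, but the real code stores the result into the caller's Atom< uint32_t > with a single compare_exchange_weak and simply drops the update if that CAS fails.

```cpp
#include <algorithm>
#include <cstdint>

constexpr uint32_t kMinSpins = 20;
constexpr uint32_t kMaxSpins = 2000;

// `prev` is the caller's stored cutoff (0 means "not yet initialized") and
// `tries` is how many loop iterations the wait actually took.
constexpr uint32_t nextSpinCutoff(uint32_t prev, uint32_t tries) {
  // Reaching kMaxSpins means spinning did not help, so aim for the minimum;
  // otherwise allow twice the spins that were needed, clamped to the range.
  const uint32_t target = tries >= kMaxSpins
      ? kMinSpins
      : std::min<uint32_t>(kMaxSpins, std::max<uint32_t>(kMinSpins, tries * 2));
  if (prev == 0) {
    return target;  // bootstrap: adopt the first data point directly
  }
  // Exponential moving average that keeps about 7/8 of the old value. The
  // difference is made signed so that moving downward works.
  return prev + static_cast<int>(target - prev) / 8;
}
```

With prev = 100 and tries = 200, the target is 400 and the new cutoff is 100 + 300 / 8 = 137; a single slow wait therefore nudges the cutoff rather than resetting it.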

Definition at line 72 of file TurnSequencer.h.

Member Enumeration Documentation

template<template< typename > class Atom>
anonymous enum : uint32_t
private
Enumerator
kTurnShift 

kTurnShift counts the bits that are stolen to record the delta between the current turn and the maximum waiter. It needs to be big enough to record wait deltas of 0 to 32 inclusive. Waiters more than 32 in the future will be woken up 32*n turns early (since their BITSET will hit) and will adjust the waiter count again. We go a bit beyond and let the waiter count go up to 63, which is free and might save us a few CAS operations.

kWaitersMask 
kMinSpins 

The minimum spin count that we will adaptively select.

kMaxSpins 

The maximum spin count that we will adaptively select, and the spin count that will be used when probing to get a new data point for the adaptation.

Definition at line 222 of file TurnSequencer.h.

enum : uint32_t {
  kTurnShift = 6,
  kWaitersMask = (1 << kTurnShift) - 1,

  kMinSpins = 20,

  kMaxSpins = 2000,
};
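
The invariants these constants are meant to satisfy can be checked at compile time. This is a sketch that copies the enum out of the class (it is private in TurnSequencer); kTurnBits is a hypothetical name introduced here.

```cpp
#include <cstdint>

// Copy of the private enum, so its invariants can be checked standalone.
enum : uint32_t {
  kTurnShift = 6,
  kWaitersMask = (1 << kTurnShift) - 1,
  kMinSpins = 20,
  kMaxSpins = 2000,
};

// Bits left for the turn itself once the waiter bits are stolen.
constexpr uint32_t kTurnBits = 32 - kTurnShift;

static_assert(kWaitersMask == 63, "six waiter bits");
static_assert(kWaitersMask >= 32, "room for waiter deltas 0..32 inclusive");
static_assert(kTurnBits == 26, "26 bits of turn");
static_assert(kMinSpins < kMaxSpins, "non-empty adaptive spin range");
```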
template<template< typename > class Atom>
enum folly::detail::TurnSequencer::TryWaitResult
strong
Enumerator
SUCCESS 
PAST 
TIMEDOUT 

Definition at line 82 of file TurnSequencer.h.

Constructor & Destructor Documentation

template<template< typename > class Atom>
folly::detail::TurnSequencer< Atom >::TurnSequencer ( const uint32_t  firstTurn = 0)
inline explicit noexcept

Definition at line 73 of file TurnSequencer.h.

  : state_(encode(firstTurn << kTurnShift, 0)) {}

Member Function Documentation

template<template< typename > class Atom>
void folly::detail::TurnSequencer< Atom >::completeTurn ( const uint32_t  turn)
inline noexcept

Unblocks a thread running waitForTurn(turn + 1).

Definition at line 195 of file TurnSequencer.h.

Referenced by folly::detail::SingleElementQueue< T, Atom >::dequeueImpl(), and folly::detail::SingleElementQueue< T, Atom >::enqueueImpl().

{
  uint32_t state = state_.load(std::memory_order_acquire);
  while (true) {
    DCHECK(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state)));
    uint32_t max_waiter_delta = decodeMaxWaitersDelta(state);
    uint32_t new_state = encode(
        (turn + 1) << kTurnShift,
        max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
    if (state_.compare_exchange_strong(state, new_state)) {
      if (max_waiter_delta != 0) {
        detail::futexWake(
            &state_, std::numeric_limits<int>::max(), futexChannel(turn + 1));
      }
      break;
    }
    // failing compare_exchange_strong updates first arg to the value
    // that caused the failure, so no need to reread state_
  }
}
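
The state transition in completeTurn() can be exercised on its own by replacing the futex with a plain std::atomic<uint32_t>. In this sketch advanceTurn and encode are hypothetical free functions, and the futexWake() call is replaced by a return value that reports whether a wake would be issued.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>

constexpr uint32_t kTurnShift = 6;
constexpr uint32_t kWaitersMask = (1u << kTurnShift) - 1;  // 63

// Hypothetical helper: pack a shifted turn and a saturated waiter delta.
constexpr uint32_t encode(uint32_t sturn, uint32_t delta) {
  return sturn | std::min(kWaitersMask, delta);
}

// Sketch of completeTurn()'s CAS loop. Returns true when the old state
// recorded waiters, i.e. when the real code would call futexWake() on the
// channel for turn + 1.
bool advanceTurn(std::atomic<uint32_t>& state, uint32_t turn) {
  uint32_t cur = state.load(std::memory_order_acquire);
  while (true) {
    // Only the holder of `turn` may complete it (the real code DCHECKs this).
    assert((cur & ~kWaitersMask) == (turn << kTurnShift));
    uint32_t delta = cur & kWaitersMask;
    // Advancing the base turn by one brings every waiter one turn closer.
    uint32_t next =
        encode((turn + 1) << kTurnShift, delta == 0 ? 0 : delta - 1);
    if (state.compare_exchange_strong(cur, next)) {
      return delta != 0;
    }
    // A failed CAS has reloaded `cur`, typically because a waiter just
    // raised the delta; retry with the fresh value.
  }
}
```

Decrementing the delta in the same CAS that advances the turn is what keeps the waiter bookkeeping exact without a separate counter.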
template<template< typename > class Atom>
uint32_t folly::detail::TurnSequencer< Atom >::decodeCurrentSturn ( uint32_t  state) const
inline private noexcept
template<template< typename > class Atom>
uint32_t folly::detail::TurnSequencer< Atom >::decodeMaxWaitersDelta ( uint32_t  state) const
inline private noexcept
template<template< typename > class Atom>
uint32_t folly::detail::TurnSequencer< Atom >::encode ( uint32_t  currentSturn,
uint32_t  maxWaiterD 
) const
inline private noexcept
template<template< typename > class Atom>
uint32_t folly::detail::TurnSequencer< Atom >::futexChannel ( uint32_t  turn) const
inline private noexcept

Returns the bitmask to pass to futexWait or futexWake when communicating about the specified turn.

Definition at line 248 of file TurnSequencer.h.

Referenced by folly::detail::TurnSequencer< std::atomic >::completeTurn(), and folly::detail::TurnSequencer< std::atomic >::tryWaitForTurn().

{
  return 1u << (turn & 31);
}
template<template< typename > class Atom>
bool folly::detail::TurnSequencer< Atom >::isTurn ( const uint32_t  turn) const
inline noexcept

Returns true iff a call to waitForTurn(turn, ...) won't block.

Definition at line 77 of file TurnSequencer.h.

{
  auto state = state_.load(std::memory_order_acquire);
  return decodeCurrentSturn(state) == (turn << kTurnShift);
}
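
One consequence of comparing shifted turns is that isTurn() only sees the low 26 bits of turn. The hypothetical helper isTurnFor below applies the same comparison to a raw state word to make that visible.

```cpp
#include <cstdint>

constexpr uint32_t kTurnShift = 6;
constexpr uint32_t kWaitersMask = (1u << kTurnShift) - 1;  // 63

// Same test as isTurn(), but on an explicit state word: clear the waiter
// bits, then compare against the shifted turn. The left shift drops the
// top 6 bits of `turn`, so turns that differ by a multiple of 2^26 match.
constexpr bool isTurnFor(uint32_t state, uint32_t turn) {
  return (state & ~kWaitersMask) == (turn << kTurnShift);
}
```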
template<template< typename > class Atom>
template<class Clock = std::chrono::steady_clock, class Duration = typename Clock::duration>
TryWaitResult folly::detail::TurnSequencer< Atom >::tryWaitForTurn ( const uint32_t  turn,
Atom< uint32_t > &  spinCutoff,
const bool  updateSpinCutoff,
const std::chrono::time_point< Clock, Duration > *  absTime = nullptr 
)
inline noexcept

Blocks the current thread until turn has arrived. If updateSpinCutoff is true (or spinCutoff is still 0), this spins for up to kMaxSpins tries before blocking and adjusts spinCutoff based on the result; otherwise it spins for at most spinCutoff tries. Returns SUCCESS if the wait succeeded, PAST if the turn is in the past, or TIMEDOUT if absTime is not nullptr and that time is reached before the turn arrives.

Definition at line 109 of file TurnSequencer.h.

Referenced by folly::detail::TurnSequencer< std::atomic >::waitForTurn().

{
  uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
  const uint32_t effectiveSpinCutoff =
      updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh;

  uint32_t tries;
  const uint32_t sturn = turn << kTurnShift;
  for (tries = 0;; ++tries) {
    uint32_t state = state_.load(std::memory_order_acquire);
    uint32_t current_sturn = decodeCurrentSturn(state);
    if (current_sturn == sturn) {
      break;
    }

    // wrap-safe version of (current_sturn >= sturn)
    if (sturn - current_sturn >= std::numeric_limits<uint32_t>::max() / 2) {
      // turn is in the past
      return TryWaitResult::PAST;
    }

    // the first effectiveSpinCutoff tries are spins, after that we will
    // record ourselves as a waiter and block with futexWait
    if (tries < effectiveSpinCutoff) {
      asm_volatile_pause();
      continue;
    }

    uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state);
    uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift;
    uint32_t new_state;
    if (our_waiter_delta <= current_max_waiter_delta) {
      // state already records us as waiters, probably because this
      // isn't our first time around this loop
      new_state = state;
    } else {
      new_state = encode(current_sturn, our_waiter_delta);
      if (state != new_state &&
          !state_.compare_exchange_strong(state, new_state)) {
        continue;
      }
    }
    if (absTime) {
      auto futexResult = detail::futexWaitUntil(
          &state_, new_state, *absTime, futexChannel(turn));
      if (futexResult == FutexResult::TIMEDOUT) {
        return TryWaitResult::TIMEDOUT;
      }
    } else {
      detail::futexWait(&state_, new_state, futexChannel(turn));
    }
  }

  if (updateSpinCutoff || prevThresh == 0) {
    // if we hit kMaxSpins then spinning was pointless, so the right
    // spinCutoff is kMinSpins
    uint32_t target;
    if (tries >= kMaxSpins) {
      target = kMinSpins;
    } else {
      // to account for variations, we allow ourselves to spin 2*N when
      // we think that N is actually required in order to succeed
      target = std::min<uint32_t>(
          kMaxSpins, std::max<uint32_t>(kMinSpins, tries * 2));
    }

    if (prevThresh == 0) {
      // bootstrap
      spinCutoff.store(target);
    } else {
      // try once, keep moving if CAS fails. Exponential moving average
      // with alpha of 7/8
      // Be careful that the quantity we add to prevThresh is signed.
      spinCutoff.compare_exchange_weak(
          prevThresh, prevThresh + int(target - prevThresh) / 8);
    }
  }

  return TryWaitResult::SUCCESS;
}
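
The wrap-safe check that produces TryWaitResult::PAST can be isolated as a small function. Position and classify are hypothetical names; Current corresponds to the loop's break (and eventually SUCCESS), Past to PAST, and Future to continuing to spin or wait.

```cpp
#include <cstdint>
#include <limits>

constexpr uint32_t kTurnShift = 6;  // used to form sturns in the examples

enum class Position { Current, Past, Future };

// Both arguments are shifted turns ("sturns"). An unsigned difference of
// half the uint32_t range or more means the wanted turn is behind the
// current one, and this stays correct across wraparound.
constexpr Position classify(uint32_t currentSturn, uint32_t sturn) {
  if (currentSturn == sturn) {
    return Position::Current;
  }
  if (sturn - currentSturn >= std::numeric_limits<uint32_t>::max() / 2) {
    return Position::Past;
  }
  return Position::Future;
}
```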
template<template< typename > class Atom>
uint8_t folly::detail::TurnSequencer< Atom >::uncompletedTurnLSB ( ) const
inline noexcept

Returns the least significant byte of the current uncompleted turn. The full 32-bit turn cannot be recovered.

Definition at line 217 of file TurnSequencer.h.

{
  return uint8_t(state_.load(std::memory_order_acquire) >> kTurnShift);
}
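
The lossy nature of uncompletedTurnLSB() is easy to see on a raw state word. turnLSB is a hypothetical free-function version of the same expression.

```cpp
#include <cstdint>

constexpr uint32_t kTurnShift = 6;

// Shift out the 6 waiter bits, then truncate to the low 8 bits of the turn.
// Turns that agree in their low byte are indistinguishable afterwards.
constexpr uint8_t turnLSB(uint32_t state) {
  return uint8_t(state >> kTurnShift);
}
```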
template<template< typename > class Atom>
void folly::detail::TurnSequencer< Atom >::waitForTurn ( const uint32_t  turn,
Atom< uint32_t > &  spinCutoff,
const bool  updateSpinCutoff 
)
inline noexcept

See tryWaitForTurn. Requires that turn is not in the past.

Definition at line 86 of file TurnSequencer.h.

Referenced by folly::detail::SingleElementQueue< T, Atom >::dequeueImpl(), and folly::detail::SingleElementQueue< T, Atom >::enqueueImpl().

{
  const auto ret = tryWaitForTurn(turn, spinCutoff, updateSpinCutoff);
  DCHECK(ret == TryWaitResult::SUCCESS);
}

Member Data Documentation

template<template< typename > class Atom>
Futex<Atom> folly::detail::TurnSequencer< Atom >::state_
private

This holds both the current turn and the highest waiting turn, stored as (current_turn << 6) | min(63, max(waited_turn - current_turn)), where the max is taken over all waiting turns.

Definition at line 244 of file TurnSequencer.h.

Referenced by folly::detail::TurnSequencer< std::atomic >::completeTurn(), folly::detail::TurnSequencer< std::atomic >::isTurn(), folly::detail::TurnSequencer< std::atomic >::tryWaitForTurn(), and folly::detail::TurnSequencer< std::atomic >::uncompletedTurnLSB().


The documentation for this struct was generated from the following file: