proxygen
TurnSequencer.h
Go to the documentation of this file.
/*
 * Copyright 2015-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <limits>

#include <folly/detail/Futex.h>
#include <folly/portability/Asm.h>

#include <glog/logging.h>

28 namespace folly {
29 
30 namespace detail {
31 
71 template <template <typename> class Atom>
72 struct TurnSequencer {
73  explicit TurnSequencer(const uint32_t firstTurn = 0) noexcept
74  : state_(encode(firstTurn << kTurnShift, 0)) {}
75 
77  bool isTurn(const uint32_t turn) const noexcept {
78  auto state = state_.load(std::memory_order_acquire);
79  return decodeCurrentSturn(state) == (turn << kTurnShift);
80  }
81 
82  enum class TryWaitResult { SUCCESS, PAST, TIMEDOUT };
83 
87  const uint32_t turn,
88  Atom<uint32_t>& spinCutoff,
89  const bool updateSpinCutoff) noexcept {
90  const auto ret = tryWaitForTurn(turn, spinCutoff, updateSpinCutoff);
91  DCHECK(ret == TryWaitResult::SUCCESS);
92  }
93 
94  // Internally we always work with shifted turn values, which makes the
95  // truncation and wraparound work correctly. This leaves us bits at
96  // the bottom to store the number of waiters. We call shifted turns
97  // "sturns" inside this class.
98 
106  template <
107  class Clock = std::chrono::steady_clock,
108  class Duration = typename Clock::duration>
110  const uint32_t turn,
111  Atom<uint32_t>& spinCutoff,
112  const bool updateSpinCutoff,
113  const std::chrono::time_point<Clock, Duration>* absTime =
114  nullptr) noexcept {
115  uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
116  const uint32_t effectiveSpinCutoff =
117  updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh;
118 
119  uint32_t tries;
120  const uint32_t sturn = turn << kTurnShift;
121  for (tries = 0;; ++tries) {
122  uint32_t state = state_.load(std::memory_order_acquire);
123  uint32_t current_sturn = decodeCurrentSturn(state);
124  if (current_sturn == sturn) {
125  break;
126  }
127 
128  // wrap-safe version of (current_sturn >= sturn)
129  if (sturn - current_sturn >= std::numeric_limits<uint32_t>::max() / 2) {
130  // turn is in the past
131  return TryWaitResult::PAST;
132  }
133 
134  // the first effectSpinCutoff tries are spins, after that we will
135  // record ourself as a waiter and block with futexWait
136  if (tries < effectiveSpinCutoff) {
138  continue;
139  }
140 
141  uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state);
142  uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift;
143  uint32_t new_state;
144  if (our_waiter_delta <= current_max_waiter_delta) {
145  // state already records us as waiters, probably because this
146  // isn't our first time around this loop
147  new_state = state;
148  } else {
149  new_state = encode(current_sturn, our_waiter_delta);
150  if (state != new_state &&
151  !state_.compare_exchange_strong(state, new_state)) {
152  continue;
153  }
154  }
155  if (absTime) {
156  auto futexResult = detail::futexWaitUntil(
157  &state_, new_state, *absTime, futexChannel(turn));
158  if (futexResult == FutexResult::TIMEDOUT) {
160  }
161  } else {
162  detail::futexWait(&state_, new_state, futexChannel(turn));
163  }
164  }
165 
166  if (updateSpinCutoff || prevThresh == 0) {
167  // if we hit kMaxSpins then spinning was pointless, so the right
168  // spinCutoff is kMinSpins
169  uint32_t target;
170  if (tries >= kMaxSpins) {
171  target = kMinSpins;
172  } else {
173  // to account for variations, we allow ourself to spin 2*N when
174  // we think that N is actually required in order to succeed
175  target = std::min<uint32_t>(
176  kMaxSpins, std::max<uint32_t>(kMinSpins, tries * 2));
177  }
178 
179  if (prevThresh == 0) {
180  // bootstrap
181  spinCutoff.store(target);
182  } else {
183  // try once, keep moving if CAS fails. Exponential moving average
184  // with alpha of 7/8
185  // Be careful that the quantity we add to prevThresh is signed.
186  spinCutoff.compare_exchange_weak(
187  prevThresh, prevThresh + int(target - prevThresh) / 8);
188  }
189  }
190 
191  return TryWaitResult::SUCCESS;
192  }
193 
195  void completeTurn(const uint32_t turn) noexcept {
196  uint32_t state = state_.load(std::memory_order_acquire);
197  while (true) {
198  DCHECK(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state)));
199  uint32_t max_waiter_delta = decodeMaxWaitersDelta(state);
200  uint32_t new_state = encode(
201  (turn + 1) << kTurnShift,
202  max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
203  if (state_.compare_exchange_strong(state, new_state)) {
204  if (max_waiter_delta != 0) {
207  }
208  break;
209  }
210  // failing compare_exchange_strong updates first arg to the value
211  // that caused the failure, so no need to reread state_
212  }
213  }
214 
218  return uint8_t(state_.load(std::memory_order_acquire) >> kTurnShift);
219  }
220 
221  private:
222  enum : uint32_t {
231  kWaitersMask = (1 << kTurnShift) - 1,
232 
234  kMinSpins = 20,
235 
239  kMaxSpins = 2000,
240  };
241 
245 
249  return 1u << (turn & 31);
250  }
251 
253  return state & ~kWaitersMask;
254  }
255 
257  return state & kWaitersMask;
258  }
259 
260  uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept {
261  return currentSturn | std::min(uint32_t{kWaitersMask}, maxWaiterD);
262  }
263 };
264 
265 } // namespace detail
266 } // namespace folly
void completeTurn(const uint32_t turn) noexcept
Unblocks a thread running waitForTurn(turn + 1)
TryWaitResult tryWaitForTurn(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, const std::chrono::time_point< Clock, Duration > *absTime=nullptr) noexcept
uint32_t decodeCurrentSturn(uint32_t state) const noexcept
LogLevel max
Definition: LogLevel.cpp:31
uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept
Atom< std::uint32_t > Futex
Definition: Futex.h:51
TurnSequencer(const uint32_t firstTurn=0) noexcept
Definition: TurnSequencer.h:73
void waitForTurn(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff) noexcept
Definition: TurnSequencer.h:86
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
The minimum spin count that we will adaptively select.
requires E e noexcept(noexcept(s.error(std::move(e))))
FutexResult futexWait(const Futex *futex, uint32_t expected, uint32_t waitMask)
Definition: Futex-inl.h:100
std::chrono::milliseconds Duration
Definition: Types.h:36
LogLevel min
Definition: LogLevel.cpp:30
#define Atom
FutexResult futexWaitUntil(const Futex *futex, uint32_t expected, std::chrono::time_point< Clock, Duration > const &deadline, uint32_t waitMask)
Definition: Futex-inl.h:112
uint32_t futexChannel(uint32_t turn) const noexcept
uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept
const
Definition: upload.py:398
uint8_t uncompletedTurnLSB() const noexcept
state
Definition: http_parser.c:272
bool isTurn(const uint32_t turn) const noexcept
Returns true iff a call to waitForTurn(turn, ...) won&#39;t block.
Definition: TurnSequencer.h:77
int futexWake(const Futex *futex, int count, uint32_t wakeMask)
Definition: Futex-inl.h:107
void asm_volatile_pause()
Definition: Asm.h:37