proxygen
TokenBucket.h
Go to the documentation of this file.
/*
 * Copyright 2015-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #pragma once
18 
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>

#include <folly/Likely.h>
25 
26 namespace folly {
27 
47 template <typename Clock = std::chrono::steady_clock>
49  static_assert(Clock::is_steady, "clock must be steady");
50 
51  public:
59  explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept
60  : zeroTime_(zeroTime) {}
61 
69  : zeroTime_(other.zeroTime_.load()) {}
70 
78  const BasicDynamicTokenBucket& other) noexcept {
79  zeroTime_ = other.zeroTime_.load();
80  return *this;
81  }
82 
92  void reset(double zeroTime = 0) noexcept {
93  zeroTime_ = zeroTime;
94  }
95 
99  static double defaultClockNow() noexcept {
100  using dur = std::chrono::duration<double>;
101  auto const now = Clock::now().time_since_epoch();
102  return std::chrono::duration_cast<dur>(now).count();
103  }
104 
121  bool consume(
122  double toConsume,
123  double rate,
124  double burstSize,
125  double nowInSeconds = defaultClockNow()) {
126  assert(rate > 0);
127  assert(burstSize > 0);
128 
129  return consumeImpl(
130  rate, burstSize, nowInSeconds, [toConsume](double& tokens) {
131  if (tokens < toConsume) {
132  return false;
133  }
134  tokens -= toConsume;
135  return true;
136  });
137  }
138 
155  double toConsume,
156  double rate,
157  double burstSize,
158  double nowInSeconds = defaultClockNow()) {
159  assert(rate > 0);
160  assert(burstSize > 0);
161 
162  double consumed;
163  consumeImpl(
164  rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) {
165  if (tokens < toConsume) {
166  consumed = tokens;
167  tokens = 0.0;
168  } else {
169  consumed = toConsume;
170  tokens -= toConsume;
171  }
172  return true;
173  });
174  return consumed;
175  }
176 
182  double available(
183  double rate,
184  double burstSize,
185  double nowInSeconds = defaultClockNow()) const noexcept {
186  assert(rate > 0);
187  assert(burstSize > 0);
188 
189  return std::min((nowInSeconds - this->zeroTime_) * rate, burstSize);
190  }
191 
192  private:
193  template <typename TCallback>
195  double rate,
196  double burstSize,
197  double nowInSeconds,
198  const TCallback& callback) {
199  auto zeroTimeOld = zeroTime_.load();
200  double zeroTimeNew;
201  do {
202  auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
203  if (!callback(tokens)) {
204  return false;
205  }
206  zeroTimeNew = nowInSeconds - tokens / rate;
207  } while (
208  UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew)));
209 
210  return true;
211  }
212 
213  alignas(hardware_destructive_interference_size) std::atomic<double> zeroTime_;
214 };
215 
220 template <typename Clock = std::chrono::steady_clock>
222  static_assert(Clock::is_steady, "clock must be steady");
223 
224  private:
226 
227  public:
238  double genRate,
239  double burstSize,
240  double zeroTime = 0) noexcept
241  : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) {
242  assert(rate_ > 0);
243  assert(burstSize_ > 0);
244  }
245 
251  BasicTokenBucket(const BasicTokenBucket& other) noexcept = default;
252 
258  BasicTokenBucket& operator=(const BasicTokenBucket& other) noexcept = default;
259 
263  static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) {
264  return Impl::defaultClockNow();
265  }
266 
278  void reset(
279  double genRate,
280  double burstSize,
281  double nowInSeconds = defaultClockNow()) noexcept {
282  assert(genRate > 0);
283  assert(burstSize > 0);
284  const double availTokens = available(nowInSeconds);
285  rate_ = genRate;
286  burstSize_ = burstSize;
287  setCapacity(availTokens, nowInSeconds);
288  }
289 
300  void setCapacity(double tokens, double nowInSeconds) noexcept {
301  tokenBucket_.reset(nowInSeconds - tokens / rate_);
302  }
303 
318  bool consume(double toConsume, double nowInSeconds = defaultClockNow()) {
319  return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds);
320  }
321 
336  double toConsume,
337  double nowInSeconds = defaultClockNow()) {
338  return tokenBucket_.consumeOrDrain(
339  toConsume, rate_, burstSize_, nowInSeconds);
340  }
341 
347  double available(double nowInSeconds = defaultClockNow()) const {
348  return tokenBucket_.available(rate_, burstSize_, nowInSeconds);
349  }
350 
356  double rate() const noexcept {
357  return rate_;
358  }
359 
365  double burst() const noexcept {
366  return burstSize_;
367  }
368 
369  private:
371  double rate_;
372  double burstSize_;
373 };
374 
377 
378 } // namespace folly
BasicTokenBucket(double genRate, double burstSize, double zeroTime=0) noexcept
Definition: TokenBucket.h:237
std::atomic< double > zeroTime_
Definition: TokenBucket.h:213
std::chrono::steady_clock::time_point now()
BasicDynamicTokenBucket & operator=(const BasicDynamicTokenBucket &other) noexcept
Definition: TokenBucket.h:77
bool consumeImpl(double rate, double burstSize, double nowInSeconds, const TCallback &callback)
Definition: TokenBucket.h:194
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
double available(double nowInSeconds=defaultClockNow()) const
Definition: TokenBucket.h:347
static double defaultClockNow() noexcept
Definition: TokenBucket.h:99
constexpr std::size_t hardware_destructive_interference_size
Definition: Align.h:107
bool consume(double toConsume, double rate, double burstSize, double nowInSeconds=defaultClockNow())
Definition: TokenBucket.h:121
void reset(double zeroTime=0) noexcept
Definition: TokenBucket.h:92
LogLevel min
Definition: LogLevel.cpp:30
double available(double rate, double burstSize, double nowInSeconds=defaultClockNow()) const noexcept
Definition: TokenBucket.h:182
double consumeOrDrain(double toConsume, double rate, double burstSize, double nowInSeconds=defaultClockNow())
Definition: TokenBucket.h:154
bool consume(double toConsume, double nowInSeconds=defaultClockNow())
Definition: TokenBucket.h:318
int * count
double rate() const noexcept
Definition: TokenBucket.h:356
const
Definition: upload.py:398
BasicDynamicTokenBucket(const BasicDynamicTokenBucket &other) noexcept
Definition: TokenBucket.h:68
void reset(double genRate, double burstSize, double nowInSeconds=defaultClockNow()) noexcept
Definition: TokenBucket.h:278
BasicDynamicTokenBucket(double zeroTime=0) noexcept
Definition: TokenBucket.h:59
#define UNLIKELY(x)
Definition: Likely.h:48
void setCapacity(double tokens, double nowInSeconds) noexcept
Definition: TokenBucket.h:300
static const char tokens[256]
Definition: http_parser.c:184
double burst() const noexcept
Definition: TokenBucket.h:365
static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow()))
Definition: TokenBucket.h:263
double consumeOrDrain(double toConsume, double nowInSeconds=defaultClockNow())
Definition: TokenBucket.h:335