proxygen
BucketedTimeSeries-defs.h
Go to the documentation of this file.
1 /*
2  * Copyright 2012-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/Likely.h>
21 #include <glog/logging.h>
22 #include <algorithm>
23 #include <stdexcept>
24 
25 namespace folly {
26 
27 template <typename VT, typename CT>
29  size_t nBuckets,
30  Duration maxDuration)
31  : firstTime_(Duration(1)), latestTime_(), duration_(maxDuration) {
32  // For tracking all-time data we only use total_, and don't need to bother
33  // with buckets_
34  if (!isAllTime()) {
35  // Round nBuckets down to duration_.count().
36  //
37  // There is no point in having more buckets than our timestamp
38  // granularity: otherwise we would have buckets that could never be used.
39  if (nBuckets > size_t(duration_.count())) {
40  nBuckets = size_t(duration_.count());
41  }
42 
43  buckets_.resize(nBuckets, Bucket());
44  }
45 }
46 
47 template <typename VT, typename CT>
49  TimePoint theFirstTime,
50  TimePoint theLatestTime,
51  Duration maxDuration,
52  const std::vector<Bucket>& bucketsList)
53  : firstTime_(theFirstTime),
54  latestTime_(theLatestTime),
55  duration_(maxDuration),
56  buckets_(bucketsList) {
57  // Come up with the total_ from buckets_ being passed in
58  for (auto const& bucket : buckets_) {
59  total_.add(bucket.sum, bucket.count);
60  }
61 
62  // Verify the integrity of the data
63 
64  // If firstTime is greater than latestTime, the total count should be 0.
65  // (firstTime being greater than latestTime means that no data points have
66  // ever been added to the time series.)
67  if (firstTime_ > latestTime_ && (total_.sum != 0 || total_.count != 0)) {
68  throw std::invalid_argument(
69  "The total should have been 0 "
70  "if firstTime is greater than lastestTime");
71  }
72 
73  // If firstTime is less than or equal to latestTime,
74  // latestTime - firstTime should be less than or equal to the duration.
75  if (firstTime_ <= latestTime_ && latestTime_ - firstTime_ > duration_) {
76  throw std::invalid_argument(
77  "The difference between firstTime and latestTime "
78  "should be less than or equal to the duration");
79  }
80 }
81 
82 template <typename VT, typename CT>
84  return addValueAggregated(now, val, 1);
85 }
86 
// Record the value `val` as having been seen `times` times at time `now`.
// The aggregated total passed on is val * ValueType(times), with `times` as
// the sample count; the result of addValueAggregated() is returned unchanged.
// NOTE(review): the line carrying this function's return type and qualified
// name is absent from this listing -- confirm against the class declaration.
87 template <typename VT, typename CT>
89  TimePoint now,
90  const ValueType& val,
91  uint64_t times) {
92  return addValueAggregated(now, val * ValueType(times), times);
93 }
94 
95 template <typename VT, typename CT>
97  TimePoint now,
98  const ValueType& total,
99  uint64_t nsamples) {
100  if (isAllTime()) {
101  if (UNLIKELY(empty())) {
102  firstTime_ = now;
103  latestTime_ = now;
104  } else if (now > latestTime_) {
105  latestTime_ = now;
106  } else if (now < firstTime_) {
107  firstTime_ = now;
108  }
109  total_.add(total, nsamples);
110  return true;
111  }
112 
113  size_t bucketIdx;
114  if (UNLIKELY(empty())) {
115  // First data point we've ever seen
116  firstTime_ = now;
117  latestTime_ = now;
118  bucketIdx = getBucketIdx(now);
119  } else if (now > latestTime_) {
120  // More recent time. Need to update the buckets.
121  bucketIdx = updateBuckets(now);
122  } else if (LIKELY(now == latestTime_)) {
123  // Current time.
124  bucketIdx = getBucketIdx(now);
125  } else {
126  // An earlier time in the past. We need to check if this time still falls
127  // within our window.
128  if (now < getEarliestTimeNonEmpty()) {
129  return false;
130  }
131  bucketIdx = getBucketIdx(now);
132  }
133 
134  total_.add(total, nsamples);
135  buckets_[bucketIdx].add(total, nsamples);
136  return true;
137 }
138 
// Advance the series to time `now` without adding a data point, and return
// a bucket index:
//  - 0 for all-time data,
//  - getBucketIdx(latestTime_) when `now` is not newer than latestTime_,
//  - otherwise the result of updateBuckets(now).
// If the series is empty, firstTime_ is set to `now` first.
// NOTE(review): the signature line and the statement inside the isAllTime()
// branch (listing line 148) are absent from this listing; per the comment
// above that branch it presumably updates latestTime_ -- confirm against the
// real source.
139 template <typename VT, typename CT>
141  if (empty()) {
142  // This is the first data point.
143  firstTime_ = now;
144  }
145 
146  // For all-time data, all we need to do is update latestTime_
147  if (isAllTime()) {
149  return 0;
150  }
151 
152  // Make sure time doesn't go backwards.
153  // If the time is less than or equal to the latest time we have already seen,
154  // we don't need to do anything.
155  if (now <= latestTime_) {
156  return getBucketIdx(latestTime_);
157  }
158 
159  return updateBuckets(now);
160 }
161 
162 template <typename VT, typename CT>
164  // We could cache nextBucketStart as a member variable, so we don't have to
165  // recompute it each time update() is called with a new timestamp value.
166  // This makes things faster when update() (or addValue()) is called once
167  // per second, but slightly slower when update() is called multiple times a
168  // second. We care more about optimizing the cases where addValue() is being
169  // called frequently. If addValue() is only being called once every few
170  // seconds, it doesn't matter as much if it is fast.
171 
172  // Get info about the bucket that latestTime_ points at
173  size_t currentBucket;
174  TimePoint currentBucketStart;
175  TimePoint nextBucketStart;
177  latestTime_, &currentBucket, &currentBucketStart, &nextBucketStart);
178 
179  // Update latestTime_
180  latestTime_ = now;
181 
182  if (now < nextBucketStart) {
183  // We're still in the same bucket.
184  // We're done after updating latestTime_.
185  return currentBucket;
186  } else if (now >= currentBucketStart + duration_) {
187  // It's been a while. We have wrapped, and all of the buckets need to be
188  // cleared.
189  for (Bucket& bucket : buckets_) {
190  bucket.clear();
191  }
192  total_.clear();
193  return getBucketIdx(latestTime_);
194  } else {
195  // clear all the buckets between the last time and current time, meaning
196  // buckets in the range [(currentBucket+1), newBucket]. Note that
197  // the bucket (currentBucket+1) is always the oldest bucket we have. Since
198  // our array is circular, loop when we reach the end.
199  size_t newBucket = getBucketIdx(now);
200  size_t idx = currentBucket;
201  while (idx != newBucket) {
202  ++idx;
203  if (idx >= buckets_.size()) {
204  idx = 0;
205  }
206  total_ -= buckets_[idx];
207  buckets_[idx].clear();
208  }
209  return newBucket;
210  }
211 }
212 
// Reset the series: every bucket in buckets_ and the running total_ are
// cleared.
// NOTE(review): the signature line and the two statements following the
// comment below (listing lines 221-222) are absent from this listing; per
// that comment they presumably reset firstTime_/latestTime_ so that
// firstTime_ > latestTime_ -- confirm against the real source.
213 template <typename VT, typename CT>
215  for (Bucket& bucket : buckets_) {
216  bucket.clear();
217  }
218  total_.clear();
219  // Set firstTime_ larger than latestTime_,
220  // to indicate that the timeseries is empty
223 }
224 
225 template <typename VT, typename CT>
226 typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTime() const {
227  if (empty()) {
228  return TimePoint();
229  }
230  if (isAllTime()) {
231  return firstTime_;
232  }
233 
234  // Compute the earliest time we can track
235  TimePoint earliestTime = getEarliestTimeNonEmpty();
236 
237  // We're never tracking data before firstTime_
238  earliestTime = std::max(earliestTime, firstTime_);
239 
240  return earliestTime;
241 }
242 
243 template <typename VT, typename CT>
245  const {
246  size_t currentBucket;
247  TimePoint currentBucketStart;
248  TimePoint nextBucketStart;
250  latestTime_, &currentBucket, &currentBucketStart, &nextBucketStart);
251 
252  // Subtract 1 duration from the start of the next bucket to find the
253  // earliest possible data point we could be tracking.
254  return nextBucketStart - duration_;
255 }
256 
257 template <typename VT, typename CT>
258 typename CT::duration BucketedTimeSeries<VT, CT>::elapsed() const {
259  if (empty()) {
260  return Duration(0);
261  }
262 
263  // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
264  return latestTime_ - getEarliestTime() + Duration(1);
265 }
266 
// Return how much of the requested range overlaps the tracked data.
// The range is clamped to begin no earlier than getEarliestTime() and to end
// no later than latestTime_ + Duration(1); if the clamped end falls before the
// clamped start the result is a zero-length duration.  An empty series always
// yields Duration(0).
// NOTE(review): the lines carrying the return type, qualified name and the
// first parameter (used below as `start`) are absent from this listing.
267 template <typename VT, typename CT>
270  TimePoint end) const {
271  if (empty()) {
272  return Duration(0);
273  }
274  start = std::max(start, getEarliestTime());
275  end = std::min(end, latestTime_ + Duration(1));
276  end = std::max(start, end);
277  return end - start;
278 }
279 
// Accumulate the bucket sums for the range described by `start` and `end`.
// For every bucket handed to the lambda, bucket.sum is scaled by
// rangeAdjust() before being added to the running total.  The lambda always
// returns true.
// NOTE(review): the signature line and the line naming the function that
// receives (start, end, lambda) are absent from this listing -- presumably a
// ranged bucket iteration; confirm against the real source.
280 template <typename VT, typename CT>
282  ValueType total = ValueType();
284  start,
285  end,
286  [&](const Bucket& bucket,
287  TimePoint bucketStart,
288  TimePoint nextBucketStart) -> bool {
289  total += this->rangeAdjust(
290  bucketStart, nextBucketStart, start, end, bucket.sum);
291  return true;
292  });
293 
294  return total;
295 }
296 
// Accumulate the sample counts for the range described by `start` and `end`.
// Each bucket's count is converted to ValueType, scaled by rangeAdjust(), and
// added to a uint64_t accumulator, so the scaled value is converted back to
// an integer on every addition.
// NOTE(review): the signature line and the line naming the function that
// receives (start, end, lambda) are absent from this listing -- confirm
// against the real source.
297 template <typename VT, typename CT>
299  const {
300  uint64_t sample_count = 0;
302  start,
303  end,
304  [&](const Bucket& bucket,
305  TimePoint bucketStart,
306  TimePoint nextBucketStart) -> bool {
307  sample_count += this->rangeAdjust(
308  bucketStart, nextBucketStart, start, end, ValueType(bucket.count));
309  return true;
310  });
311 
312  return sample_count;
313 }
314 
// Compute the average value over the range described by `start` and `end`.
// Both the bucket sums and the bucket counts are scaled by rangeAdjust() and
// accumulated; the result is detail::avgHelper<ReturnType>(total, count), or
// ReturnType(0) when no samples fall in the range (avoids dividing by zero).
// NOTE(review): the signature line and the line naming the function that
// receives (start, end, lambda) are absent from this listing -- confirm
// against the real source.
315 template <typename VT, typename CT>
316 template <typename ReturnType>
318  const {
319  ValueType total = ValueType();
320  uint64_t sample_count = 0;
322  start,
323  end,
324  [&](const Bucket& bucket,
325  TimePoint bucketStart,
326  TimePoint nextBucketStart) -> bool {
327  total += this->rangeAdjust(
328  bucketStart, nextBucketStart, start, end, bucket.sum);
329  sample_count += this->rangeAdjust(
330  bucketStart, nextBucketStart, start, end, ValueType(bucket.count));
331  return true;
332  });
333 
334  if (sample_count == 0) {
335  return ReturnType(0);
336  }
337 
338  return detail::avgHelper<ReturnType>(total, sample_count);
339 }
340 
341 /*
342  * A note about some of the bucket index calculations below:
343  *
344  * buckets_.size() may not divide evenly into duration_. When this happens,
345  * some buckets will be wider than others. We still want to spread the data
346  * out as evenly as possible among the buckets (as opposed to just making the
347  * last bucket be significantly wider than all of the others).
348  *
349  * To make the division work out, we pretend that the buckets are each
350  * duration_ wide, so that the overall duration becomes
351  * buckets_.size() * duration_.
352  *
353  * To transform a real timestamp into the scale used by our buckets,
354  * we have to multiply by buckets_.size(). To figure out which bucket it goes
355  * into, we then divide by duration_.
356  */
357 
358 template <typename VT, typename CT>
360  // For all-time data we don't use buckets_. Everything is tracked in total_.
361  DCHECK(!isAllTime());
362 
363  auto timeIntoCurrentCycle = (time.time_since_epoch() % duration_);
364  return timeIntoCurrentCycle.count() * buckets_.size() / duration_.count();
365 }
366 
367 /*
368  * Compute the bucket index for the specified time, as well as the earliest
369  * time that falls into this bucket.
370  */
371 template <typename VT, typename CT>
373  TimePoint time,
374  size_t* bucketIdx,
375  TimePoint* bucketStart,
376  TimePoint* nextBucketStart) const {
377  typedef typename Duration::rep TimeInt;
378  DCHECK(!isAllTime());
379 
380  // Keep these two lines together. The compiler should be able to compute
381  // both the division and modulus with a single operation.
382  Duration timeMod = time.time_since_epoch() % duration_;
383  TimeInt numFullDurations = time.time_since_epoch() / duration_;
384 
385  TimeInt scaledTime = timeMod.count() * TimeInt(buckets_.size());
386 
387  // Keep these two lines together. The compiler should be able to compute
388  // both the division and modulus with a single operation.
389  *bucketIdx = size_t(scaledTime / duration_.count());
390  TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
391 
392  TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
393  TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
394 
395  Duration bucketStartMod(
396  (scaledBucketStart + buckets_.size() - 1) / buckets_.size());
397  Duration nextBucketStartMod(
398  (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size());
399 
400  TimePoint durationStart(numFullDurations * duration_);
401  *bucketStart = bucketStartMod + durationStart;
402  *nextBucketStart = nextBucketStartMod + durationStart;
403 }
404 
// Invoke `fn(bucket, bucketStart, nextBucketStart)` once per bucket, starting
// with the bucket one past the one that holds latestTime_, wrapping around
// the end of buckets_, and finishing with the bucket that holds latestTime_.
// Iteration stops early as soon as `fn` returns false.
// NOTE(review): the signature line is absent from this listing (presumably
// forEachBucket(Function fn) const), as is the statement in the isAllTime()
// branch before `return` (listing line 409) -- confirm against the real
// source.
405 template <typename VT, typename CT>
406 template <typename Function>
408  if (isAllTime()) {
410  return;
411  }
412 
413  typedef typename Duration::rep TimeInt;
414 
415  // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
416  // the same way as in getBucketInfo().
417  Duration timeMod = latestTime_.time_since_epoch() % duration_;
418  TimeInt numFullDurations = latestTime_.time_since_epoch() / duration_;
419  TimePoint durationStart(numFullDurations * duration_);
420  TimeInt scaledTime = timeMod.count() * TimeInt(buckets_.size());
421  size_t latestBucketIdx = size_t(scaledTime / duration_.count());
422  TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
423  TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
424  TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
425 
426  // Walk through the buckets, starting one past the current bucket.
427  // The next bucket is from the previous cycle, so subtract 1 duration
428  // from durationStart.
429  size_t idx = latestBucketIdx;
430  durationStart -= duration_;
431 
432  TimePoint nextBucketStart =
433  Duration(
434  (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
435  durationStart;
436  while (true) {
437  ++idx;
// Wrapping past the last bucket moves into the current cycle, so
// durationStart advances by one duration_ and the scaled position restarts.
438  if (idx >= buckets_.size()) {
439  idx = 0;
440  durationStart += duration_;
441  scaledNextBucketStart = duration_.count();
442  } else {
443  scaledNextBucketStart += duration_.count();
444  }
445 
// The previous iteration's end is this bucket's start.
446  TimePoint bucketStart = nextBucketStart;
447  nextBucketStart =
448  Duration(
449  (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
450  durationStart;
451 
452  // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
453  // For now we go ahead and invoke the function with these buckets.
454  // sum and count should always be 0 in these buckets.
455 
456  DCHECK_LE(
457  bucketStart.time_since_epoch().count(),
458  latestTime_.time_since_epoch().count());
459  bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
460  if (!ret) {
461  break;
462  }
463 
464  if (idx == latestBucketIdx) {
465  // all done
466  break;
467  }
468  }
469 }
470 
471 /*
472  * Adjust the input value from the specified bucket to only account
473  * for the desired range.
474  *
475  * For example, if the bucket spans time [10, 20), but we only care about the
476  * range [10, 16), this will return 60% of the input value.
477  */
478 template <typename VT, typename CT>
480  TimePoint bucketStart,
481  TimePoint nextBucketStart,
483  TimePoint end,
484  ValueType input) const {
485  // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
486  // if it were latestTime_. This makes us more accurate when someone is
487  // querying for all of the data up to latestTime_. Even though latestTime_
488  // may only be partially through the bucket, we don't want to adjust
489  // downwards in this case, because the bucket really only has data up to
490  // latestTime_.
491  if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
492  nextBucketStart = latestTime_ + Duration(1);
493  }
494 
495  if (start <= bucketStart && end >= nextBucketStart) {
496  // The bucket is wholly contained in the [start, end) interval
497  return input;
498  }
499 
500  TimePoint intervalStart = std::max(start, bucketStart);
501  TimePoint intervalEnd = std::min(end, nextBucketStart);
502  return input * (intervalEnd - intervalStart) /
503  (nextBucketStart - bucketStart);
504 }
505 
// Invoke `fn` only for the buckets that overlap the range [start, end).
// The wrapping lambda skips buckets that end at or before `start` (returning
// true so iteration continues), stops the iteration at the first bucket that
// begins at or after `end` (returning false), and otherwise forwards to `fn`
// and returns its result.
// NOTE(review): the signature lines (including the parameter used below as
// `start`) and the line naming the function that receives the lambda are
// absent from this listing -- presumably the single-argument forEachBucket();
// confirm against the real source.
506 template <typename VT, typename CT>
507 template <typename Function>
510  TimePoint end,
511  Function fn) const {
513  [&start, &end, &fn](
514  const Bucket& bucket,
515  TimePoint bucketStart,
516  TimePoint nextBucketStart) -> bool {
517  if (start >= nextBucketStart) {
518  return true;
519  }
520  if (end <= bucketStart) {
521  return false;
522  }
523  bool ret = fn(bucket, bucketStart, nextBucketStart);
524  return ret;
525  });
526 }
527 
528 } // namespace folly
LogLevel max
Definition: LogLevel.cpp:31
std::chrono::steady_clock::time_point now()
#define LIKELY(x)
Definition: Likely.h:47
double val
Definition: String.cpp:273
ValueType rangeAdjust(TimePoint bucketStart, TimePoint nextBucketStart, TimePoint start, TimePoint end, ValueType input) const
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
size_t updateBuckets(TimePoint now)
bool addValueAggregated(TimePoint now, const ValueType &total, uint64_t nsamples)
TimePoint getEarliestTimeNonEmpty() const
std::vector< Bucket > buckets_
LogLevel min
Definition: LogLevel.cpp:30
void getBucketInfo(TimePoint time, size_t *bucketIdx, TimePoint *bucketStart, TimePoint *nextBucketStart) const
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
const ValueType & sum() const
void add(const ValueType &s, uint64_t c)
Definition: Bucket.h:102
auto start
BucketedTimeSeries(size_t numBuckets, Duration duration)
bool addValue(TimePoint now, const ValueType &val)
size_t getBucketIdx(TimePoint time) const
detail::Bucket< ValueType > Bucket
Future< Unit > times(const int n, F &&thunk)
Definition: Future-inl.h:2348
#define UNLIKELY(x)
Definition: Likely.h:48
typename Clock::time_point TimePoint
std::chrono::nanoseconds time()
typename Clock::duration Duration
void forEachBucket(Function fn) const