21 #include <glog/logging.h> 27 template <
typename VT,
typename CT>
31 : firstTime_(
Duration(1)), latestTime_(), duration_(maxDuration) {
39 if (nBuckets >
size_t(
duration_.count())) {
47 template <
typename VT,
typename CT>
52 const std::vector<Bucket>& bucketsList)
58 for (
auto const& bucket :
buckets_) {
68 throw std::invalid_argument(
69 "The total should have been 0 " 70 "if firstTime is greater than lastestTime");
75 if (firstTime_ <= latestTime_ && latestTime_ - firstTime_ >
duration_) {
76 throw std::invalid_argument(
77 "The difference between firstTime and latestTime " 78 "should be less than or equal to the duration");
82 template <
typename VT,
typename CT>
87 template <
typename VT,
typename CT>
95 template <
typename VT,
typename CT>
135 buckets_[bucketIdx].add(total, nsamples);
139 template <
typename VT,
typename CT>
162 template <
typename VT,
typename CT>
173 size_t currentBucket;
177 latestTime_, ¤tBucket, ¤tBucketStart, &nextBucketStart);
182 if (now < nextBucketStart) {
185 return currentBucket;
186 }
else if (now >= currentBucketStart +
duration_) {
200 size_t idx = currentBucket;
201 while (idx != newBucket) {
213 template <
typename VT,
typename CT>
225 template <
typename VT,
typename CT>
243 template <
typename VT,
typename CT>
246 size_t currentBucket;
250 latestTime_, ¤tBucket, ¤tBucketStart, &nextBucketStart);
257 template <
typename VT,
typename CT>
267 template <
typename VT,
typename CT>
280 template <
typename VT,
typename CT>
290 bucketStart, nextBucketStart, start, end, bucket.
sum);
297 template <
typename VT,
typename CT>
308 bucketStart, nextBucketStart, start, end,
ValueType(bucket.
count));
315 template <
typename VT,
typename CT>
316 template <
typename ReturnType>
328 bucketStart, nextBucketStart, start, end, bucket.
sum);
330 bucketStart, nextBucketStart, start, end,
ValueType(bucket.
count));
334 if (sample_count == 0) {
335 return ReturnType(0);
338 return detail::avgHelper<ReturnType>(total, sample_count);
358 template <
typename VT,
typename CT>
363 auto timeIntoCurrentCycle = (time.time_since_epoch() %
duration_);
371 template <
typename VT,
typename CT>
377 typedef typename Duration::rep TimeInt;
383 TimeInt numFullDurations = time.time_since_epoch() /
duration_;
385 TimeInt scaledTime = timeMod.count() * TimeInt(
buckets_.size());
389 *bucketIdx = size_t(scaledTime /
duration_.count());
390 TimeInt scaledOffsetInBucket = scaledTime %
duration_.count();
392 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
393 TimeInt scaledNextBucketStart = scaledBucketStart +
duration_.count();
401 *bucketStart = bucketStartMod + durationStart;
402 *nextBucketStart = nextBucketStartMod + durationStart;
405 template <
typename VT,
typename CT>
406 template <
typename Function>
413 typedef typename Duration::rep TimeInt;
420 TimeInt scaledTime = timeMod.count() * TimeInt(
buckets_.size());
421 size_t latestBucketIdx = size_t(scaledTime / duration_.count());
422 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
423 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
424 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
429 size_t idx = latestBucketIdx;
441 scaledNextBucketStart = duration_.count();
443 scaledNextBucketStart += duration_.count();
457 bucketStart.time_since_epoch().count(),
459 bool ret = fn(
buckets_[idx], bucketStart, nextBucketStart);
464 if (idx == latestBucketIdx) {
478 template <
typename VT,
typename CT>
491 if (bucketStart <= latestTime_ && nextBucketStart >
latestTime_) {
492 nextBucketStart = latestTime_ +
Duration(1);
495 if (start <= bucketStart && end >= nextBucketStart) {
502 return input * (intervalEnd - intervalStart) /
503 (nextBucketStart - bucketStart);
506 template <
typename VT,
typename CT>
507 template <
typename Function>
517 if (start >= nextBucketStart) {
520 if (end <= bucketStart) {
523 bool ret = fn(bucket, bucketStart, nextBucketStart);
std::chrono::steady_clock::time_point now()
ValueType rangeAdjust(TimePoint bucketStart, TimePoint nextBucketStart, TimePoint start, TimePoint end, ValueType input) const
—— Concurrent Priority Queue Implementation ——
size_t updateBuckets(TimePoint now)
bool addValueAggregated(TimePoint now, const ValueType &total, uint64_t nsamples)
TimePoint getEarliestTimeNonEmpty() const
std::vector< Bucket > buckets_
void getBucketInfo(TimePoint time, size_t *bucketIdx, TimePoint *bucketStart, TimePoint *nextBucketStart) const
auto end(TestAdlIterable &instance)
const ValueType & sum() const
void add(const ValueType &s, uint64_t c)
BucketedTimeSeries(size_t numBuckets, Duration duration)
bool addValue(TimePoint now, const ValueType &val)
size_t getBucketIdx(TimePoint time) const
size_t update(TimePoint now)
detail::Bucket< ValueType > Bucket
TimePoint getEarliestTime() const
Future< Unit > times(const int n, F &&thunk)
typename Clock::time_point TimePoint
std::chrono::nanoseconds time()
typename Clock::duration Duration
void forEachBucket(Function fn) const