proxygen
BufferedStat-defs.h
Go to the documentation of this file.
1 /*
2  * Copyright 2012-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
20 
23 
24 namespace folly {
25 namespace detail {
26 
// Constructor: records the buffer duration, sizes the digest builder and
// seeds the first expiry time.
// NOTE(review): the line carrying the qualified constructor name is missing
// from this listing (numbering jumps from 27 to 29). The initializer list
// suggests BufferedStat<DigestT, ClockT>::BufferedStat -- confirm against the
// header.
//
// bufferDuration  copied into bufferDuration_ (the step roundUp() aligns to).
// bufferSize      first argument forwarded to digestBuilder_.
// digestSize      second argument forwarded to digestBuilder_.
27 template <typename DigestT, typename ClockT>
29  typename ClockT::duration bufferDuration,
30  size_t bufferSize,
31  size_t digestSize)
32  : bufferDuration_(bufferDuration), digestBuilder_(bufferSize, digestSize) {
// The initial expiry is the current clock reading passed through roundUp();
// it is published with a relaxed atomic store.
33  expiry_.store(
34  TimePointHolder(roundUp(ClockT::now())), std::memory_order_relaxed);
35 }
36 
// Appends `value` to the digest builder, first checking whether `now` has
// passed the stored expiry.
// NOTE(review): the signature line (listing line 38) and the statement inside
// `if (g.owns_lock())` (listing line 42) are missing from this listing. The
// parameters used here match `append(double value, TimePoint now)`, and the
// dropped statement is presumably the expiry update -- confirm upstream.
37 template <typename DigestT, typename ClockT>
// Expiry is read with a relaxed load and the branch is marked UNLIKELY.
39  if (UNLIKELY(now > expiry_.load(std::memory_order_relaxed).tp)) {
// try_to_lock: if the mutex is not immediately available the guarded work is
// skipped rather than waited for.
40  std::unique_lock<SharedMutex> g(mutex_, std::try_to_lock_t());
41  if (g.owns_lock()) {
43  }
44  }
// The value is appended whether or not the lock above was obtained.
45  digestBuilder_.append(value);
46 }
47 
// Rounds time point `t` up to the next multiple of bufferDuration_, measured
// from the clock's epoch; a `t` that is already a multiple is returned as-is.
// NOTE(review): the return-type and signature lines (listing lines 49-50) are
// missing from this listing; the body matches `TimePoint roundUp(TimePoint t)`
// -- confirm against the header.
48 template <typename DigestT, typename ClockT>
// remainder == how far `t` lies past the previous multiple of bufferDuration_.
51  auto remainder = t.time_since_epoch() % bufferDuration_;
52  if (remainder.count() != 0) {
53  return t + bufferDuration_ - remainder;
54  }
55  return t;
56 }
57 
// Takes the mutex (blocking, unlike the try-lock used elsewhere in this file)
// and hands the lock object back to the caller by value, so the caller decides
// how long the critical section lasts.
// NOTE(review): one statement between taking the lock and `return g` (listing
// line 62) is missing from this listing; given the function name it is
// presumably the expiry check/update using `now` -- confirm upstream.
58 template <typename DigestT, typename ClockT>
59 std::unique_lock<SharedMutex> BufferedStat<DigestT, ClockT>::updateIfExpired(
60  TimePoint now) {
61  std::unique_lock<SharedMutex> g(mutex_);
63  return g;
64 }
65 
// Member that takes the mutex (blocking) for the duration of its body.
// NOTE(review): both the signature (listing line 67) and the statement executed
// under the lock (listing line 69) are missing from this listing, so the name
// and effect cannot be read from here. Presumably this is the flush-style
// entry point that forces an update regardless of expiry -- confirm upstream.
66 template <typename DigestT, typename ClockT>
68  std::unique_lock<SharedMutex> g(mutex_);
70 }
71 
// Performs the expiry roll-over: if `now` is past the stored expiry, or the
// caller asked for UpdateMode::Now, the expiry is advanced and the digest
// built from the buffered values is handed to onNewDigest().
// NOTE(review): the line carrying the qualified function name (listing line
// 73) is missing from this listing; the parameter list matches
// `doUpdate(TimePoint now, const std::unique_lock<SharedMutex>& g,
// UpdateMode updateMode)` -- confirm against the header.
//
// now         candidate time; rounded up via roundUp() before being stored.
// g           lock the caller must already hold (checked by DCHECK below).
// updateMode  UpdateMode::Now forces the update even when not yet expired.
72 template <typename DigestT, typename ClockT>
74  TimePoint now,
75  const std::unique_lock<SharedMutex>& g,
76  UpdateMode updateMode) {
77  DCHECK(g.owns_lock());
78  // Check that no other thread has performed the slide after the check
79  auto oldExpiry = expiry_.load(std::memory_order_relaxed).tp;
80  if (now > oldExpiry || updateMode == UpdateMode::Now) {
// From here on `now` holds the new (rounded) expiry, which is what gets
// stored and passed to onNewDigest() alongside the previous expiry.
81  now = roundUp(now);
82  expiry_.store(TimePointHolder(now), std::memory_order_relaxed);
83  onNewDigest(digestBuilder_.build(), now, oldExpiry, g);
84  }
85 }
86 
// Constructor: forwards all three arguments to the BufferedStat base and
// constructs digest_ from digestSize.
// NOTE(review): the line carrying the qualified constructor name (listing line
// 88) is missing from this listing; presumably
// BufferedDigest<DigestT, ClockT>::BufferedDigest -- confirm against the header.
87 template <typename DigestT, typename ClockT>
89  typename ClockT::duration bufferDuration,
90  size_t bufferSize,
91  size_t digestSize)
92  : BufferedStat<DigestT, ClockT>(bufferDuration, bufferSize, digestSize),
93  digest_(digestSize) {}
94 
// Returns digest_ after giving the base class a chance to roll over an expired
// buffer at time `now`.
// NOTE(review): the signature line (listing line 96) is missing from this
// listing; presumably `DigestT BufferedDigest<DigestT, ClockT>::get(TimePoint
// now)` -- confirm against the header.
95 template <typename DigestT, typename ClockT>
// `g` is the lock object returned by updateIfExpired(); it stays alive until
// the function returns, so digest_ is read while it is in scope.
97  auto g = this->updateIfExpired(now);
98  return digest_;
99 }
100 
// Callback receiving a freshly built digest. The expiry arguments and the lock
// argument are unnamed, i.e. unused by this implementation.
// NOTE(review): the line carrying the qualified function name (listing line
// 102) and the statement after the array construction (listing line 108) are
// missing from this listing. The dropped statement presumably merges `a` and
// stores the result back into digest_ -- confirm upstream.
101 template <typename DigestT, typename ClockT>
103  DigestT digest,
104  TimePoint /*newExpiry*/,
105  TimePoint /*oldExpiry*/,
106  const std::unique_lock<SharedMutex>& /*g*/) {
// Pairs a copy of the current digest_ with the incoming digest (moved in).
107  std::array<DigestT, 2> a{{digest_, std::move(digest)}};
109 }
110 
// Constructor: forwards the buffering arguments to the BufferedStat base and
// builds slidingWindow_ from a bucket factory plus the bucket count.
// NOTE(review): the line carrying the qualified constructor name (listing line
// 112) is missing from this listing; presumably
// BufferedSlidingWindow<DigestT, ClockT>::BufferedSlidingWindow -- confirm.
//
// nBuckets        second argument to slidingWindow_'s constructor.
// bufferDuration  forwarded to the base.
// bufferSize      forwarded to the base.
// digestSize      forwarded to the base and captured by copy in the factory
//                 lambda, which returns DigestT(digestSize).
111 template <typename DigestT, typename ClockT>
113  size_t nBuckets,
114  typename ClockT::duration bufferDuration,
115  size_t bufferSize,
116  size_t digestSize)
117  : BufferedStat<DigestT, ClockT>(bufferDuration, bufferSize, digestSize),
118  slidingWindow_([=]() { return DigestT(digestSize); }, nBuckets) {}
119 
// Returns the sliding window's digests with empty ones removed.
// NOTE(review): the first signature line (listing line 121) is missing from
// this listing; presumably `std::vector<DigestT>
// BufferedSlidingWindow<DigestT, ClockT>::get(` -- confirm against the header.
120 template <typename DigestT, typename ClockT>
122  TimePoint now) {
123  std::vector<DigestT> digests;
// Inner scope: the lock object returned by updateIfExpired() lives only while
// the window contents are copied out; the filtering below runs after `g` has
// been destroyed.
124  {
125  auto g = this->updateIfExpired(now);
126  digests = slidingWindow_.get();
127  }
// Erase-remove: drop every digest for which empty() is true.
128  digests.erase(
129  std::remove_if(
130  digests.begin(),
131  digests.end(),
132  [](const DigestT& digest) { return digest.empty(); }),
133  digests.end());
134  return digests;
135 }
136 
// Callback receiving a freshly built digest together with the new and previous
// expiry times; places the digest into slidingWindow_. The lock argument is
// unnamed, i.e. unused by this implementation.
// NOTE(review): the line carrying the qualified function name (listing line
// 138) and one statement right after `diff` is computed (listing line 145) are
// missing from this listing. The dropped statement presumably advances the
// window by diff / bufferDuration_ buckets -- confirm upstream.
137 template <typename DigestT, typename ClockT>
139  DigestT digest,
140  TimePoint newExpiry,
141  TimePoint oldExpiry,
142  const std::unique_lock<SharedMutex>& /*g*/) {
143  if (newExpiry > oldExpiry) {
// Expiry advanced: `diff` is the elapsed span between the two expiries.
144  auto diff = newExpiry - oldExpiry;
// One buffer duration is subtracted before converting the span to a bucket
// index, so an advance of exactly one duration maps to index 0.
146  diff -= this->bufferDuration_;
147  slidingWindow_.set(diff / this->bufferDuration_, std::move(digest));
148  } else {
// Expiry did not advance: merge the incoming digest with a copy of the front
// bucket and write the result back at index 0.
149  // just update current window
150  std::array<DigestT, 2> a{{slidingWindow_.front(), std::move(digest)}};
151  slidingWindow_.set(0 /* current window */, DigestT::merge(a));
152  }
153 }
154 
155 } // namespace detail
156 } // namespace folly
void append(double value, TimePoint now=ClockT::now())
void onNewDigest(DigestT digest, TimePoint newExpiry, TimePoint oldExpiry, const std::unique_lock< SharedMutex > &g) final
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::chrono::steady_clock::time_point now()
std::unique_lock< SharedMutex > updateIfExpired(TimePoint now)
const ClockT::duration bufferDuration_
Definition: BufferedStat.h:61
virtual void onNewDigest(DigestT digest, TimePoint newExpiry, TimePoint oldExpiry, const std::unique_lock< SharedMutex > &g)=0
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::atomic< TimePointHolder > expiry_
Definition: BufferedStat.h:62
void set(size_t idx, BucketT bucket)
BufferedSlidingWindow(size_t nBuckets, typename ClockT::duration bufferDuration, size_t bufferSize, size_t digestSize)
DigestT get(TimePoint now=ClockT::now())
std::vector< DigestT > get(TimePoint now=ClockT::now())
void slide(size_t nBuckets)
std::vector< BucketT > get() const
char a
SlidingWindow< DigestT > slidingWindow_
Definition: BufferedStat.h:145
BufferedDigest(typename ClockT::duration bufferDuration, size_t bufferSize, size_t digestSize)
uint64_t diff(uint64_t a, uint64_t b)
Definition: FutexTest.cpp:135
void doUpdate(TimePoint now, const std::unique_lock< SharedMutex > &g, UpdateMode updateMode)
g_t g(f_t)
void onNewDigest(DigestT digest, TimePoint newExpiry, TimePoint oldExpiry, const std::unique_lock< SharedMutex > &g) final
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
DigestBuilder< DigestT > digestBuilder_
Definition: BufferedStat.h:78
#define UNLIKELY(x)
Definition: Likely.h:48
void merge(unsigned int iters, size_t maxSize, size_t bufSize)
TimePoint roundUp(TimePoint t)
typename ClockT::time_point TimePoint
Definition: BufferedStat.h:34