27 template <
typename DigestT,
typename ClockT>
29 typename ClockT::duration bufferDuration,
32 : bufferDuration_(bufferDuration), digestBuilder_(bufferSize, digestSize) {
37 template <
typename DigestT,
typename ClockT>
40 std::unique_lock<SharedMutex>
g(
mutex_, std::try_to_lock_t());
48 template <
typename DigestT,
typename ClockT>
52 if (remainder.count() != 0) {
58 template <
typename DigestT,
typename ClockT>
61 std::unique_lock<SharedMutex>
g(
mutex_);
66 template <
typename DigestT,
typename ClockT>
68 std::unique_lock<SharedMutex>
g(
mutex_);
72 template <
typename DigestT,
typename ClockT>
75 const std::unique_lock<SharedMutex>&
g,
77 DCHECK(g.owns_lock());
79 auto oldExpiry =
expiry_.load(std::memory_order_relaxed).tp;
87 template <
typename DigestT,
typename ClockT>
89 typename ClockT::duration bufferDuration,
92 :
BufferedStat<DigestT, ClockT>(bufferDuration, bufferSize, digestSize),
93 digest_(digestSize) {}
95 template <
typename DigestT,
typename ClockT>
101 template <
typename DigestT,
typename ClockT>
106 const std::unique_lock<SharedMutex>& ) {
111 template <
typename DigestT,
typename ClockT>
114 typename ClockT::duration bufferDuration,
117 :
BufferedStat<DigestT, ClockT>(bufferDuration, bufferSize, digestSize),
118 slidingWindow_([=]() {
return DigestT(digestSize); }, nBuckets) {}
120 template <
typename DigestT,
typename ClockT>
123 std::vector<DigestT> digests;
132 [](
const DigestT& digest) {
return digest.empty(); }),
137 template <
typename DigestT,
typename ClockT>
142 const std::unique_lock<SharedMutex>& ) {
143 if (newExpiry > oldExpiry) {
144 auto diff = newExpiry - oldExpiry;
void append(double value, TimePoint now=ClockT::now())
void onNewDigest(DigestT digest, TimePoint newExpiry, TimePoint oldExpiry, const std::unique_lock< SharedMutex > &g) final
constexpr detail::Map< Move > move
std::chrono::steady_clock::time_point now()
std::unique_lock< SharedMutex > updateIfExpired(TimePoint now)
const ClockT::duration bufferDuration_
virtual void onNewDigest(DigestT digest, TimePoint newExpiry, TimePoint oldExpiry, const std::unique_lock< SharedMutex > &g)=0
—— Concurrent Priority Queue Implementation ——
std::atomic< TimePointHolder > expiry_
void set(size_t idx, BucketT bucket)
BufferedSlidingWindow(size_t nBuckets, typename ClockT::duration bufferDuration, size_t bufferSize, size_t digestSize)
DigestT get(TimePoint now=ClockT::now())
std::vector< DigestT > get(TimePoint now=ClockT::now())
void slide(size_t nBuckets)
std::vector< BucketT > get() const
SlidingWindow< DigestT > slidingWindow_
BufferedDigest(typename ClockT::duration bufferDuration, size_t bufferSize, size_t digestSize)
uint64_t diff(uint64_t a, uint64_t b)
void doUpdate(TimePoint now, const std::unique_lock< SharedMutex > &g, UpdateMode updateMode)
void onNewDigest(DigestT digest, TimePoint newExpiry, TimePoint oldExpiry, const std::unique_lock< SharedMutex > &g) final
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
DigestBuilder< DigestT > digestBuilder_
void merge(unsigned int iters, size_t maxSize, size_t bufSize)
TimePoint roundUp(TimePoint t)
typename ClockT::time_point TimePoint