proxygen
TDigest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2012-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/stats/TDigest.h>
19 
20 #include <algorithm>
21 #include <limits>
22 
23 namespace folly {
24 
25 /*
26  * A good biased scaling function has the following properties:
27  * - The value of the function k(0, delta) = 0, and k(1, delta) = delta.
28  * This is a requirement for any t-digest function.
29  * - The limit of the derivative of the function dk/dq at 0 is inf, and at
30  * 1 is inf. This provides bias to improve accuracy at the tails.
31  * - For any q <= 0.5, dk/dq(q) = dk/dq(1-q). This ensures that the accuracy
32  * of upper and lower quantiles are equivalent.
33  *
34  * The scaling function used here is...
35  * k(q, d) = (IF q >= 0.5, d - d * sqrt(2 - 2q) / 2, d * sqrt(2q) / 2)
36  *
37  * k(0, d) = 0
38  * k(1, d) = d
39  *
40  * dk/dq = (IF q >= 0.5, d / sqrt(2-2q), d / sqrt(2q))
41  * limit q->1 dk/dq = inf
42  * limit q->0 dk/dq = inf
43  *
44  * When plotted, the derivative function is symmetric, centered at q=0.5.
45  *
46  * Note that FMA has been tested here, but benchmarks have not shown it to be a
47  * performance improvement.
48  */
49 
50 /*
51  * q_to_k is unused but left here as a comment for completeness.
52  * double q_to_k(double q, double d) {
53  * if (q >= 0.5) {
54  * return d - d * std::sqrt(0.5 - 0.5 * q);
55  * }
56  * return d * std::sqrt(0.5 * q);
57  * }
58  */
59 
60 static double k_to_q(double k, double d) {
61  double k_div_d = k / d;
62  if (k_div_d >= 0.5) {
63  double base = 1 - k_div_d;
64  return 1 - 2 * base * base;
65  } else {
66  return 2 * k_div_d * k_div_d;
67  }
68 }
69 
70 static double clamp(double v, double lo, double hi) {
71  if (v > hi) {
72  return hi;
73  } else if (v < lo) {
74  return lo;
75  }
76  return v;
77 }
78 
80  std::vector<Centroid> centroids,
81  double sum,
82  double count,
83  double max_val,
84  double min_val,
85  size_t maxSize)
86  : maxSize_(maxSize),
87  sum_(sum),
88  count_(count),
89  max_(max_val),
90  min_(min_val) {
91  if (centroids.size() <= maxSize_) {
92  centroids_ = std::move(centroids);
93  } else {
94  // Number of centroids is greater than maxSize, we need to compress them
95  // When merging, resulting digest takes the maxSize of the first digest
96  auto sz = centroids.size();
97  std::array<TDigest, 2> digests{{
99  TDigest(std::move(centroids), sum_, count_, max_, min_, sz),
100  }};
101  *this = this->merge(digests);
102  }
103 }
104 
105 // Merge unsorted values by first sorting them. Use radix sort if
106 // possible. This implementation puts all additional memory in the
107 // heap, so that if called from fiber context we do not smash the
108 // stack. Otherwise it is very similar to boost::spreadsort.
110  auto n = unsortedValues.size();
111 
112  // We require 256 buckets per byte level, plus one count array we can reuse.
113  std::unique_ptr<uint64_t[]> buckets{new uint64_t[256 * 9]};
114  // Allocate input and tmp array
115  std::unique_ptr<double[]> tmp{new double[n * 2]};
116  auto out = tmp.get() + n;
117  auto in = tmp.get();
118  std::copy(unsortedValues.begin(), unsortedValues.end(), in);
119 
120  detail::double_radix_sort(n, buckets.get(), in, out);
121  DCHECK(std::is_sorted(in, in + n));
122 
123  return merge(presorted, Range<const double*>(in, in + n));
124 }
125 
127  if (sortedValues.empty()) {
128  return *this;
129  }
130 
131  TDigest result(maxSize_);
132 
133  result.count_ = count_ + sortedValues.size();
134 
135  double maybeMin = *sortedValues.begin();
136  double maybeMax = *(sortedValues.end() - 1);
137  if (count_ > 0) {
138  // We know that min_ and max_ are numbers
139  result.min_ = std::min(min_, maybeMin);
140  result.max_ = std::max(max_, maybeMax);
141  } else {
142  // We know that min_ and max_ are NaN.
143  result.min_ = maybeMin;
144  result.max_ = maybeMax;
145  }
146 
147  std::vector<Centroid> compressed;
148  compressed.reserve(maxSize_);
149 
150  double k_limit = 1;
151  double q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_;
152 
153  auto it_centroids = centroids_.begin();
154  auto it_sortedValues = sortedValues.begin();
155 
156  Centroid cur;
157  if (it_centroids != centroids_.end() &&
158  it_centroids->mean() < *it_sortedValues) {
159  cur = *it_centroids++;
160  } else {
161  cur = Centroid(*it_sortedValues++, 1.0);
162  }
163 
164  double weightSoFar = cur.weight();
165 
166  // Keep track of sums along the way to reduce expensive floating points
167  double sumsToMerge = 0;
168  double weightsToMerge = 0;
169 
170  while (it_centroids != centroids_.end() ||
171  it_sortedValues != sortedValues.end()) {
172  Centroid next;
173 
174  if (it_centroids != centroids_.end() &&
175  (it_sortedValues == sortedValues.end() ||
176  it_centroids->mean() < *it_sortedValues)) {
177  next = *it_centroids++;
178  } else {
179  next = Centroid(*it_sortedValues++, 1.0);
180  }
181 
182  double nextSum = next.mean() * next.weight();
183  weightSoFar += next.weight();
184 
185  if (weightSoFar <= q_limit_times_count) {
186  sumsToMerge += nextSum;
187  weightsToMerge += next.weight();
188  } else {
189  result.sum_ += cur.add(sumsToMerge, weightsToMerge);
190  sumsToMerge = 0;
191  weightsToMerge = 0;
192  compressed.push_back(cur);
193  q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_;
194  cur = next;
195  }
196  }
197  result.sum_ += cur.add(sumsToMerge, weightsToMerge);
198  compressed.push_back(cur);
199  compressed.shrink_to_fit();
200 
201  // Deal with floating point precision
202  std::sort(compressed.begin(), compressed.end());
203 
204  result.centroids_ = std::move(compressed);
205  return result;
206 }
207 
209  size_t nCentroids = 0;
210  for (auto it = digests.begin(); it != digests.end(); it++) {
211  nCentroids += it->centroids_.size();
212  }
213 
214  if (nCentroids == 0) {
215  return TDigest();
216  }
217 
218  std::vector<Centroid> centroids;
219  centroids.reserve(nCentroids);
220 
221  std::vector<std::vector<Centroid>::iterator> starts;
222  starts.reserve(digests.size());
223 
224  double count = 0;
225 
226  // We can safely use these limits to avoid isnan checks below because we know
227  // nCentroids > 0, so at least one TDigest has a min and max.
228  double min = std::numeric_limits<double>::infinity();
229  double max = -std::numeric_limits<double>::infinity();
230 
231  for (auto it = digests.begin(); it != digests.end(); it++) {
232  starts.push_back(centroids.end());
233  double curCount = it->count();
234  if (curCount > 0) {
235  DCHECK(!std::isnan(it->min_));
236  DCHECK(!std::isnan(it->max_));
237  min = std::min(min, it->min_);
238  max = std::max(max, it->max_);
239  count += curCount;
240  for (const auto& centroid : it->centroids_) {
241  centroids.push_back(centroid);
242  }
243  }
244  }
245 
246  for (size_t digestsPerBlock = 1; digestsPerBlock < starts.size();
247  digestsPerBlock *= 2) {
248  // Each sorted block is digestPerBlock digests big. For each step, try to
249  // merge two blocks together.
250  for (size_t i = 0; i < starts.size(); i += (digestsPerBlock * 2)) {
251  // It is possible that this block is incomplete (less than digestsPerBlock
252  // big). In that case, the rest of the block is sorted and leave it alone
253  if (i + digestsPerBlock < starts.size()) {
254  auto first = starts[i];
255  auto middle = starts[i + digestsPerBlock];
256 
257  // It is possible that the next block is incomplete (less than
258  // digestsPerBlock big). In that case, merge to end. Otherwise, merge to
259  // the end of that block.
260  std::vector<Centroid>::iterator last =
261  (i + (digestsPerBlock * 2) < starts.size())
262  ? *(starts.begin() + i + 2 * digestsPerBlock)
263  : centroids.end();
264  std::inplace_merge(first, middle, last);
265  }
266  }
267  }
268 
269  DCHECK(std::is_sorted(centroids.begin(), centroids.end()));
270 
271  size_t maxSize = digests.begin()->maxSize_;
272  TDigest result(maxSize);
273 
274  std::vector<Centroid> compressed;
275  compressed.reserve(maxSize);
276 
277  double k_limit = 1;
278  double q_limit_times_count = k_to_q(k_limit, maxSize) * count;
279 
280  Centroid cur = centroids.front();
281  double weightSoFar = cur.weight();
282  double sumsToMerge = 0;
283  double weightsToMerge = 0;
284  for (auto it = centroids.begin() + 1; it != centroids.end(); ++it) {
285  weightSoFar += it->weight();
286  if (weightSoFar <= q_limit_times_count) {
287  sumsToMerge += it->mean() * it->weight();
288  weightsToMerge += it->weight();
289  } else {
290  result.sum_ += cur.add(sumsToMerge, weightsToMerge);
291  sumsToMerge = 0;
292  weightsToMerge = 0;
293  compressed.push_back(cur);
294  q_limit_times_count = k_to_q(k_limit++, maxSize) * count;
295  cur = *it;
296  }
297  }
298  result.sum_ += cur.add(sumsToMerge, weightsToMerge);
299  compressed.push_back(cur);
300  compressed.shrink_to_fit();
301 
302  // Deal with floating point precision
303  std::sort(compressed.begin(), compressed.end());
304 
305  result.count_ = count;
306  result.min_ = min;
307  result.max_ = max;
308  result.centroids_ = std::move(compressed);
309  return result;
310 }
311 
312 double TDigest::estimateQuantile(double q) const {
313  if (centroids_.empty()) {
314  return 0.0;
315  }
316  double rank = q * count_;
317 
318  size_t pos;
319  double t;
320  if (q > 0.5) {
321  if (q >= 1.0) {
322  return max_;
323  }
324  pos = 0;
325  t = count_;
326  for (auto rit = centroids_.rbegin(); rit != centroids_.rend(); ++rit) {
327  t -= rit->weight();
328  if (rank >= t) {
329  pos = std::distance(rit, centroids_.rend()) - 1;
330  break;
331  }
332  }
333  } else {
334  if (q <= 0.0) {
335  return min_;
336  }
337  pos = centroids_.size() - 1;
338  t = 0;
339  for (auto it = centroids_.begin(); it != centroids_.end(); ++it) {
340  if (rank < t + it->weight()) {
341  pos = std::distance(centroids_.begin(), it);
342  break;
343  }
344  t += it->weight();
345  }
346  }
347 
348  double delta = 0;
349  double min = min_;
350  double max = max_;
351  if (centroids_.size() > 1) {
352  if (pos == 0) {
353  delta = centroids_[pos + 1].mean() - centroids_[pos].mean();
354  max = centroids_[pos + 1].mean();
355  } else if (pos == centroids_.size() - 1) {
356  delta = centroids_[pos].mean() - centroids_[pos - 1].mean();
357  min = centroids_[pos - 1].mean();
358  } else {
359  delta = (centroids_[pos + 1].mean() - centroids_[pos - 1].mean()) / 2;
360  min = centroids_[pos - 1].mean();
361  max = centroids_[pos + 1].mean();
362  }
363  }
364  auto value = centroids_[pos].mean() +
365  ((rank - t) / centroids_[pos].weight() - 0.5) * delta;
366  return clamp(value, min, max);
367 }
368 
369 double TDigest::Centroid::add(double sum, double weight) {
370  sum += (mean_ * weight_);
371  weight_ += weight;
372  mean_ = sum / weight_;
373  return sum;
374 }
375 
376 } // namespace folly
std::atomic< int64_t > sum(0)
auto v
double weight() const
Definition: TDigest.h:63
LogLevel max
Definition: LogLevel.cpp:31
static double k_to_q(double k, double d)
Definition: TDigest.cpp:60
double sum_
Definition: TDigest.h:145
double mean() const
Definition: TDigest.h:59
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
constexpr size_type size() const
Definition: Range.h:431
double estimateQuantile(double q) const
Definition: TDigest.cpp:312
void double_radix_sort(uint64_t n, uint64_t *buckets, double *in, double *tmp)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
double sum() const
Definition: TDigest.h:114
const int min_
double count() const
Definition: TDigest.h:118
const int max_
double add(double sum, double weight)
Definition: TDigest.cpp:369
constexpr std::decay< T >::type copy(T &&value) noexcept(noexcept(typename std::decay< T >::type(std::forward< T >(value))))
Definition: Utility.h:72
constexpr bool empty() const
Definition: Range.h:443
LogLevel min
Definition: LogLevel.cpp:30
size_t maxSize() const
Definition: TDigest.h:138
size_t maxSize_
Definition: TDigest.h:144
std::vector< Centroid > centroids_
Definition: TDigest.h:143
TDigest(size_t maxSize=100)
Definition: TDigest.h:81
constexpr Iter end() const
Definition: Range.h:455
int * count
constexpr Iter begin() const
Definition: Range.h:452
static double clamp(double v, double lo, double hi)
Definition: TDigest.cpp:70
TDigest merge(presorted_t, Range< const double * > sortedValues) const
Definition: TDigest.cpp:126
double min_
Definition: TDigest.h:148
constexpr presorted_t presorted
Definition: Utility.h:311
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
double count_
Definition: TDigest.h:146
double max() const
Definition: TDigest.h:126
KeyT k
double min() const
Definition: TDigest.h:122
double max_
Definition: TDigest.h:147
constexpr detail::First first
Definition: Base-inl.h:2553
def next(obj)
Definition: ast.py:58