/**
 * Converts a position k on a scale of size d into a value on a piecewise
 * quadratic curve.
 *
 * With x = k / d:
 *   - x <  0.5  ->  2 * x^2
 *   - x >= 0.5  ->  1 - 2 * (1 - x)^2
 *
 * Both pieces evaluate to 0.5 at x == 0.5, so the curve is continuous; it
 * yields 0 at x == 0 and 1 at x == 1.
 *
 * @param k  position along the scale.
 * @param d  size of the scale. It is used as a divisor, so callers are
 *           expected to pass a non-zero value.
 * @return   the curve value for k / d.
 */
static double k_to_q(double k, double d) {
  const double k_div_d = k / d;
  if (k_div_d >= 0.5) {
    // Upper half: the lower-half parabola mirrored around (0.5, 0.5).
    const double base = 1 - k_div_d;
    return 1 - 2 * base * base;
  }
  // Lower half.
  return 2 * k_div_d * k_div_d;
}
70 static double clamp(
double v,
double lo,
double hi) {
80 std::vector<Centroid> centroids,
96 auto sz = centroids.size();
97 std::array<TDigest, 2> digests{{
101 *
this = this->
merge(digests);
110 auto n = unsortedValues.
size();
113 std::unique_ptr<uint64_t[]> buckets{
new uint64_t[256 * 9]};
115 std::unique_ptr<double[]> tmp{
new double[n * 2]};
116 auto out = tmp.get() + n;
121 DCHECK(std::is_sorted(in, in + n));
127 if (sortedValues.
empty()) {
135 double maybeMin = *sortedValues.
begin();
136 double maybeMax = *(sortedValues.
end() - 1);
143 result.
min_ = maybeMin;
144 result.
max_ = maybeMax;
147 std::vector<Centroid> compressed;
154 auto it_sortedValues = sortedValues.
begin();
158 it_centroids->mean() < *it_sortedValues) {
159 cur = *it_centroids++;
161 cur =
Centroid(*it_sortedValues++, 1.0);
164 double weightSoFar = cur.
weight();
167 double sumsToMerge = 0;
168 double weightsToMerge = 0;
171 it_sortedValues != sortedValues.
end()) {
175 (it_sortedValues == sortedValues.
end() ||
176 it_centroids->mean() < *it_sortedValues)) {
177 next = *it_centroids++;
179 next =
Centroid(*it_sortedValues++, 1.0);
183 weightSoFar += next.
weight();
185 if (weightSoFar <= q_limit_times_count) {
186 sumsToMerge += nextSum;
187 weightsToMerge += next.
weight();
189 result.
sum_ += cur.
add(sumsToMerge, weightsToMerge);
192 compressed.push_back(cur);
197 result.
sum_ += cur.
add(sumsToMerge, weightsToMerge);
198 compressed.push_back(cur);
199 compressed.shrink_to_fit();
202 std::sort(compressed.begin(), compressed.end());
209 size_t nCentroids = 0;
210 for (
auto it = digests.
begin(); it != digests.
end(); it++) {
214 if (nCentroids == 0) {
218 std::vector<Centroid> centroids;
219 centroids.reserve(nCentroids);
221 std::vector<std::vector<Centroid>::iterator> starts;
222 starts.reserve(digests.
size());
228 double min = std::numeric_limits<double>::infinity();
229 double max = -std::numeric_limits<double>::infinity();
231 for (
auto it = digests.
begin(); it != digests.
end(); it++) {
232 starts.push_back(centroids.end());
233 double curCount = it->count();
235 DCHECK(!std::isnan(it->min_));
236 DCHECK(!std::isnan(it->max_));
240 for (
const auto& centroid : it->centroids_) {
241 centroids.push_back(centroid);
246 for (
size_t digestsPerBlock = 1; digestsPerBlock < starts.size();
247 digestsPerBlock *= 2) {
250 for (
size_t i = 0;
i < starts.size();
i += (digestsPerBlock * 2)) {
253 if (
i + digestsPerBlock < starts.size()) {
255 auto middle = starts[
i + digestsPerBlock];
260 std::vector<Centroid>::iterator last =
261 (
i + (digestsPerBlock * 2) < starts.size())
262 ? *(starts.begin() +
i + 2 * digestsPerBlock)
264 std::inplace_merge(
first, middle, last);
269 DCHECK(std::is_sorted(centroids.begin(), centroids.end()));
274 std::vector<Centroid> compressed;
275 compressed.reserve(maxSize);
278 double q_limit_times_count =
k_to_q(k_limit, maxSize) *
count;
281 double weightSoFar = cur.
weight();
282 double sumsToMerge = 0;
283 double weightsToMerge = 0;
284 for (
auto it = centroids.begin() + 1; it != centroids.end(); ++it) {
285 weightSoFar += it->weight();
286 if (weightSoFar <= q_limit_times_count) {
287 sumsToMerge += it->mean() * it->weight();
288 weightsToMerge += it->weight();
290 result.
sum_ += cur.
add(sumsToMerge, weightsToMerge);
293 compressed.push_back(cur);
294 q_limit_times_count =
k_to_q(k_limit++, maxSize) *
count;
298 result.
sum_ += cur.
add(sumsToMerge, weightsToMerge);
299 compressed.push_back(cur);
300 compressed.shrink_to_fit();
303 std::sort(compressed.begin(), compressed.end());
329 pos = std::distance(rit,
centroids_.rend()) - 1;
340 if (rank < t + it->weight()) {
365 ((rank -
t) /
centroids_[pos].weight() - 0.5) * delta;
370 sum += (mean_ * weight_);
372 mean_ = sum / weight_;
std::atomic< int64_t > sum(0)
static double k_to_q(double k, double d)
constexpr detail::Map< Move > move
constexpr size_type size() const
double estimateQuantile(double q) const
void double_radix_sort(uint64_t n, uint64_t *buckets, double *in, double *tmp)
—— Concurrent Priority Queue Implementation ——
double add(double sum, double weight)
constexpr std::decay< T >::type copy(T &&value) noexcept(noexcept(typename std::decay< T >::type(std::forward< T >(value))))
constexpr bool empty() const
std::vector< Centroid > centroids_
TDigest(size_t maxSize=100)
constexpr Iter end() const
constexpr Iter begin() const
static double clamp(double v, double lo, double hi)
TDigest merge(presorted_t, Range< const double * > sortedValues) const
constexpr presorted_t presorted
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
constexpr detail::First first