proxygen
DigestBuilder-defs.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
20 
21 #include <algorithm>
22 
24 
25 namespace folly {
26 namespace detail {
27 
// Index into DigestBuilder::cpuLocalBuffers_ that is tried first when a value
// is appended (see the `auto& which = tls_lastCpuBufferSlot;` use below).
// Starts at slot 0.
// FOLLY_TLS is presumably folly's thread-local storage specifier, making this
// a per-thread hint -- TODO confirm against folly/Portability.h.
// NOTE(review): `static` at namespace scope in a header gives the variable
// internal linkage, so every translation unit that includes this file gets
// its own copy.
28 static FOLLY_TLS uint32_t tls_lastCpuBufferSlot = 0;
29 
30 template <typename DigestT>
31 DigestBuilder<DigestT>::DigestBuilder(size_t bufferSize, size_t digestSize)
32  : bufferSize_(bufferSize), digestSize_(digestSize) {
33  auto& cl = CacheLocality::system();
34  cpuLocalBuffers_.resize(cl.numCachesByLevel[0]);
35 }
36 
// Drains every slot of cpuLocalBuffers_ and combines everything collected so
// far into a single DigestT, which is returned.
//
// NOTE(listing): the function's signature line (listing line 38) is missing
// from this extract; only the template header and the body are visible.
//
// Steps:
//   1. Under each slot's lock, move out its raw value buffer and, if present,
//      its digest.
//   2. Outside the locks, concatenate all raw values and fold them into one
//      fresh digest of size digestSize_.
//   3. Merge that digest with the per-slot digests via DigestT::merge.
37 template <typename DigestT>
39  std::vector<std::vector<double>> valuesVec;
40  std::vector<std::unique_ptr<DigestT>> digestPtrs;
41  valuesVec.reserve(cpuLocalBuffers_.size());
42  digestPtrs.reserve(cpuLocalBuffers_.size());
43 
// The guard is scoped to one loop iteration, so only one slot is locked at a
// time and only for the duration of the two moves.
44  for (auto& cpuLocalBuffer : cpuLocalBuffers_) {
45  SpinLockGuard g(cpuLocalBuffer.mutex);
// Moving leaves the slot's buffer in a moved-from state and its digest
// pointer null, so later appends start from scratch.
46  valuesVec.push_back(std::move(cpuLocalBuffer.buffer));
47  if (cpuLocalBuffer.digest) {
48  digestPtrs.push_back(std::move(cpuLocalBuffer.digest));
49  }
50  }
51 
// Unwrap the owned digests into a flat vector of values for the final merge.
52  std::vector<DigestT> digests;
53  for (auto& digestPtr : digestPtrs) {
54  digests.push_back(std::move(*digestPtr));
55  }
56 
// Count first so the combined value vector is allocated exactly once.
57  size_t count = 0;
58  for (const auto& vec : valuesVec) {
59  count += vec.size();
60  }
// Skip building an extra digest when no raw values were pending.
61  if (count) {
62  std::vector<double> values;
63  values.reserve(count);
64  for (const auto& vec : valuesVec) {
65  values.insert(values.end(), vec.begin(), vec.end());
66  }
// merge(values) is used for its return value here; it presumably returns a
// new digest rather than mutating `digest` -- TODO confirm against DigestT.
67  DigestT digest(digestSize_);
68  digests.push_back(digest.merge(values));
69  }
// If nothing was collected, `digests` is empty and the result is whatever
// DigestT::merge yields for an empty input.
70  return DigestT::merge(digests);
71 }
72 
// Appends one value to a slot of cpuLocalBuffers_, folding the slot's buffer
// into its digest once the buffer reaches bufferSize_ entries.
//
// NOTE(listing): listing lines 74 (the function signature) and 79 (the first
// statement inside the `!g.owns_lock()` branch) are missing from this extract.
// `value` below is the function's parameter.
73 template <typename DigestT>
// `which` is a reference to the slot-index variable declared above, so any
// assignment to it is remembered for the next call.
75  auto& which = tls_lastCpuBufferSlot;
76  auto cpuLocalBuf = &cpuLocalBuffers_[which];
// Non-blocking attempt on the remembered slot first.
77  std::unique_lock<SpinLock> g(cpuLocalBuf->mutex, std::try_to_lock_t());
78  if (!g.owns_lock()) {
// Presumably the missing listing line 79 reassigns `which` to a different
// slot before the re-index below -- TODO confirm against the real source.
// As shown here, the same slot is re-selected and then locked with a blocking
// acquire.
80  cpuLocalBuf = &cpuLocalBuffers_[which];
81  g = std::unique_lock<SpinLock>(cpuLocalBuf->mutex);
82  }
// From here on the chosen slot's lock is held until `g` goes out of scope.
83  cpuLocalBuf->buffer.push_back(value);
// NOTE(review): the comparison is `==`, and the size is at least 1 after the
// push_back, so a bufferSize_ of 0 would never trigger this flush.
84  if (cpuLocalBuf->buffer.size() == bufferSize_) {
// The slot's digest is created lazily on the first flush.
85  if (!cpuLocalBuf->digest) {
86  cpuLocalBuf->digest = std::make_unique<DigestT>(digestSize_);
87  }
// Replace the slot's digest with the result of merging in the buffered
// values, then empty the buffer for reuse.
88  *cpuLocalBuf->digest = cpuLocalBuf->digest->merge(cpuLocalBuf->buffer);
89  cpuLocalBuf->buffer.clear();
90  }
91 }
92 
93 } // namespace detail
94 } // namespace folly
static const CacheLocality & system()
const size_t bufferSize_
Definition: Random.cpp:104
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::vector< CpuLocalBuffer > cpuLocalBuffers_
Definition: DigestBuilder.h:63
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
Definition: Traits.h:588
int * count
g_t g(f_t)
static size_t current(size_t numStripes)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
static FOLLY_TLS uint32_t tls_lastCpuBufferSlot
void merge(unsigned int iters, size_t maxSize, size_t bufSize)
DigestBuilder(size_t bufferSize, size_t digestSize)
std::vector< int > values(1'000)