30 template <
typename DigestT>
32 :
bufferSize_(bufferSize), digestSize_(digestSize) {
37 template <
typename DigestT>
39 std::vector<std::vector<double>> valuesVec;
40 std::vector<std::unique_ptr<DigestT>> digestPtrs;
46 valuesVec.push_back(
std::move(cpuLocalBuffer.buffer));
47 if (cpuLocalBuffer.digest) {
48 digestPtrs.push_back(
std::move(cpuLocalBuffer.digest));
52 std::vector<DigestT> digests;
53 for (
auto& digestPtr : digestPtrs) {
58 for (
const auto&
vec : valuesVec) {
62 std::vector<double>
values;
63 values.reserve(count);
64 for (
const auto&
vec : valuesVec) {
65 values.insert(values.end(),
vec.begin(),
vec.end());
68 digests.push_back(digest.merge(values));
73 template <
typename DigestT>
77 std::unique_lock<SpinLock>
g(cpuLocalBuf->mutex, std::try_to_lock_t());
81 g = std::unique_lock<SpinLock>(cpuLocalBuf->mutex);
83 cpuLocalBuf->buffer.push_back(value);
85 if (!cpuLocalBuf->digest) {
86 cpuLocalBuf->digest = std::make_unique<DigestT>(
digestSize_);
88 *cpuLocalBuf->digest = cpuLocalBuf->digest->merge(cpuLocalBuf->buffer);
89 cpuLocalBuf->buffer.clear();
static const CacheLocality & system()
constexpr detail::Map< Move > move
std::vector< CpuLocalBuffer > cpuLocalBuffers_
—— Concurrent Priority Queue Implementation ——
void append(double value)
static size_t current(size_t numStripes)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
static FOLLY_TLS uint32_t tls_lastCpuBufferSlot
void merge(unsigned int iters, size_t maxSize, size_t bufSize)
DigestBuilder(size_t bufferSize, size_t digestSize)
std::vector< int > values(1'000)