27 #include <boost/noncopyable.hpp> 38 template <
class IntT,
class Tag = IntT>
49 cache =
new IntCache(*
this);
52 cache->increment(inc);
58 return target_.load(std::memory_order_relaxed);
66 const auto accessor =
cache_.accessAllThreads();
68 for (
const auto& cache : accessor) {
69 if (!cache.reset_.load(std::memory_order_acquire)) {
70 ret += cache.val_.load(std::memory_order_relaxed);
78 return target_.exchange(0, std::memory_order_release);
88 auto accessor =
cache_.accessAllThreads();
90 for (
auto& cache : accessor) {
91 if (!cache.reset_.load(std::memory_order_acquire)) {
92 ret += cache.val_.load(std::memory_order_relaxed);
93 cache.reset_.store(
true, std::memory_order_release);
100 cacheSize_.store(newSize, std::memory_order_release);
128 void set(IntT newVal) {
129 for (
auto& cache :
cache_.accessAllThreads()) {
130 cache.reset_.store(
true, std::memory_order_release);
132 target_.store(newVal, std::memory_order_release);
144 mutable std::atomic<IntT>
val_;
149 : parent_(&parent), val_(0), numUpdates_(0), reset_(false) {}
152 if (
LIKELY(!reset_.load(std::memory_order_acquire))) {
156 val_.load(std::memory_order_relaxed) + inc,
157 std::memory_order_release);
159 val_.store(inc, std::memory_order_relaxed);
160 reset_.store(
false, std::memory_order_release);
165 parent_->
cacheSize_.load(std::memory_order_acquire))) {
171 parent_->
target_.fetch_add(val_, std::memory_order_release);
172 val_.store(0, std::memory_order_release);
std::atomic< uint32_t > cacheSize_
void setCacheSize(uint32_t newSize)
uint32_t getCacheSize() const
ThreadCachedInt & operator++()
ThreadCachedInt & operator-=(IntT inc)
—— Thread Cached Int Implementation ——
ThreadCachedInt * parent_
std::atomic< IntT > target_
ThreadCachedInt & operator+=(IntT inc)
ThreadCachedInt & operator--()
std::atomic< bool > reset_
ThreadCachedInt(IntT initialVal=0, uint32_t cacheSize=1000)
ThreadLocalPtr< IntCache, Tag, AccessModeStrict > cache_
IntCache(ThreadCachedInt &parent)
folly::Function< void()> parent