ThreadCachedInts.h
/*
 * Copyright 2017-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/Function.h>
#include <folly/ThreadLocal.h>
#include <folly/synchronization/AsymmetricMemoryBarrier.h>

// This is unlike folly::ThreadCachedInt in that the full value is
// never rounded up globally and cached; it only supports readFull().
//
// folly/experimental/TLRefCount is similar, but does not support
// waitForZero, and is not reset-able.
//
// Note that the RCU implementation is completely abstracted from the
// counter implementation; an rseq implementation can be dropped in
// if the kernel supports it.

namespace folly {

namespace detail {

template <typename Tag>
class ThreadCachedInts {
  std::atomic<int64_t> orphan_inc_[2] = {};
  std::atomic<int64_t> orphan_dec_[2] = {};
  folly::detail::Futex<> waiting_;
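
  // Per-thread counter record, stored in a ThreadLocalPtr. When a thread
  // exits, the destructor below folds its counts into the orphan_ arrays
  // above so that no increments or decrements are lost.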
  class Integer {
   public:
    ThreadCachedInts* ints_;
    constexpr Integer(ThreadCachedInts* ints) noexcept
        : ints_(ints), inc_{}, dec_{}, cache_(ints->int_cache_) {}
    std::atomic<int64_t> inc_[2];
    std::atomic<int64_t> dec_[2];
    Integer*& cache_; // reference to the cached ptr
    ~Integer() noexcept {
      // Increment counts must be set before decrement counts
      ints_->orphan_inc_[0].fetch_add(
          inc_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
      ints_->orphan_inc_[1].fetch_add(
          inc_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
      folly::asymmetricLightBarrier(); // B
      ints_->orphan_dec_[0].fetch_add(
          dec_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
      ints_->orphan_dec_[1].fetch_add(
          dec_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
      ints_->waiting_.store(0, std::memory_order_release);
      detail::futexWake(&ints_->waiting_);
      // Reset cache_ in the destructor so we can handle delete/recreate.
      cache_ = nullptr;
    }
  };
  folly::ThreadLocalPtr<Integer, Tag> cs_;

  // Cache the int pointer in a threadlocal.
  static thread_local Integer* int_cache_;

  void init() {
    auto ret = new Integer(this);
    cs_.reset(ret);
    int_cache_ = ret;
  }
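
  // int_cache_ is a fast-path cache of the ThreadLocalPtr lookup; the
  // Integer's cache_ reference nulls it out on thread exit, so a later
  // access re-runs init() instead of touching a destroyed object.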

 public:
  FOLLY_ALWAYS_INLINE void increment(uint8_t epoch) {
    if (!int_cache_) {
      init();
    }

    auto& c = int_cache_->inc_[epoch];
    auto val = c.load(std::memory_order_relaxed);
    c.store(val + 1, std::memory_order_relaxed);

    folly::asymmetricLightBarrier(); // A
  }
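
  // The counters are bumped with a relaxed load + store rather than a
  // fetch_add: each Integer is written only by its owning thread, so the
  // read-modify-write itself need not be atomic; readFull() relies only
  // on the individual loads and stores being atomic.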

  FOLLY_ALWAYS_INLINE void decrement(uint8_t epoch) {
    folly::asymmetricLightBarrier(); // B
    if (!int_cache_) {
      init();
    }

    auto& c = int_cache_->dec_[epoch];
    auto val = c.load(std::memory_order_relaxed);
    c.store(val + 1, std::memory_order_relaxed);

    folly::asymmetricLightBarrier(); // C
    if (waiting_.load(std::memory_order_acquire)) {
      waiting_.store(0, std::memory_order_release);
      detail::futexWake(&waiting_);
    }
  }

  int64_t readFull(uint8_t epoch) {
    int64_t full = -orphan_dec_[epoch].load(std::memory_order_relaxed);

    // Matches A - ensure all threads have seen the new value of version,
    // *and* that we see current values of the counters in readFull().
    //
    // Note that in lock_shared, if a reader is currently between the
    // version load and the counter increment, they may update the wrong
    // epoch. However, this is ok - they started concurrently *after*
    // any callbacks that will run, and therefore it is safe to run
    // the callbacks.
    folly::asymmetricHeavyBarrier();
    for (auto& i : cs_.accessAllThreads()) {
      full -= i.dec_[epoch].load(std::memory_order_relaxed);
    }

    // Matches B - ensure that all increments are seen if decrements
    // are seen. This is necessary because increment and decrement
    // are allowed to happen on different threads.
    folly::asymmetricHeavyBarrier();

    auto accessor = cs_.accessAllThreads();
    for (auto& i : accessor) {
      full += i.inc_[epoch].load(std::memory_order_relaxed);
    }

    // orphan_inc_ is read behind the accessAllThreads lock.
    return full + orphan_inc_[epoch].load(std::memory_order_relaxed);
  }
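
  // Because decrements are read before increments (with a heavy barrier
  // in between), a racing thread can only make the computed sum too
  // large, never too small: a decrement is never counted without its
  // matching increment, so readFull() cannot report a spurious zero.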

  void waitForZero(uint8_t phase) {
    // Try reading before futex sleeping.
    if (readFull(phase) == 0) {
      return;
    }

    while (true) {
      waiting_.store(1, std::memory_order_release);
      // Matches C. Ensure either the decrement sees waiting_,
      // or we see their decrement and can safely sleep.
      folly::asymmetricHeavyBarrier();
      if (readFull(phase) == 0) {
        break;
      }
      detail::futexWait(&waiting_, 1);
    }
    waiting_.store(0, std::memory_order_relaxed);
  }
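
  // waiting_ implements a standard futex sleep/wake protocol: the waiter
  // publishes waiting_ = 1 before re-checking the count, and decrement()
  // (and Integer's destructor) clear waiting_ and call futexWake(), so a
  // waiter either observes the count hit zero or is woken to re-check;
  // it cannot sleep through the final decrement.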

  // We are guaranteed to be called while StaticMeta lock is still
  // held because of ordering in AtForkList. We can therefore safely
  // touch orphan_ and clear out all counts.
  void resetAfterFork() {
    if (int_cache_) {
      int_cache_->dec_[0].store(0, std::memory_order_relaxed);
      int_cache_->dec_[1].store(0, std::memory_order_relaxed);
      int_cache_->inc_[0].store(0, std::memory_order_relaxed);
      int_cache_->inc_[1].store(0, std::memory_order_relaxed);
    }
    orphan_inc_[0].store(0, std::memory_order_relaxed);
    orphan_inc_[1].store(0, std::memory_order_relaxed);
    orphan_dec_[0].store(0, std::memory_order_relaxed);
    orphan_dec_[1].store(0, std::memory_order_relaxed);
  }
};

template <typename Tag>
thread_local typename detail::ThreadCachedInts<Tag>::Integer*
    detail::ThreadCachedInts<Tag>::int_cache_ = nullptr;

} // namespace detail
} // namespace folly
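
A minimal usage sketch, not part of the header: a two-epoch reader count of the kind the lock_shared/version comments above refer to. The tag type MyTag, the version atomic, and the lockShared/unlockShared/synchronize functions are hypothetical names for illustration; only ThreadCachedInts and its increment(), decrement(), readFull(), and waitForZero() come from the listing above, and the include path assumes folly's synchronization/detail location for this header.

#include <atomic>
#include <cstdint>
#include <folly/synchronization/detail/ThreadCachedInts.h>

struct MyTag {}; // hypothetical tag: gives this counter its own thread-local set

folly::detail::ThreadCachedInts<MyTag> counts;
std::atomic<uint64_t> version{0};

// Reader: pin the epoch that was current on entry and count ourselves in.
uint8_t lockShared() {
  auto epoch = static_cast<uint8_t>(version.load(std::memory_order_acquire) & 1);
  counts.increment(epoch);
  return epoch;
}

// Reader: count ourselves out of the epoch we pinned.
void unlockShared(uint8_t epoch) {
  counts.decrement(epoch);
}

// Writer: flip to the other epoch, then wait for readers of the old
// epoch to drain before reclaiming anything they might still see.
void synchronize() {
  auto old = static_cast<uint8_t>(version.fetch_add(1, std::memory_order_acq_rel) & 1);
  counts.waitForZero(old);
}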