proxygen
ParkingLot.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <atomic>
19 #include <condition_variable>
20 #include <mutex>
21 
22 #include <folly/Hash.h>
23 #include <folly/Indestructible.h>
24 #include <folly/Optional.h>
25 #include <folly/Portability.h>
26 #include <folly/Unit.h>
27 #include <folly/lang/SafeAssert.h>
28 
29 namespace folly {
30 
31 namespace parking_lot_detail {
32 
33 struct WaitNodeBase {
34  const uint64_t key_;
36  WaitNodeBase* next_{nullptr};
37  WaitNodeBase* prev_{nullptr};
38 
39  // tricky: hold both bucket and node mutex to write, either to read
40  bool signaled_;
42  std::condition_variable cond_;
43 
45  : key_(key), lotid_(lotid), signaled_(false) {}
46 
47  template <typename Clock, typename Duration>
48  std::cv_status wait(std::chrono::time_point<Clock, Duration> deadline) {
49  std::cv_status status = std::cv_status::no_timeout;
50  std::unique_lock<std::mutex> nodeLock(mutex_);
51  while (!signaled_ && status != std::cv_status::timeout) {
53  status = cond_.wait_until(nodeLock, deadline);
54  } else {
55  cond_.wait(nodeLock);
56  }
57  }
58  return status;
59  }
60 
61  void wake() {
62  std::lock_guard<std::mutex> nodeLock(mutex_);
63  signaled_ = true;
64  cond_.notify_one();
65  }
66 
67  bool signaled() {
68  return signaled_;
69  }
70 };
71 
72 extern std::atomic<uint64_t> idallocator;
73 
74 // Our emulated futex uses 4096 lists of wait nodes. There are two levels
75 // of locking: a per-list mutex that controls access to the list and a
76 // per-node mutex, condvar, and bool that are used for the actual wakeups.
77 // The per-node mutex allows us to do precise wakeups without thundering
78 // herds.
79 struct Bucket {
83  std::atomic<uint64_t> count_;
84 
85  static Bucket& bucketFor(uint64_t key);
86 
87  void push_back(WaitNodeBase* node) {
88  if (tail_) {
89  FOLLY_SAFE_DCHECK(head_, "");
90  node->prev_ = tail_;
91  tail_->next_ = node;
92  tail_ = node;
93  } else {
94  tail_ = node;
95  head_ = node;
96  }
97  }
98 
99  void erase(WaitNodeBase* node) {
100  FOLLY_SAFE_DCHECK(count_.load(std::memory_order_relaxed) >= 1, "");
101  if (head_ == node && tail_ == node) {
102  FOLLY_SAFE_DCHECK(node->prev_ == nullptr, "");
103  FOLLY_SAFE_DCHECK(node->next_ == nullptr, "");
104  head_ = nullptr;
105  tail_ = nullptr;
106  } else if (head_ == node) {
107  FOLLY_SAFE_DCHECK(node->prev_ == nullptr, "");
108  FOLLY_SAFE_DCHECK(node->next_, "");
109  head_ = node->next_;
110  head_->prev_ = nullptr;
111  } else if (tail_ == node) {
112  FOLLY_SAFE_DCHECK(node->next_ == nullptr, "");
113  FOLLY_SAFE_DCHECK(node->prev_, "");
114  tail_ = node->prev_;
115  tail_->next_ = nullptr;
116  } else {
117  FOLLY_SAFE_DCHECK(node->next_, "");
118  FOLLY_SAFE_DCHECK(node->prev_, "");
119  node->next_->prev_ = node->prev_;
120  node->prev_->next_ = node->next_;
121  }
122  count_.fetch_sub(1, std::memory_order_relaxed);
123  }
124 };
125 
126 } // namespace parking_lot_detail
127 
128 enum class UnparkControl {
131  RetainBreak,
132  RemoveBreak,
133 };
134 
135 enum class ParkResult {
136  Skip,
137  Unpark,
138  Timeout,
139 };
140 
141 /*
142  * ParkingLot provides an interface that is similar to Linux's futex
143  * system call, but with additional functionality. It is implemented
144  * in a portable way on top of std::mutex and std::condition_variable.
145  *
146  * Additional reading:
147  * https://webkit.org/blog/6161/locking-in-webkit/
148  * https://github.com/WebKit/webkit/blob/master/Source/WTF/wtf/ParkingLot.h
149  * https://locklessinc.com/articles/futex_cheat_sheet/
150  *
151  * The main difference from futex is that park/unpark take lambdas,
152  * such that nearly anything can be done while holding the bucket
153  * lock. Unpark() lambda can also be used to wake up any number of
154  * waiters.
155  *
156  * ParkingLot is templated on the data type, however, all ParkingLot
157  * implementations are backed by a single static array of buckets to
158  * avoid large memory overhead. Lambdas will only ever be called on
159  * the specific ParkingLot's nodes.
160  */
161 template <typename Data = Unit>
162 class ParkingLot {
164  ParkingLot(const ParkingLot&) = delete;
165 
167  const Data data_;
168 
169  template <typename D>
170  WaitNode(uint64_t key, uint64_t lotid, D&& data)
171  : WaitNodeBase(key, lotid), data_(std::forward<D>(data)) {}
172  };
173 
174  public:
175  ParkingLot() : lotid_(parking_lot_detail::idallocator++) {}
176 
177  /* Park API
178  *
179  * Key is almost always the address of a variable.
180  *
181  * ToPark runs while holding the bucket lock: usually this
182  * is a check to see if we can sleep, by checking waiter bits.
183  *
184  * PreWait is usually used to implement condition variable like
185  * things, such that you can unlock the condition variable's lock at
186  * the appropriate time.
187  */
188  template <typename Key, typename D, typename ToPark, typename PreWait>
189  ParkResult park(const Key key, D&& data, ToPark&& toPark, PreWait&& preWait) {
190  return park_until(
191  key,
192  std::forward<D>(data),
193  std::forward<ToPark>(toPark),
194  std::forward<PreWait>(preWait),
196  }
197 
198  template <
199  typename Key,
200  typename D,
201  typename ToPark,
202  typename PreWait,
203  typename Clock,
204  typename Duration>
205  ParkResult park_until(
206  const Key key,
207  D&& data,
208  ToPark&& toPark,
209  PreWait&& preWait,
210  std::chrono::time_point<Clock, Duration> deadline);
211 
212  template <
213  typename Key,
214  typename D,
215  typename ToPark,
216  typename PreWait,
217  typename Rep,
218  typename Period>
220  const Key key,
221  D&& data,
222  ToPark&& toPark,
223  PreWait&& preWait,
224  std::chrono::duration<Rep, Period>& timeout) {
225  return park_until(
226  key,
227  std::forward<D>(data),
228  std::forward<ToPark>(toPark),
229  std::forward<PreWait>(preWait),
230  timeout + std::chrono::steady_clock::now());
231  }
232 
233  /*
234  * Unpark API
235  *
236  * Key is the same uniqueaddress used in park(), and is used as a
237  * hash key for lookup of waiters.
238  *
239  * Unparker is a function that is given the Data parameter, and
240  * returns an UnparkControl. The Remove* results will remove and
241  * wake the waiter, the Ignore/Stop results will not, while stopping
242  * or continuing iteration of the waiter list.
243  */
244  template <typename Key, typename Unparker>
245  void unpark(const Key key, Unparker&& func);
246 };
247 
248 template <typename Data>
249 template <
250  typename Key,
251  typename D,
252  typename ToPark,
253  typename PreWait,
254  typename Clock,
255  typename Duration>
257  const Key bits,
258  D&& data,
259  ToPark&& toPark,
260  PreWait&& preWait,
261  std::chrono::time_point<Clock, Duration> deadline) {
262  auto key = hash::twang_mix64(uint64_t(bits));
263  auto& bucket = parking_lot_detail::Bucket::bucketFor(key);
264  WaitNode node(key, lotid_, std::forward<D>(data));
265 
266  {
267  // A: Must be seq_cst. Matches B.
268  bucket.count_.fetch_add(1, std::memory_order_seq_cst);
269 
270  std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
271 
272  if (!std::forward<ToPark>(toPark)()) {
273  bucketLock.unlock();
274  bucket.count_.fetch_sub(1, std::memory_order_relaxed);
275  return ParkResult::Skip;
276  }
277 
278  bucket.push_back(&node);
279  } // bucketLock scope
280 
281  std::forward<PreWait>(preWait)();
282 
283  auto status = node.wait(deadline);
284 
285  if (status == std::cv_status::timeout) {
286  // it's not really a timeout until we unlink the unsignaled node
287  std::lock_guard<std::mutex> bucketLock(bucket.mutex_);
288  if (!node.signaled()) {
289  bucket.erase(&node);
290  return ParkResult::Timeout;
291  }
292  }
293 
294  return ParkResult::Unpark;
295 }
296 
297 template <typename Data>
298 template <typename Key, typename Func>
299 void ParkingLot<Data>::unpark(const Key bits, Func&& func) {
300  auto key = hash::twang_mix64(uint64_t(bits));
301  auto& bucket = parking_lot_detail::Bucket::bucketFor(key);
302  // B: Must be seq_cst. Matches A. If true, A *must* see in seq_cst
303  // order any atomic updates in toPark() (and matching updates that
304  // happen before unpark is called)
305  if (bucket.count_.load(std::memory_order_seq_cst) == 0) {
306  return;
307  }
308 
309  std::lock_guard<std::mutex> bucketLock(bucket.mutex_);
310 
311  for (auto iter = bucket.head_; iter != nullptr;) {
312  auto node = static_cast<WaitNode*>(iter);
313  iter = iter->next_;
314  if (node->key_ == key && node->lotid_ == lotid_) {
315  auto result = std::forward<Func>(func)(node->data_);
316  if (result == UnparkControl::RemoveBreak ||
317  result == UnparkControl::RemoveContinue) {
318  // we unlink, but waiter destroys the node
319  bucket.erase(node);
320 
321  node->wake();
322  }
323  if (result == UnparkControl::RemoveBreak ||
324  result == UnparkControl::RetainBreak) {
325  return;
326  }
327  }
328  }
329 }
330 
331 } // namespace folly
std::atomic< uint64_t > count_
Definition: ParkingLot.h:83
ParkResult
Definition: ParkingLot.h:135
LogLevel max
Definition: LogLevel.cpp:31
WaitNodeBase(uint64_t key, uint64_t lotid)
Definition: ParkingLot.h:44
void unpark(const Key key, Unparker &&func)
std::chrono::steady_clock::time_point now()
std::atomic< uint64_t > idallocator
Definition: ParkingLot.cpp:33
internal::KeyMatcher< M > Key(M inner_matcher)
STL namespace.
void push_back(WaitNodeBase *node)
Definition: ParkingLot.h:87
ParkResult park(const Key key, D &&data, ToPark &&toPark, PreWait &&preWait)
Definition: ParkingLot.h:189
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::cv_status wait(std::chrono::time_point< Clock, Duration > deadline)
Definition: ParkingLot.h:48
std::chrono::milliseconds Duration
Definition: Types.h:36
#define D(name, bit)
Definition: CpuId.h:145
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
ParkResult park_until(const Key key, D &&data, ToPark &&toPark, PreWait &&preWait, std::chrono::time_point< Clock, Duration > deadline)
Definition: ParkingLot.h:256
ParkResult park_for(const Key key, D &&data, ToPark &&toPark, PreWait &&preWait, std::chrono::duration< Rep, Period > &timeout)
Definition: ParkingLot.h:219
UnparkControl
Definition: ParkingLot.h:128
WaitNode(uint64_t key, uint64_t lotid, D &&data)
Definition: ParkingLot.h:170
uint64_t twang_mix64(uint64_t key) noexcept
Definition: Hash.h:49
std::mutex mutex
static Bucket & bucketFor(uint64_t key)
Definition: ParkingLot.cpp:23
void erase(WaitNodeBase *node)
Definition: ParkingLot.h:99
const uint64_t lotid_
Definition: ParkingLot.h:163
#define FOLLY_SAFE_DCHECK(expr, msg)
Definition: SafeAssert.h:42
std::condition_variable cond_
Definition: ParkingLot.h:42