19 #include <condition_variable> 31 namespace parking_lot_detail {
45 : key_(key), lotid_(lotid), signaled_(false) {}
47 template <
typename Clock,
typename Duration>
48 std::cv_status
wait(std::chrono::time_point<Clock, Duration> deadline) {
49 std::cv_status status = std::cv_status::no_timeout;
50 std::unique_lock<std::mutex> nodeLock(mutex_);
51 while (!signaled_ && status != std::cv_status::timeout) {
53 status = cond_.wait_until(nodeLock, deadline);
62 std::lock_guard<std::mutex> nodeLock(mutex_);
101 if (head_ == node && tail_ == node) {
106 }
else if (head_ == node) {
110 head_->
prev_ =
nullptr;
111 }
else if (tail_ == node) {
115 tail_->
next_ =
nullptr;
122 count_.fetch_sub(1, std::memory_order_relaxed);
161 template <
typename Data = Unit>
169 template <
typename D>
188 template <
typename Key,
typename D,
typename ToPark,
typename PreWait>
192 std::forward<D>(
data),
193 std::forward<ToPark>(toPark),
194 std::forward<PreWait>(preWait),
210 std::chrono::time_point<Clock, Duration> deadline);
224 std::chrono::duration<Rep, Period>& timeout) {
227 std::forward<D>(
data),
228 std::forward<ToPark>(toPark),
229 std::forward<PreWait>(preWait),
244 template <
typename Key,
typename Unparker>
245 void unpark(
const Key key, Unparker&& func);
248 template <
typename Data>
261 std::chrono::time_point<Clock, Duration> deadline) {
268 bucket.count_.fetch_add(1, std::memory_order_seq_cst);
270 std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
272 if (!std::forward<ToPark>(toPark)()) {
274 bucket.count_.fetch_sub(1, std::memory_order_relaxed);
278 bucket.push_back(&node);
281 std::forward<PreWait>(preWait)();
283 auto status = node.
wait(deadline);
285 if (status == std::cv_status::timeout) {
287 std::lock_guard<std::mutex> bucketLock(bucket.mutex_);
297 template <
typename Data>
298 template <
typename Key,
typename Func>
305 if (bucket.count_.load(std::memory_order_seq_cst) == 0) {
309 std::lock_guard<std::mutex> bucketLock(bucket.mutex_);
311 for (
auto iter = bucket.head_; iter !=
nullptr;) {
312 auto node =
static_cast<WaitNode*
>(iter);
314 if (node->key_ == key && node->lotid_ ==
lotid_) {
315 auto result = std::forward<Func>(func)(node->data_);
std::atomic< uint64_t > count_
WaitNodeBase(uint64_t key, uint64_t lotid)
void unpark(const Key key, Unparker &&func)
std::chrono::steady_clock::time_point now()
std::atomic< uint64_t > idallocator
internal::KeyMatcher< M > Key(M inner_matcher)
void push_back(WaitNodeBase *node)
ParkResult park(const Key key, D &&data, ToPark &&toPark, PreWait &&preWait)
—— Concurrent Priority Queue Implementation ——
std::cv_status wait(std::chrono::time_point< Clock, Duration > deadline)
std::chrono::milliseconds Duration
constexpr auto data(C &c) -> decltype(c.data())
ParkResult park_until(const Key key, D &&data, ToPark &&toPark, PreWait &&preWait, std::chrono::time_point< Clock, Duration > deadline)
ParkResult park_for(const Key key, D &&data, ToPark &&toPark, PreWait &&preWait, std::chrono::duration< Rep, Period > &timeout)
WaitNode(uint64_t key, uint64_t lotid, D &&data)
uint64_t twang_mix64(uint64_t key) noexcept
static Bucket & bucketFor(uint64_t key)
void erase(WaitNodeBase *node)
#define FOLLY_SAFE_DCHECK(expr, msg)
std::condition_variable cond_