23 template <
typename Tag>
26 template <
typename Tag>
39 [
this]() {
return syncMutex_.try_lock(); },
40 [
this]() { syncMutex_.unlock(); },
42 counters_.resetAfterFork();
47 template <
typename Tag>
52 template <
typename Tag>
54 auto idx = version_.load(std::memory_order_acquire);
56 counters_.increment(idx);
61 template <
typename Tag>
63 DCHECK(0 == token.epoch_ || 1 == token.epoch_);
64 counters_.decrement(token.epoch_);
67 template <
typename Tag>
71 node->cb_ = [node, cb = std::forward<T>(cbin)]() {
78 template <
typename Tag>
85 uint64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(
88 auto syncTime = syncTime_.load(std::memory_order_relaxed);
89 if (time > syncTime + syncTimePeriod_ &&
90 syncTime_.compare_exchange_strong(
91 syncTime, time, std::memory_order_relaxed)) {
94 std::lock_guard<std::mutex>
g(syncMutex_);
95 half_sync(
false, finished);
103 template <
typename Tag>
105 auto curr = version_.load(std::memory_order_acquire);
107 auto target = curr + 2;
112 auto work = work_.load(std::memory_order_acquire);
114 if (work < target && work_.compare_exchange_strong(tmp, target)) {
117 std::lock_guard<std::mutex>
g(syncMutex_);
118 while (version_.load(std::memory_order_acquire) < target) {
119 half_sync(
true, finished);
127 if (version_.load(std::memory_order_acquire) >= target) {
130 std::atomic<uint32_t> cutoff{100};
132 turn_.tryWaitForTurn(work, cutoff,
false);
145 template <
typename Tag>
147 uint64_t curr = version_.load(std::memory_order_acquire);
148 auto next = curr + 1;
157 q_.collect(queues_[0]);
160 counters_.waitForZero(
next & 1);
162 if (counters_.readFull(
next & 1) != 0) {
169 finished.splice(queues_[1]);
170 queues_[1].splice(queues_[0]);
172 version_.store(
next, std::memory_order_release);
174 turn_.completeTurn(curr);
void half_sync(bool blocking, list_head &cbs)
typename detail::ThreadCachedLists< Tag >::ListHead list_head
constexpr detail::Map< Move > move
std::chrono::steady_clock::time_point now()
FOLLY_ALWAYS_INLINE void unlock_shared(rcu_token &&)
—— RCU Domain Implementation ——
rcu_domain(Executor *executor=nullptr) noexcept
requires E e noexcept(noexcept(s.error(std::move(e))))
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
FOLLY_ALWAYS_INLINE rcu_token lock_shared()
void synchronize() noexcept
typename detail::ThreadCachedLists< Tag >::Node list_node
static void unregisterHandler(void *object)
static void registerHandler(void *object, folly::Function< bool()> prepare, folly::Function< void()> parent, folly::Function< void()> child)
std::chrono::nanoseconds time()
void retire(list_node *node) noexcept