32 #include <glog/logging.h> 42 namespace distributed_mutex {
54 constexpr
auto kLocked = std::uintptr_t{0b1};
152 template <
template <
typename>
class Atomic>
159 Atomic<std::uint64_t> futex_{kUninitialized};
164 std::uintptr_t next_{0};
173 std::uintptr_t waiters_{0};
190 inline std::chrono::nanoseconds
time() {
191 return std::chrono::nanoseconds{
asm_rdtsc()};
198 template <
typename Type>
204 CHECK(!(mask & 0b1));
206 return reinterpret_cast<Type*
>(from & mask);
216 auto time = t.count();
228 template <
template <
typename>
class Atomic,
bool TimePublishing>
234 : next_{
exchange(other.next_,
nullptr)},
235 expected_{
exchange(other.expected_, 0)},
236 wakerMetadata_{
exchange(other.wakerMetadata_, {})},
237 waiters_{
exchange(other.waiters_,
nullptr)},
238 ready_{
exchange(other.ready_,
nullptr)} {}
242 explicit operator bool()
const {
253 std::uintptr_t expected,
254 bool timedWaiter =
false,
260 timedWaiters_{timedWaiter},
261 wakerMetadata_{wakerMetadata},
281 std::uintptr_t expected_{0};
289 bool timedWaiters_{
false};
304 template <
template <
typename>
class Atomic,
bool TimePublishing>
306 : state_{kUnlocked} {}
308 template <
typename Waiter>
321 auto signal = waiter.
futex_.exchange(
data, std::memory_order_acq_rel);
327 if (skipped || (signal == kWake)) {
333 if (spins < kMaxSpins) {
341 template <
typename Waiter>
371 auto sleeper = &(*waiter)->sleeper_;
372 sleeper->store(kWake, std::memory_order_release);
377 template <
typename Waiter>
381 DCHECK((*waiter)->futex_.load(std::memory_order_relaxed) ==
kAboutToWait);
389 auto pre = (*waiter)->sleeper_.exchange(kSleeping, std::memory_order_acq_rel);
397 if (pre == kSleeping) {
405 while (pre != kWake) {
416 futexWait(&(*waiter)->sleeper_, kSleeping);
417 pre = (*waiter)->sleeper_.load(std::memory_order_acquire);
418 DCHECK((pre == kSleeping) || (pre == kWake));
423 DCHECK(next ==
nullptr);
424 next = extractAddress<Waiter>((*waiter)->next_);
428 template <
typename Waiter>
434 return spin(**waiter);
439 std::uintptr_t& previous) {
443 DCHECK(previous != kTimedWaiter);
445 if (
UNLIKELY(previous & kTimedWaiter)) {
449 previous = previous & (~kTimedWaiter);
453 template <
template <
typename>
class Atomic,
bool TimePublishing>
464 auto previous = std::uintptr_t{0};
467 auto timedWaiter =
false;
472 auto&& address =
reinterpret_cast<std::uintptr_t
>(&
state);
473 DCHECK(!(address & 0b1));
488 previous =
state_.exchange(address, std::memory_order_acq_rel);
490 state->next_ = previous;
491 if (previous == kUnlocked) {
492 return {
nullptr, address, timedWaiter, {},
nullptr, nextSleeper};
494 DCHECK(previous & kLocked);
498 if (!
wait(&
state, (waitMode == kAboutToWait), nextSleeper)) {
521 auto next = previous;
522 auto expected = address;
523 if (previous ==
state->wakerMetadata_.waker_) {
532 return {extractAddress<CachelinePadded<Waiter<Atomic>>>(
next),
535 state->wakerMetadata_,
543 auto nodeTime =
recover(value);
544 auto preempted = currentTime > nodeTime + kScheduledAwaySpinThreshold.count();
548 DCHECK(value != kSkipped);
553 return (value == kAboutToWait);
556 template <
typename Waiter>
560 std::uintptr_t
value,
566 (*waiter)->wakerMetadata_ = metadata;
567 (*waiter)->waiters_ =
reinterpret_cast<std::uintptr_t
>(sleepers);
568 (*waiter)->futex_.store(kWake, std::memory_order_release);
593 auto next = (*waiter)->next_;
594 (*waiter)->futex_.store(kSkipped, std::memory_order_release);
621 (*waiter)->wakerMetadata_ = metadata;
622 (*waiter)->waiters_ =
reinterpret_cast<std::uintptr_t
>(sleepers);
623 auto pre = (*waiter)->sleeper_.exchange(kSleeping, std::memory_order_acq_rel);
626 if (pre != kSleeping) {
635 auto next = (*waiter)->next_;
636 (*waiter)->next_ =
reinterpret_cast<std::uintptr_t
>(sleepers);
641 template <
typename Waiter>
652 auto value = (*current)->futex_.load(std::memory_order_acquire);
663 (
next == metadata.
waker_) ?
nullptr : extractAddress<Waiter>(
next);
669 template <
typename Atomic>
676 template <
typename Atomic,
typename Proxy,
typename Sleepers>
678 auto expected = proxy.expected_;
680 if (state.compare_exchange_strong(
683 std::memory_order_release,
684 std::memory_order_relaxed)) {
694 if (
UNLIKELY(expected == (proxy.expected_ | kTimedWaiter))) {
695 proxy.timedWaiters_ =
true;
704 template <
template <
typename>
class Atomic,
bool Publish>
708 DCHECK(proxy) <<
"Invalid proxy passed to DistributedMutex::unlock()";
754 auto head =
state_.exchange(kLocked, std::memory_order_acq_rel);
756 auto next = extractAddress<CachelinePadded<Waiter<Atomic>>>(head);
757 DCHECK((head & kLocked) && (head != kLocked)) <<
"incorrect state " << head;
764 template <
template <
typename>
class Atomic,
bool TimePublishing>
777 return {
nullptr, kLocked};
783 template <
typename Atomic,
typename Deadline,
typename MakeProxy>
829 auto previous = state.fetch_or(
data, std::memory_order_acquire);
830 if (!(previous & 0b1)) {
832 return proxy(
nullptr, kLocked,
true);
838 if (result == std::cv_status::timeout) {
839 return proxy(
nullptr, std::uintptr_t{0},
false);
844 template <
template <
typename>
class Atomic,
bool TimePublishing>
845 template <
typename Clock,
typename Duration>
848 const std::chrono::time_point<Clock, Duration>& deadline) {
867 template <
template <
typename>
class Atomic,
bool TimePublishing>
868 template <
typename Rep,
typename Period>
871 const std::chrono::duration<Rep, Period>& duration) {
CachelinePadded< Waiter< Atomic > > * ready_
DistributedMutexStateProxy try_lock()
bool wake(bool publishing, Waiter &waiter, WakerMetadata metadata, Waiter *&sleepers)
WakerMetadata wakerMetadata_
Waiter(std::uint64_t futex)
Type * extractAddress(std::uintptr_t from)
std::chrono::steady_clock::time_point now()
Atom< std::uint32_t > Futex
—— Concurrent Priority Queue Implementation ——
constexpr auto kScheduledAwaySpinThreshold
void unlock(DistributedMutexStateProxy)
bool doFutexWait(Waiter *waiter, Waiter *&next)
FutexResult futexWait(const Futex *futex, uint32_t expected, uint32_t waitMask)
bool preempted(std::uint64_t value)
bool atomic_fetch_set(Atomic &atomic, std::size_t bit, std::memory_order mo)
DistributedMutexStateProxy(DistributedMutexStateProxy &&other)
void recordTimedWaiterAndClearTimedBit(bool &timedWaiter, std::uintptr_t &previous)
std::uintptr_t tryWake(bool publishing, Waiter *waiter, std::uintptr_t value, WakerMetadata metadata, Waiter *&sleepers)
std::uint64_t strip(std::chrono::nanoseconds t)
void wakeTimedWaiters(Atomic *state, bool timedWaiters)
std::uint64_t asm_rdtsc()
constexpr auto data(C &c) -> decltype(c.data())
std::uint64_t recover(std::uint64_t from)
CachelinePadded< Waiter< Atomic > > * waiters_
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
constexpr auto kAboutToWait
DistributedMutexStateProxy try_lock_until(const std::chrono::time_point< Clock, Duration > &deadline)
Atomic< std::uint64_t > futex_
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::operators::from_fn from
T exchange(T &obj, U &&new_value)
bool spin(Waiter &waiter)
bool tryUnlockClean(Atomic &state, Proxy &proxy, Sleepers sleepers)
void atomic_notify_one(const std::atomic< Integer > *atomic)
DistributedMutexStateProxy try_lock_for(const std::chrono::duration< Rep, Period > &duration)
bool isSleeper(std::uintptr_t value)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
void doFutexWake(Waiter *waiter)
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
CachelinePadded< Waiter< Atomic > > * next_
Atomic< std::uintptr_t > state_
DistributedMutexStateProxy lock()
constexpr auto kUninitialized
std::chrono::nanoseconds time()
static void sleep() noexcept
constexpr auto kTimedWaiter
int futexWake(const Futex *futex, int count, uint32_t wakeMask)
std::cv_status atomic_wait_until(const std::atomic< Integer > *atomic, Integer expected, const std::chrono::time_point< Clock, Duration > &deadline)
void asm_volatile_pause()
DistributedMutexStateProxy(CachelinePadded< Waiter< Atomic >> *next, std::uintptr_t expected, bool timedWaiter=false, WakerMetadata wakerMetadata={}, CachelinePadded< Waiter< Atomic >> *waiters=nullptr, CachelinePadded< Waiter< Atomic >> *ready=nullptr)
auto timedLock(Atomic &state, Deadline deadline, MakeProxy proxy)