DistributedMutex-inl.h
1 /*
2  * Copyright 2004-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
17 
18 #include <folly/CachelinePadded.h>
19 #include <folly/Likely.h>
20 #include <folly/Portability.h>
21 #include <folly/ScopeGuard.h>
22 #include <folly/Utility.h>
23 #include <folly/detail/Futex.h>
24 #include <folly/lang/Align.h>
25 #include <folly/portability/Asm.h>
26 #include <folly/synchronization/AtomicNotification.h>
27 #include <folly/synchronization/AtomicUtil.h>
28 #include <folly/synchronization/detail/Sleeper.h>
31 
32 #include <glog/logging.h>
33 
34 #include <atomic>
35 #include <cstdint>
36 #include <limits>
37 #include <stdexcept>
38 #include <thread>
39 
40 namespace folly {
41 namespace detail {
42 namespace distributed_mutex {
43 // kUnlocked is used to show unlocked state
44 //
45 // When locking threads encounter kUnlocked in the underlying storage, they
46 // can just acquire the lock without any further effort
47 constexpr auto kUnlocked = std::uintptr_t{0b0};
48 // kLocked is used to show that the mutex is currently locked, and future
49 // attempts to lock the mutex should enqueue on the central storage
50 //
51 // Locking threads find this on central storage only when there is a
52 // contention chain that is undergoing wakeups, in every other case, a locker
53 // will either find kUnlocked or an arbitrary address with the kLocked bit set
54 constexpr auto kLocked = std::uintptr_t{0b1};
55 // kTimedWaiter is set when there is at least one timed waiter on the mutex
56 //
57 // Timed waiters do not follow the sleeping strategy employed by regular,
58 // non-timed threads. They sleep on the central mutex atomic through an
59 // extended futex() interface that allows sleeping with the same semantics for
60 // non-standard integer widths
61 //
62 // When a regular non-timed thread unlocks or enqueues on the mutex, and sees
63 // a timed waiter, it takes ownership of all the timed waiters. The thread
64 // that has taken ownership of the timed waiter releases the timed waiters
65 // when it gets a chance at the critical section, at which point it issues a
66 // wakeup to a single timed waiter; timed waiters always issue wake() calls
67 // to other timed waiters
68 constexpr auto kTimedWaiter = std::uintptr_t{0b10};
69 
70 // kUninitialized means that the thread has just enqueued, and has not yet
71 // gotten to initializing itself with the address of its successor
72 //
73 // this becomes significant for threads that are trying to wake up the
74 // uninitialized thread; if they see that the thread is not yet initialized,
75 // they can do nothing but spin and wait for the thread to get initialized
76 constexpr auto kUninitialized = std::uint32_t{0b0};
77 // kWaiting will be set in a waiter's futex word while the waiter is spinning,
78 // waiting for the mutex
79 constexpr auto kWaiting = std::uint32_t{0b1};
80 // kWake will be set by threads that are waking up waiters that have enqueued
81 constexpr auto kWake = std::uint32_t{0b10};
82 // kSkipped will be set by a waker when it sees that a waiter has been
83 // preempted away by the kernel; in this case the thread that got skipped will
84 // have to wake up and put itself back on the queue
85 constexpr auto kSkipped = std::uint32_t{0b11};
86 // kAboutToWait will be set by a waiter that enqueues itself with the purpose
87 // of waiting on a futex
88 constexpr auto kAboutToWait = std::uint32_t{0b100};
89 // kSleeping will be set by a waiter right before enqueueing on a futex. When
90 // a thread wants to wake up a waiter that has enqueued on a futex, it should
91 // set the futex to contain kWake
92 //
93 // a thread that is unlocking and wants to skip over a sleeping thread also
94 // calls sleeper_.exchange(kSleeping) on the sleeping thread's futex word. It
95 // does this to 1. detect whether the sleeping thread had actually gone to
96 // sleep on the futex word so it can skip it, and 2. to synchronize with
97 // other non-atomic writes in the sleeping thread's context (such as the write
98 // to track the next waiting thread).
99 //
100 // We reuse kSleeping instead of say using another constant kEarlyDelivery to
101 // avoid situations where a thread has to enter kernel mode due to calling
102 // futexWait() twice because of the presence of a waking thread. This
103 // situation can arise when an unlocking thread goes to skip over a sleeping
104 // thread, sees that the thread has slept and moves on, but the sleeping thread
105 // had not yet entered futex(). This interleaving causes the thread calling
106 // futex() to return spuriously, as the futex word is not what it should be
107 constexpr auto kSleeping = std::uint32_t{0b101};
108 
109 // The amount of time a waiter's published timestamp may lag behind the
110 // current time before a waker treats the waiter as preempted and skips it
111 //
112 // This is just a magic number from benchmarks
113 constexpr auto kScheduledAwaySpinThreshold = std::chrono::nanoseconds{200};
114 // The maximum number of spins before a thread starts yielding its processor
115 // in hopes of getting skipped
116 constexpr auto kMaxSpins = 4000;
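// Editorial sketch (not part of the original file): the constants above
// partition the low bits of the central mutex word. A held mutex always has
// kLocked set, so a waiter's cacheline-aligned address can share the word
// with the lock bit, and kTimedWaiter may additionally be set on either form.
static_assert(kUnlocked == 0, "an unlocked mutex word must be all zeros");
static_assert((kLocked & kTimedWaiter) == 0, "lock and timed bits must differ");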
117 
126 class WakerMetadata {
127  public:
128  // This is the thread that initiated wakeups for the contention chain.
129  // There can only ever be one thread that initiates the wakeup for a
130  // chain in the spin only version of this mutex. When a thread that just
131  // woke up sees this as the next thread to wake up, it knows that it is the
132  // terminal node in the contention chain. This means that it was the one
133 // that took the thread that had acquired the mutex off the centralized
134 // state. Therefore, the current thread is the last in its contention
135  // chain. It will fall back to centralized storage to pick up the next
136  // waiter or release the mutex
137  //
138  // When we move to a full sleeping implementation, this might need to change
139  // to a small_vector<> to account for failed wakeups, or we can put threads
140  // to sleep on the central futex, which is an easier implementation
141 // strategy. Since this is allocated on the stack, we can set a
142 // prohibitively large threshold to avoid heap allocations; this strategy,
143 // however, might cause increased cache misses on wakeup signalling
144  std::uintptr_t waker_{0};
145 };
146 
152 template <template <typename> class Atomic>
153 class Waiter {
154  public:
155  explicit Waiter(std::uint64_t futex) : futex_{futex} {}
156 
157  // the atomic that this thread will spin on while waiting for the mutex to
158  // be unlocked
159  Atomic<std::uint64_t> futex_{kUninitialized};
160  // metadata for the waker
161  WakerMetadata wakerMetadata_{};
162  // The successor of this node. This will be the thread that had its address
163  // on the mutex previously
164  std::uintptr_t next_{0};
165  // the list of threads that the waker had previously seen to be sleeping on
166  // a futex(),
167  //
168  // this is given to the current thread as a means to pass on
169  // information. When the current thread goes to unlock the mutex and does
170  // not see contention, it should go and wake up the head of this list. If
171  // the current thread sees a contention chain on the mutex, it should pass
172  // on this list to the next thread that gets woken up
173  std::uintptr_t waiters_{0};
174  // The futex that this waiter will sleep on
175  //
176  // how can we reuse futex_ from above for futex management?
177  Futex<Atomic> sleeper_{kUninitialized};
178 };
179 
190 inline std::chrono::nanoseconds time() {
191  return std::chrono::nanoseconds{asm_rdtsc()};
192 }
193 
198 template <typename Type>
199 Type* extractAddress(std::uintptr_t from) {
200  // shift one bit off the end, to get all 1s followed by a single 0
201  auto mask = std::numeric_limits<std::uintptr_t>::max();
202  mask >>= 1;
203  mask <<= 1;
204  CHECK(!(mask & 0b1));
205 
206  return reinterpret_cast<Type*>(from & mask);
207 }
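// Editorial sketch (not part of the original file): how a waiter's address
// and the kLocked bit share the central word. Cacheline alignment guarantees
// the low bit of the node's address is zero, so it can carry the lock bit and
// still be recovered with extractAddress(). The helper name is illustrative.
inline void exampleAddressTagging(CachelinePadded<Waiter<std::atomic>>& node) {
  auto word = reinterpret_cast<std::uintptr_t>(&node) | kLocked;
  DCHECK_EQ(extractAddress<CachelinePadded<Waiter<std::atomic>>>(word), &node);
}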
208 
215 inline std::uint64_t strip(std::chrono::nanoseconds t) {
216  auto time = t.count();
217  return time << 8;
218 }
219 
224 inline std::uint64_t recover(std::uint64_t from) {
225  return from >> 8;
226 }
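// Editorial sketch (not part of the original file): round-tripping a
// timestamp through strip()/recover(). strip() frees up the low 8 bits of the
// futex word so the spin loop can OR in signal bits such as kWaiting;
// recover() discards those bits again. The function name is illustrative.
inline void exampleStripRecover() {
  auto ns = std::chrono::nanoseconds{1000};
  auto word = strip(ns) | kWaiting;               // (1000 << 8) | 0b1
  DCHECK_EQ(recover(word), std::uint64_t{1000});  // signal bits shifted out
}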
227 
228 template <template <typename> class Atomic, bool TimePublishing>
229 class DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy {
230  public:
233  DistributedMutexStateProxy(DistributedMutexStateProxy&& other)
234  : next_{exchange(other.next_, nullptr)},
235  expected_{exchange(other.expected_, 0)},
236  wakerMetadata_{exchange(other.wakerMetadata_, {})},
237  waiters_{exchange(other.waiters_, nullptr)},
238  ready_{exchange(other.ready_, nullptr)} {}
239 
242  explicit operator bool() const {
243  return expected_;
244  }
245 
246  // private:
249  friend class DistributedMutex<Atomic, TimePublishing>;
250 
251  DistributedMutexStateProxy(
252  CachelinePadded<Waiter<Atomic>>* next,
253  std::uintptr_t expected,
254  bool timedWaiter = false,
255  WakerMetadata wakerMetadata = {},
256  CachelinePadded<Waiter<Atomic>>* waiters = nullptr,
257  CachelinePadded<Waiter<Atomic>>* ready = nullptr)
258  : next_{next},
259  expected_{expected},
260  timedWaiters_{timedWaiter},
261  wakerMetadata_{wakerMetadata},
262  waiters_{waiters},
263  ready_{ready} {}
264 
265  // the next thread that is to be woken up, this being null at the time of
266  // unlock() shows that the current thread acquired the mutex without
267  // contention or it was the terminal thread in the queue of threads waking up
268  CachelinePadded<Waiter<Atomic>>* next_{nullptr};
269  // this is the value that the current thread should expect to find on
270  // unlock, and if this value is not there on unlock, the current thread
271  // should assume that other threads are enqueued waiting for the mutex
272  //
273  // note that if the mutex has the same state set at unlock time, and this is
274  // set to an address (and not say kLocked in the case of a terminal waker)
275  // then it must have been the case that no other thread had enqueued itself,
276  // since threads in the domain of this mutex do not share stack space
277  //
278 // if we want to support stack sharing, we could perhaps solve the problem by
279 // looping at lock time and setting a variable that says whether we have
280 // acquired the lock or not
281  std::uintptr_t expected_{0};
282  // a boolean that will be set when the mutex has timed waiters that the
283 // current thread is responsible for waking. In such a case, the current
284  // thread will issue an atomic_notify_one() call after unlocking the mutex
285  //
286  // note that a timed waiter will itself always have this flag set. This is
287 // done so we can avoid having to issue an atomic_notify_all() call (and
288  // subsequently a thundering herd) when waking up timed-wait threads
289  bool timedWaiters_{false};
290  // metadata passed along from the thread that woke this thread up
291  WakerMetadata wakerMetadata_{};
292  // the list of threads that are waiting on a futex
293  //
294 // the current thread is meant to wake up this list of waiters if it is
295  // able to commit an unlock() on the mutex without seeing a contention chain
296  CachelinePadded<Waiter<Atomic>>* waiters_{nullptr};
297  // after a thread has woken up from a futex() call, it will have the rest of
298 // the threads that were waiting behind it in this list; a thread that
299 // unlocks has to wake up threads from this list, if it has any, before it
300 // goes to sleep to prevent pathological unfairness
301  CachelinePadded<Waiter<Atomic>>* ready_{nullptr};
302 };
303 
304 template <template <typename> class Atomic, bool TimePublishing>
305 DistributedMutex<Atomic, TimePublishing>::DistributedMutex()
306  : state_{kUnlocked} {}
307 
308 template <typename Waiter>
309 bool spin(Waiter& waiter) {
310  auto spins = 0;
311  while (true) {
312  // publish our current time in the futex as a part of the spin waiting
313  // process
314  //
315  // if we are under the maximum number of spins allowed before sleeping, we
316  // publish the exact timestamp, otherwise we publish the minimum possible
317  // timestamp to force the waking thread to skip us
318  ++spins;
319  auto now = (spins < kMaxSpins) ? time() : decltype(time())::zero();
320  auto data = strip(now) | kWaiting;
321  auto signal = waiter.futex_.exchange(data, std::memory_order_acq_rel);
323 
324  // if we got skipped, make a note of it and return if we got a skipped
325  // signal or a signal to wake up
326  auto skipped = signal == kSkipped;
327  if (skipped || (signal == kWake)) {
328  return !skipped;
329  }
330 
331  // if we are under the spin threshold, pause to allow the other
332  // hyperthread to run. If not, then sleep
333  if (spins < kMaxSpins) {
334  asm_volatile_pause();
335  } else {
336  Sleeper::sleep();
337  }
338  }
339 }
340 
341 template <typename Waiter>
342 void doFutexWake(Waiter* waiter) {
343  if (waiter) {
344  // We can use a simple store operation here and not worry about checking
345  // to see if the thread had actually started waiting on the futex, that is
346  // already done in tryWake() when a sleeping thread is collected
347  //
348  // We now do not know whether the waiter had already enqueued on the futex
349  // or whether it had just stored kSleeping in its futex and was about to
350  // call futexWait(). We treat both these scenarios the same
351  //
352  // the below can theoretically cause a problem if we set the
353  // wake signal and the waiter was in between setting kSleeping in its
354  // futex and enqueueing on the futex. In this case the waiter will just
355  // return from futexWait() immediately. This leaves the address that the
356  // waiter was using for futexWait() possibly dangling, and the thread that
357  // we woke in the exchange above might have used that address for some
358  // other object
359  //
360 // however, even if the thread had indeed woken up simply because of the
361  // above exchange(), the futexWake() below is not incorrect. It is not
362  // incorrect because futexWake() does not actually change the memory of
363  // the futex word. It just uses the address to do a lookup in the kernel
364  // futex table. And even if we call futexWake() on some other address,
365  // and that address was being used to wait on futex() that thread will
366  // protect itself from spurious wakeups, check the value in the futex word
367  // and enqueue itself back on the futex
368  //
369 // this dangling pointer possibility is why we use a pointer to the futex
370  // word, and avoid dereferencing after the store() operation
371  auto sleeper = &(*waiter)->sleeper_;
372  sleeper->store(kWake, std::memory_order_release);
373  futexWake(sleeper, 1);
374  }
375 }
376 
377 template <typename Waiter>
378 bool doFutexWait(Waiter* waiter, Waiter*& next) {
379  // first we get ready to sleep by calling exchange() on the futex with a
380  // kSleeping value
381  DCHECK((*waiter)->futex_.load(std::memory_order_relaxed) == kAboutToWait);
382 
383  // note the semantics of using a futex here: when we exchange the sleeper_
384  // with kSleeping, we are getting ready to sleep, and we return from
385  // futexWait() when the value of sleeper_ might have changed. We can also
386  // wake up because of a spurious wakeup, so we always check against the
387  // value in sleeper_ after returning from futexWait(); if the value is not
388  // kWake, then we continue
389  auto pre = (*waiter)->sleeper_.exchange(kSleeping, std::memory_order_acq_rel);
390 
391  // Seeing a kSleeping on a futex word before we set it ourselves means only
392  // one thing - an unlocking thread caught us before we went to futex(), and
393  // we now have the lock, so we abort
394  //
395  // if we were given an early delivery, we can return from this function with
396  // a true, meaning that we now have the lock
397  if (pre == kSleeping) {
398  return true;
399  }
400 
401 // if we reach here then we were not given an early delivery, and any
402  // thread that goes to wake us up will see a consistent view of the rest of
403  // the contention chain (since the next_ variable is set before the
404  // kSleeping exchange above)
405  while (pre != kWake) {
406  // before enqueueing on the futex, we wake any waiters that we were
407  // possibly responsible for
408  doFutexWake(exchange(next, nullptr));
409 
410  // then we wait on the futex
411  //
412  // note that we have to protect ourselves against spurious wakeups here,
413  // because the corresponding futexWake() above does not synchronize
414  // wakeups around the futex word, since doing so would be
415  // inefficient
416  futexWait(&(*waiter)->sleeper_, kSleeping);
417  pre = (*waiter)->sleeper_.load(std::memory_order_acquire);
418  DCHECK((pre == kSleeping) || (pre == kWake));
419  }
420 
421  // when coming out of a futex, we might have some other sleeping threads
422  // that we were supposed to wake up, assign that to the next pointer
423  DCHECK(next == nullptr);
424  next = extractAddress<Waiter>((*waiter)->next_);
425  return false;
426 }
427 
428 template <typename Waiter>
429 bool wait(Waiter* waiter, bool shouldSleep, Waiter*& next) {
430  if (shouldSleep) {
431  return doFutexWait(waiter, next);
432  }
433 
434  return spin(**waiter);
435 }
436 
437 inline void recordTimedWaiterAndClearTimedBit(
438  bool& timedWaiter,
439  std::uintptr_t& previous) {
440  // the previous value in the mutex can never be kTimedWaiter, timed waiters
441  // always set (kTimedWaiter | kLocked) in the mutex word when they try and
442  // acquire the mutex
443  DCHECK(previous != kTimedWaiter);
444 
445  if (UNLIKELY(previous & kTimedWaiter)) {
446  // record whether there was a timed waiter in the previous mutex state, and
447  // clear the timed bit from the previous state
448  timedWaiter = true;
449  previous = previous & (~kTimedWaiter);
450  }
451 }
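// Editorial sketch (not part of the original file): a worked example of the
// helper above. The 0b1000 stand-in address and the function name are purely
// illustrative.
inline void exampleRecordTimedWaiter() {
  auto timed = false;
  auto previous = std::uintptr_t{0b1000} | kLocked | kTimedWaiter;  // 0b1011
  recordTimedWaiterAndClearTimedBit(timed, previous);
  DCHECK(timed);
  DCHECK_EQ(previous, std::uintptr_t{0b1000} | kLocked);  // timed bit cleared
}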
452 
453 template <template <typename> class Atomic, bool TimePublishing>
454 typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
455 DistributedMutex<Atomic, TimePublishing>::lock() {
456  // first try and acquire the lock as a fast path, the underlying
457  // implementation is slightly faster than using std::atomic::exchange() as
458  // is used in this function. So we get a small perf boost in the
459  // uncontended case
460  if (auto state = try_lock()) {
461  return state;
462  }
463 
464  auto previous = std::uintptr_t{0};
465  auto waitMode = kUninitialized;
466  auto nextWaitMode = kAboutToWait;
467  auto timedWaiter = false;
468  CachelinePadded<Waiter<Atomic>>* nextSleeper = nullptr;
469  while (true) {
470  // construct the state needed to wait
471  auto&& state = CachelinePadded<Waiter<Atomic>>{waitMode};
472  auto&& address = reinterpret_cast<std::uintptr_t>(&state);
473  DCHECK(!(address & 0b1));
474 
475  // set the locked bit in the address we will be persisting in the mutex
476  address |= kLocked;
477 
478  // attempt to acquire the mutex, mutex acquisition is successful if the
479  // previous value is zeroed out
480  //
481  // we use memory_order_acq_rel here because we want the read-modify-write
482  // operation to be both acquire and release. Acquire because if this is a
483  // successful lock acquisition, we want to acquire state any other thread
484  // has released from a prior unlock. We want release semantics because
485  // other threads that read the address of this value should see the full
486  // well-initialized node we are going to wait on if the mutex acquisition
487  // was unsuccessful
488  previous = state_.exchange(address, std::memory_order_acq_rel);
489  recordTimedWaiterAndClearTimedBit(timedWaiter, previous);
490  state->next_ = previous;
491  if (previous == kUnlocked) {
492  return {nullptr, address, timedWaiter, {}, nullptr, nextSleeper};
493  }
494  DCHECK(previous & kLocked);
495 
496  // wait until we get a signal from another thread, if this returns false,
497  // we got skipped and had probably been scheduled out, so try again
498  if (!wait(&state, (waitMode == kAboutToWait), nextSleeper)) {
499  std::swap(waitMode, nextWaitMode);
500  continue;
501  }
502 
503  // at this point it is safe to access the other fields in the waiter state,
504  // since the thread that woke us up is gone and nobody will be touching this
505  // state again. Note that this requires memory ordering, which is why we
506  // use memory_order_acquire (among other reasons) in the above wait
507  //
508  // first we see if the value we took off the mutex state was the thread that
509  // initiated the wakeups; if so, we are the terminal node of the current
510  // contention chain. If we are the terminal node, then we should expect to
511  // see a kLocked in the mutex state when we unlock, if we see that, we can
512  // commit the unlock to the centralized mutex state. If not, we need to
513  // continue wakeups
514  //
515  // a nice consequence of passing kLocked as the current address if we are
516  // the terminal node is that it naturally just works with the algorithm. If
517  // we get a contention chain when coming out of a contention chain, the tail
518  // of the new contention chain will have kLocked set as the previous, which,
519  // as it happens "just works", since we have now established a recursive
520  // relationship until broken
521  auto next = previous;
522  auto expected = address;
523  if (previous == state->wakerMetadata_.waker_) {
524  next = 0;
525  expected = kLocked;
526  }
527 
528  // if we are just coming out of a futex call, then it means that the next
529  // waiter we are responsible for is also a waiter waiting on a futex, so
530  // we return that list in the list of ready threads. We will be waking up
531  // the ready threads on unlock no matter what
532  return {extractAddress<CachelinePadded<Waiter<Atomic>>>(next),
533  expected,
534  timedWaiter,
535  state->wakerMetadata_,
536  extractAddress<CachelinePadded<Waiter<Atomic>>>(state->waiters_),
537  nextSleeper};
538  }
539 }
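// Editorial sketch (not part of the original file): basic lock()/unlock()
// usage through the proxy object returned by lock(). Assumes the
// DistributedMutex class template declared in DistributedMutex.h; the
// function name is illustrative.
inline void exampleLockUnlock(DistributedMutex<std::atomic, true>& mutex) {
  auto proxy = mutex.lock();       // may spin or sleep behind other waiters
  // ... critical section ...
  mutex.unlock(std::move(proxy));  // the proxy must be handed back to unlock()
}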
540 
541 inline bool preempted(std::uint64_t value) {
542  auto currentTime = recover(strip(time()));
543  auto nodeTime = recover(value);
544  auto preempted = currentTime > nodeTime + kScheduledAwaySpinThreshold.count();
545 
546  // we say that the thread has been preempted if its timestamp says so, and
547  // also if it is neither uninitialized nor skipped
548  DCHECK(value != kSkipped);
549  return (preempted) && (value != kUninitialized);
550 }
551 
552 inline bool isSleeper(std::uintptr_t value) {
553  return (value == kAboutToWait);
554 }
555 
556 template <typename Waiter>
557 std::uintptr_t tryWake(
558  bool publishing,
559  Waiter* waiter,
560  std::uintptr_t value,
561  WakerMetadata metadata,
562  Waiter*& sleepers) {
563  // first we see if we can wake the current thread that is spinning
564  if ((!publishing || !preempted(value)) && !isSleeper(value)) {
565  // we need release here because of the write to wakerMetadata_
566  (*waiter)->wakerMetadata_ = metadata;
567  (*waiter)->waiters_ = reinterpret_cast<std::uintptr_t>(sleepers);
568  (*waiter)->futex_.store(kWake, std::memory_order_release);
569  return 0;
570  }
571 
572  // if the thread is not a sleeper, and we were not able to catch it before
573  // preemption, we just skip it; it is safe to read next_ because
574  // the thread was preempted. Preemption signals can only come after the
575  // thread has set the next_ pointer, since the timestamp writes only start
576  // occurring after that point
577  //
578  // if a thread was preempted it must have stored next_ in the waiter struct,
579  // as the store to futex_ that resets the value from kUninitialized happens
580  // after the write to next
581  CHECK(publishing);
582  if (!isSleeper(value)) {
583  // go on to the next one
584  //
585  // Also, we need a memory_order_release here to prevent missed wakeups. A
586  // missed wakeup here can happen when we see that a thread had been
587  // preempted and skip it, then go on to release the lock, and then when
588  // the thread which got skipped does an exchange on the central storage,
589  // it still sees the locked bit, and never gets woken up
590  //
591  // Can we relax this?
592  DCHECK(preempted(value));
593  auto next = (*waiter)->next_;
594  (*waiter)->futex_.store(kSkipped, std::memory_order_release);
595  return next;
596  }
597 
598  // if we are here the thread is a sleeper
599  //
600  // we attempt to catch the thread before it goes to futex(). If we are able
601  // to catch the thread before it sleeps on a futex, we are done, and don't
602  // need to go any further
603  //
604  // if we are not able to catch the thread before it goes to futex, we
605  // collect the current thread in the list of sleeping threads represented by
606  // sleepers, and return the thread's successor (the previous value of its
607  // next_ pointer)
608  //
609  // it is safe to read the next_ pointer in the waiter struct if we were
610  // unable to catch the thread before it went to futex() because we use
611  // acquire-release ordering for the exchange operation below. And if we see
612  // that the thread was already sleeping, we have synchronized with the write
613  // to next_ in the context of the sleeping thread
614  //
615  // Also we need to set the value of waiters_ and wakerMetadata_ in the
616  // thread before doing the exchange because we need to pass on the list of
617  // sleepers in the event that we were able to catch the thread before it
618  // went to futex(). If we were unable to catch the thread before it slept,
619  // these fields will be ignored when the thread wakes up anyway
620  DCHECK(isSleeper(value));
621  (*waiter)->wakerMetadata_ = metadata;
622  (*waiter)->waiters_ = reinterpret_cast<std::uintptr_t>(sleepers);
623  auto pre = (*waiter)->sleeper_.exchange(kSleeping, std::memory_order_acq_rel);
624 
625  // we were able to catch the thread before it went to sleep, so the wakeup succeeded
626  if (pre != kSleeping) {
627  return 0;
628  }
629 
630  // otherwise return the value of next_; it is safe to read next_
631  // because of the same logic as when a thread was preempted
632  //
633  // we also need to collect this sleeper in the list of sleepers being built
634  // up
635  auto next = (*waiter)->next_;
636  (*waiter)->next_ = reinterpret_cast<std::uintptr_t>(sleepers);
637  sleepers = waiter;
638  return next;
639 }
640 
641 template <typename Waiter>
642 bool wake(
643  bool publishing,
644  Waiter& waiter,
645  WakerMetadata metadata,
646  Waiter*& sleepers) {
647  // loop till we find a node that is either at the end of the list (as
648  // specified by metadata) or we find a node that is active (as specified by
649  // the last published timestamp of the node)
650  auto current = &waiter;
651  while (current) {
652  auto value = (*current)->futex_.load(std::memory_order_acquire);
653  auto next = tryWake(publishing, current, value, metadata, sleepers);
654  if (!next) {
655  return true;
656  }
657 
658  // we need to read the value of the next node in the list before skipping
659  // it, this is because after we skip it the node might wake up and enqueue
660  // itself, and thereby gain a new next node
661  CHECK(publishing);
662  current =
663  (next == metadata.waker_) ? nullptr : extractAddress<Waiter>(next);
664  }
665 
666  return false;
667 }
668 
669 template <typename Atomic>
670 void wakeTimedWaiters(Atomic* state, bool timedWaiters) {
671  if (UNLIKELY(timedWaiters)) {
672  atomic_notify_one(state);
673  }
674 }
675 
676 template <typename Atomic, typename Proxy, typename Sleepers>
677 bool tryUnlockClean(Atomic& state, Proxy& proxy, Sleepers sleepers) {
678  auto expected = proxy.expected_;
679  while (true) {
680  if (state.compare_exchange_strong(
681  expected,
682  kUnlocked,
683  std::memory_order_release,
684  std::memory_order_relaxed)) {
685  // if we were able to commit an unlock, we need to wake up the futex
686  // waiters, if any
687  doFutexWake(sleepers);
688  return true;
689  }
690 
691  // if we failed the compare_exchange_strong() above, we check to see if
692  // the failure was because of the presence of a timed waiter. If that
693  // was the case then we try one more time with the kTimedWaiter bit set
694  if (UNLIKELY(expected == (proxy.expected_ | kTimedWaiter))) {
695  proxy.timedWaiters_ = true;
696  continue;
697  }
698 
699  // otherwise return false, we have a contention chain
700  return false;
701  }
702 }
703 
704 template <template <typename> class Atomic, bool Publish>
705 void DistributedMutex<Atomic, Publish>::unlock(
706  DistributedMutexStateProxy proxy) {
707  // we always wake up ready threads and timed waiters if we saw either
708  DCHECK(proxy) << "Invalid proxy passed to DistributedMutex::unlock()";
709  SCOPE_EXIT {
710  doFutexWake(proxy.ready_);
711  wakeTimedWaiters(&state_, proxy.timedWaiters_);
712  };
713 
714  // if there is a wait queue we are responsible for, try and start wakeups,
715  // don't bother with the mutex state
716  auto sleepers = proxy.waiters_;
717  if (proxy.next_) {
718  if (wake(Publish, *proxy.next_, proxy.wakerMetadata_, sleepers)) {
719  return;
720  }
721 
722  // At this point, if we are in the if statement, we were not the terminal
723  // node of the wakeup chain. Terminal nodes have the next_ pointer set to
724  // null in lock()
725  //
726  // So we need to pretend we were the end of the contention chain. Coming
727  // out of a contention chain always has the kLocked state set in the
728  // mutex. Unless there is another contention chain lined up, which does
729  // not matter since we are the terminal node anyway
730  proxy.expected_ = kLocked;
731  }
732 
733  while (true) {
734  // otherwise, since we don't have anyone we need to wake up, we try and
735  // release the mutex just as is
736  //
737  // if this is successful, we can return, the unlock was successful, we have
738  // committed a nice kUnlocked to the central storage, yay
739  if (tryUnlockClean(state_, proxy, sleepers)) {
740  return;
741  }
742 
743  // here we have a contention chain built up on the mutex. We grab the
744  // wait queue and start executing wakeups. We leave a locked bit on the
745  // centralized storage and handoff control to the head of the queue
746  //
747  // we use memory_order_acq_rel here because we want to see the
748  // full well-initialized node that the other thread is waiting on
749  //
750  // If we are unable to wake the contention chain, it is possible that when
751  // we come back to looping here, a new contention chain will form. In
752  // that case we need to use kLocked as the waker_ value because the
753  // terminal node of the new chain will see kLocked in the central storage
754  auto head = state_.exchange(kLocked, std::memory_order_acq_rel);
755  recordTimedWaiterAndClearTimedBit(proxy.timedWaiters_, head);
756  auto next = extractAddress<CachelinePadded<Waiter<Atomic>>>(head);
757  DCHECK((head & kLocked) && (head != kLocked)) << "incorrect state " << head;
758  if (wake(Publish, *next, {exchange(proxy.expected_, kLocked)}, sleepers)) {
759  break;
760  }
761  }
762 }
763 
764 template <template <typename> class Atomic, bool TimePublishing>
765 typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
766 DistributedMutex<Atomic, TimePublishing>::try_lock() {
767  // Try and set the least significant bit of the centralized lock state to 1,
768  // indicating locked.
769  //
770  // If this succeeds, it must have been the case that we had a kUnlocked (or
771  // 0) in the centralized storage before, since that is the only case where a
772  // 0 can be found. So we assert that in debug mode
773  //
774  // If this fails, then it is a no-op
775  auto previous = atomic_fetch_set(state_, 0, std::memory_order_acquire);
776  if (!previous) {
777  return {nullptr, kLocked};
778  }
779 
780  return {nullptr, 0};
781 }
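// Editorial sketch (not part of the original file): try_lock() usage. The
// returned proxy converts to true only if the fast-path acquisition
// succeeded; no waiting is ever done. The function name is illustrative.
inline bool exampleTryLock(DistributedMutex<std::atomic, true>& mutex) {
  if (auto proxy = mutex.try_lock()) {
    // ... critical section ...
    mutex.unlock(std::move(proxy));
    return true;
  }
  return false;  // the mutex was held by another thread
}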
782 
783 template <typename Atomic, typename Deadline, typename MakeProxy>
784 auto timedLock(Atomic& state, Deadline deadline, MakeProxy proxy) {
785  while (true) {
786  // we put a bit on the central state to show that there is a timed waiter
787  // and go to sleep on the central state
788  //
789  // when this thread goes to unlock the mutex, it will expect a 0b1 in the
790  // mutex state (0b1, not 0b11), but then it will see that the value in the
791  // mutex state is 0b11 and not 0b1, meaning that there might have been
792  // another timed waiter. Even though there might not have been another
793  // timed waiter in the meantime. This sort of missed wakeup is
794  // desirable for timed waiters; it helps avoid thundering herds of timed
795  // waiters. Because the mutex is packed in 8 bytes, and we need an
796  // address to be stored in those 8 bytes, we don't have much room to play
797  // with. The only other solution is to issue a futexWake(INT_MAX) to wake
798  // up all waiters when a clean unlock is committed, when a thread saw a
799  // timed waiter in the mutex previously.
800  //
801  // putting a 0b11 here works for a set of reasons that is a superset of
802  // the set of reasons that make it okay to put a kLocked (0b1) in the
803  // mutex state. Now that the thread has put (kTimedWaiter | kLocked)
804  // (0b11) in the mutex state and it expects a kLocked (0b1), there are two
805  // scenarios possible. The first being when there is no contention chain
806  // formation in the mutex from the time a timed waiter got a lock to
807  // unlock. In this case, the unlocker sees a 0b11 in the mutex state,
808  // adjusts to the presence of a timed waiter and cleanly unlocks with a
809  // kUnlocked (0b0). The second is when there is a contention chain.
810  // When a thread puts its address in the mutex and sees the timed bit, it
811  // records the presence of a timed waiter, and then pretends as if it
812  // hadn't seen the timed bit. So future contention chain releases, will
813  // terminate with a kLocked (0b1) and not a (kLocked | kTimedWaiter)
814  // (0b11). This just works naturally with the rest of the algorithm
815  // without incurring a perf hit for the regular non-timed case
816  //
817  // this strategy does, however, mean that when threads try to acquire the
818  // mutex and all time out, there will be a wasteful syscall to issue wakeups
819  // to waiting threads. We don't do anything to try and minimize this
820  //
821  // we need to use a fetch_or() here because we need to convey two bits of
822  // information - 1, whether the mutex is locked or not, and 2, whether
823  // there is a timed waiter. The alternative here is to use the second bit
824  // to convey information only; we can use a fetch_set() on the second bit
825  // to make this faster, but that comes at the expense of complicating the
826  // regular fast path lock attempts, which use a single bit read-modify-write
827  // for better performance
828  auto data = kTimedWaiter | kLocked;
829  auto previous = state.fetch_or(data, std::memory_order_acquire);
830  if (!(previous & 0b1)) {
831  DCHECK(!previous);
832  return proxy(nullptr, kLocked, true);
833  }
834 
835  // wait on the futex until signalled, if we get a timeout, the try_lock
836  // fails
837  auto result = atomic_wait_until(&state, previous | data, deadline);
838  if (result == std::cv_status::timeout) {
839  return proxy(nullptr, std::uintptr_t{0}, false);
840  }
841  }
842 }
843 
844 template <template <typename> class Atomic, bool TimePublishing>
845 template <typename Clock, typename Duration>
846 typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
847 DistributedMutex<Atomic, TimePublishing>::try_lock_until(
848  const std::chrono::time_point<Clock, Duration>& deadline) {
849  // fast path for the uncontended case
850  //
851  // we get the time after trying to acquire the mutex because in the
852  // uncontended case, the price of getting the time is about 1/3 of the
853  // actual mutex acquisition. So we only pay the price of that extra bit of
854  // latency when needed
855  //
856  // this is even higher when VDSO is involved on architectures that do not
857  // offer a direct interface to the timestamp counter
858  if (auto state = try_lock()) {
859  return state;
860  }
861 
862  // fall back to the timed locking algorithm
863  using Proxy = DistributedMutexStateProxy;
864  return timedLock(state_, deadline, [](auto... as) { return Proxy{as...}; });
865 }
866 
867 template <template <typename> class Atomic, bool TimePublishing>
868 template <typename Rep, typename Period>
869 typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
870 DistributedMutex<Atomic, TimePublishing>::try_lock_for(
871  const std::chrono::duration<Rep, Period>& duration) {
872  // fast path for the uncontended case. Reasoning for doing this here is the
873  // same as in try_lock_until()
874  if (auto state = try_lock()) {
875  return state;
876  }
877 
878  // fall back to the timed locking algorithm
879  using Proxy = DistributedMutexStateProxy;
880  auto deadline = std::chrono::steady_clock::now() + duration;
881  return timedLock(state_, deadline, [](auto... as) { return Proxy{as...}; });
882 }
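// Editorial sketch (not part of the original file): a timed acquisition with
// try_lock_for(). On timeout the proxy converts to false and nothing needs to
// be unlocked. The function name is illustrative.
inline bool exampleTryLockFor(DistributedMutex<std::atomic, true>& mutex) {
  if (auto proxy = mutex.try_lock_for(std::chrono::milliseconds{10})) {
    // ... critical section ...
    mutex.unlock(std::move(proxy));
    return true;
  }
  return false;  // timed out sleeping on the central mutex word
}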
883 } // namespace distributed_mutex
884 } // namespace detail
885 } // namespace folly