proxygen
LifoSem.h
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <algorithm>
19 #include <atomic>
20 #include <cstdint>
21 #include <cstring>
22 #include <memory>
23 #include <system_error>
24 
25 #include <folly/CPortability.h>
26 #include <folly/CachelinePadded.h>
27 #include <folly/IndexedMemPool.h>
28 #include <folly/Likely.h>
29 #include <folly/lang/SafeAssert.h>
32 
33 namespace folly {
34 
35 template <
36  template <typename> class Atom = std::atomic,
37  class BatonType = SaturatingSemaphore<true, Atom>>
38 struct LifoSemImpl;
39 
94 typedef LifoSemImpl<> LifoSem;
95 
97 struct FOLLY_EXPORT ShutdownSemError : public std::runtime_error {
98  explicit ShutdownSemError(const std::string& msg);
99  ~ShutdownSemError() noexcept override;
100 };
101 
102 namespace detail {
103 
104 // Internally, a LifoSem is either a value or a linked list of wait nodes.
105 // This union is captured in the LifoSemHead type, which holds either a
106 // value or an indexed pointer to the list. LifoSemHead itself is a value
107 // type, the head is a mutable atomic box containing a LifoSemHead value.
108 // Each wait node corresponds to exactly one waiter. Values can flow
109 // through the semaphore either by going into and out of the head's value,
110 // or by direct communication from a poster to a waiter. The former path
111 // is taken when there are no pending waiters, the latter otherwise. The
112 // general flow of a post is to try to increment the value or pop-and-post
113 // a wait node. Either of those has the effect of conveying one semaphore
114 // unit. Waiting is the opposite, either a decrement of the value or
115 // push-and-wait of a wait node. The generic LifoSemBase abstracts the
116 // actual mechanism by which a wait node's post->wait communication is
117 // performed, which is why we have LifoSemRawNode and LifoSemNode.
118 
122 template <template <typename> class Atom>
125 
130 
131  bool isShutdownNotice() const {
132  return next == uint32_t(-1);
133  }
135  next = 0;
136  }
138  next = uint32_t(-1);
139  }
140 
142 
144  static Pool& pool();
145 };
146 
// Expands to the explicit specialization of LifoSemRawNode<Atom>::pool()
// for the given Atom template.
//
// The specialization creates a Pool of `capacity` elements on the heap the
// first time it is called and hands back that same object on every call.
// The pool is never deleted.
//
// Must be expanded at global namespace scope, because the expansion opens
// namespace folly::detail itself.
#define LIFOSEM_DECLARE_POOL(Atom, capacity)                 \
  namespace folly {                                          \
  namespace detail {                                         \
  template <>                                                \
  LifoSemRawNode<Atom>::Pool& LifoSemRawNode<Atom>::pool() { \
    static Pool& sharedPool = *new Pool((capacity));         \
    return sharedPool;                                       \
  }                                                          \
  }                                                          \
  }
159 
165 template <typename Handoff, template <typename> class Atom>
166 struct LifoSemNode : public LifoSemRawNode<Atom> {
167  static_assert(
168  sizeof(Handoff) <= sizeof(LifoSemRawNode<Atom>::raw),
169  "Handoff too big for small-object optimization, use indirection");
170  static_assert(
171  alignof(Handoff) <= alignof(decltype(LifoSemRawNode<Atom>::raw)),
172  "Handoff alignment constraint not satisfied");
173 
174  template <typename... Args>
175  void init(Args&&... args) {
176  new (&this->raw) Handoff(std::forward<Args>(args)...);
177  }
178 
179  void destroy() {
180  handoff().~Handoff();
181 #ifndef NDEBUG
182  memset(&this->raw, 'F', sizeof(this->raw));
183 #endif
184  }
185 
186  Handoff& handoff() {
187  return *static_cast<Handoff*>(static_cast<void*>(&this->raw));
188  }
189 
190  const Handoff& handoff() const {
191  return *static_cast<const Handoff*>(static_cast<const void*>(&this->raw));
192  }
193 };
194 
195 template <typename Handoff, template <typename> class Atom>
198  elem->destroy();
199  auto idx = LifoSemRawNode<Atom>::pool().locateElem(elem);
201  }
202 };
203 
209 class LifoSemHead {
210  // What we really want are bitfields:
211  // uint64_t data : 32; isNodeIdx : 1; isShutdown : 1; isLocked : 1; seq : 29;
212  // Unfortunately g++ generates pretty bad code for this sometimes (I saw
213  // -O3 code from gcc 4.7.1 copying the bitfields one at a time instead of
214  // in bulk, for example). We can generate better code anyway by assuming
215  // that setters won't be given values that cause under/overflow, and
216  // putting the sequence at the end where its planned overflow doesn't
217  // need any masking.
218  //
219  // data == 0 (empty list) with isNodeIdx is conceptually the same
220  // as data == 0 (no unclaimed increments) with !isNodeIdx, we always
221  // convert the former into the latter to make the logic simpler.
// Bit positions inside `bits`. The low 32 bits (0..31) hold the data (either
// the value or a node index); the three flags occupy bits 32, 33 and 34; the
// sequence counter starts at bit 35 and runs to the top of the word.
222  enum {
223  IsNodeIdxShift = 32,
224  IsShutdownShift = 33,
225  IsLockedShift = 34,
226  SeqShift = 35,
227  };
// Masks derived from the shifts above, typed as uint64_t so the shifts are
// performed at 64-bit width.
//  - Is*Mask : the single flag bit.
//  - SeqIncr : the lowest sequence bit; adding it increments the sequence.
//  - SeqMask : every bit from SeqShift upward (all sequence bits).
228  enum : uint64_t {
229  IsNodeIdxMask = uint64_t(1) << IsNodeIdxShift,
230  IsShutdownMask = uint64_t(1) << IsShutdownShift,
231  IsLockedMask = uint64_t(1) << IsLockedShift,
232  SeqIncr = uint64_t(1) << SeqShift,
233  SeqMask = ~(SeqIncr - 1),
234  };
235 
236  public:
238 
240 
241  inline uint32_t idx() const {
242  assert(isNodeIdx());
243  assert(uint32_t(bits) != 0);
244  return uint32_t(bits);
245  }
246  inline uint32_t value() const {
247  assert(!isNodeIdx());
248  return uint32_t(bits);
249  }
250  inline constexpr bool isNodeIdx() const {
251  return (bits & IsNodeIdxMask) != 0;
252  }
253  inline constexpr bool isShutdown() const {
254  return (bits & IsShutdownMask) != 0;
255  }
256  inline constexpr bool isLocked() const {
257  return (bits & IsLockedMask) != 0;
258  }
259  inline constexpr uint32_t seq() const {
260  return uint32_t(bits >> SeqShift);
261  }
262 
264 
267  static inline constexpr LifoSemHead fresh(uint32_t value) {
268  return LifoSemHead{value};
269  }
270 
273  inline LifoSemHead withPop(uint32_t idxNext) const {
274  assert(!isLocked());
275  assert(isNodeIdx());
276  if (idxNext == 0) {
277  // no isNodeIdx bit or data bits. Wraparound of seq bits is okay
278  return LifoSemHead{(bits & (SeqMask | IsShutdownMask)) + SeqIncr};
279  } else {
280  // preserve sequence bits (incremented with wraparound okay) and
281  // isNodeIdx bit, replace all data bits
282  return LifoSemHead{(bits & (SeqMask | IsShutdownMask | IsNodeIdxMask)) +
283  SeqIncr + idxNext};
284  }
285  }
286 
288  inline LifoSemHead withPush(uint32_t _idx) const {
289  assert(!isLocked());
290  assert(isNodeIdx() || value() == 0);
291  assert(!isShutdown());
292  assert(_idx != 0);
293  return LifoSemHead{(bits & SeqMask) | IsNodeIdxMask | _idx};
294  }
295 
298  inline LifoSemHead withValueIncr(uint32_t delta) const {
299  assert(!isLocked());
300  assert(!isNodeIdx());
301  auto rv = LifoSemHead{bits + SeqIncr + delta};
302  if (UNLIKELY(rv.isNodeIdx())) {
303  // value has overflowed into the isNodeIdx bit
304  rv = LifoSemHead{(rv.bits & ~IsNodeIdxMask) | (IsNodeIdxMask - 1)};
305  }
306  return rv;
307  }
308 
310  inline LifoSemHead withValueDecr(uint32_t delta) const {
311  assert(!isLocked());
312  assert(delta > 0 && delta <= value());
313  return LifoSemHead{bits + SeqIncr - delta};
314  }
315 
318  inline LifoSemHead withShutdown() const {
319  return LifoSemHead{bits | IsShutdownMask};
320  }
321 
322  // Returns LifoSemHead with lock bit set, but rest of bits unchanged.
323  inline LifoSemHead withLock() const {
324  assert(!isLocked());
325  return LifoSemHead{bits | IsLockedMask};
326  }
327 
328  // Returns LifoSemHead with lock bit unset, and updated seqno based
329  // on idx.
330  inline LifoSemHead withoutLock(uint32_t idxNext) const {
331  assert(isLocked());
332  // We need to treat this as a pop, as we may change the list head.
333  return LifoSemHead{bits & ~IsLockedMask}.withPop(idxNext);
334  }
335 
336  inline constexpr bool operator==(const LifoSemHead& rhs) const {
337  return bits == rhs.bits;
338  }
339  inline constexpr bool operator!=(const LifoSemHead& rhs) const {
340  return !(*this == rhs);
341  }
342 };
343 
351 template <typename Handoff, template <typename> class Atom = std::atomic>
352 struct LifoSemBase {
354  constexpr explicit LifoSemBase(uint32_t initialValue = 0)
355  : head_(LifoSemHead::fresh(initialValue)) {}
356 
357  LifoSemBase(LifoSemBase const&) = delete;
358  LifoSemBase& operator=(LifoSemBase const&) = delete;
359 
361  bool post() {
362  auto idx = incrOrPop(1);
363  if (idx != 0) {
364  idxToNode(idx).handoff().post();
365  return true;
366  }
367  return false;
368  }
369 
379  void post(uint32_t n) {
380  uint32_t idx;
381  while (n > 0 && (idx = incrOrPop(n)) != 0) {
382  // pop accounts for only 1
383  idxToNode(idx).handoff().post();
384  --n;
385  }
386  }
387 
389  bool isShutdown() const {
390  return UNLIKELY(head_->load(std::memory_order_acquire).isShutdown());
391  }
392 
// Puts the semaphore into the shut-down state and wakes every waiter that
// is queued at that moment.
//
// Phase 1 sets the shutdown flag in the head with a CAS loop (a no-op if it
// is already set). Phase 2 pops wait nodes one by one; each popped node is
// tagged with setShutdownNotice() before its handoff is posted, so the
// woken waiter can tell that no value was conveyed to it.
398  void shutdown() {
399  // first set the shutdown bit
400  auto h = head_->load(std::memory_order_acquire);
401  while (!h.isShutdown()) {
402  if (head_->compare_exchange_strong(h, h.withShutdown())) {
403  // success
404  h = h.withShutdown();
405  break;
406  }
407  // compare_exchange_strong rereads h, retry
408  }
409 
410  // now wake up any waiters
411  while (h.isNodeIdx()) {
// The head is locked by another thread: reload it and try again.
// NOTE(review): listing line 413 is absent from this view; presumably it
// yields or pauses before the reload -- confirm against the real header.
412  if (h.isLocked()) {
414  h = head_->load(std::memory_order_acquire);
415  continue;
416  }
// node.next is read before the CAS. If the CAS fails, h is refreshed by
// compare_exchange_strong and the loop re-evaluates from the new head.
417  auto& node = idxToNode(h.idx());
418  auto repl = h.withPop(node.next);
419  if (head_->compare_exchange_strong(h, repl)) {
420  // successful pop, wake up the waiter and move on. The next
421  // field is used to convey that this wakeup didn't consume a value
422  node.setShutdownNotice();
423  node.handoff().post();
424  h = repl;
425  }
426  }
427  }
428 
430  bool tryWait() {
431  uint32_t n = 1;
432  auto rv = decrOrPush(n, 0);
433  assert(
434  (rv == WaitResult::DECR && n == 0) ||
435  (rv != WaitResult::DECR && n == 1));
436  // SHUTDOWN is okay here, since we don't actually wait
437  return rv == WaitResult::DECR;
438  }
439 
444  auto const orig = n;
445  while (n > 0) {
446 #ifndef NDEBUG
447  auto prev = n;
448 #endif
449  auto rv = decrOrPush(n, 0);
450  assert(
451  (rv == WaitResult::DECR && n < prev) ||
452  (rv != WaitResult::DECR && n == prev));
453  if (rv != WaitResult::DECR) {
454  break;
455  }
456  }
457  return orig - n;
458  }
459 
465  void wait() {
466  auto const deadline = std::chrono::steady_clock::time_point::max();
467  auto res = try_wait_until(deadline);
468  FOLLY_SAFE_DCHECK(res, "infinity time has passed");
469  }
470 
// Tries to acquire one unit, waiting at most `timeout`. The timeout is
// converted into a steady_clock deadline relative to now and the work is
// delegated to try_wait_until(); its result is returned unchanged.
template <typename Rep, typename Period>
bool try_wait_for(const std::chrono::duration<Rep, Period>& timeout) {
  const auto deadline = timeout + std::chrono::steady_clock::now();
  return try_wait_until(deadline);
}
475 
// Tries to acquire one unit, blocking until `deadline` at the latest.
//
// Returns true when a unit was acquired (by decrementing the value or by
// being woken through the pushed wait node) and false when the deadline
// passed and this thread's node could be removed from the wait list.
// Throws ShutdownSemError when the wait would have to block on a shut-down
// semaphore, or when the wakeup came from shutdown() instead of a post.
//
// NOTE(review): the line carrying the function name (listing line 477) is
// absent from this view; the name is taken from the call in wait().
476  template <typename Clock, typename Duration>
478  const std::chrono::time_point<Clock, Duration>& deadline) {
479  // early check isn't required for correctness, but is an important
480  // perf win if we can avoid allocating and deallocating a node
481  if (tryWait()) {
482  return true;
483  }
484 
485  // allocateNode() won't compile unless Handoff has a default
486  // constructor
// NOTE(review): allocateNode() returns an empty UniquePtr when the pool's
// allocIndex() yields 0, and `*node` below would then dereference null.
// Confirm pool exhaustion is impossible here, or add a guard.
487  UniquePtr node = allocateNode();
488 
489  auto rv = tryWaitOrPush(*node);
490  if (UNLIKELY(rv == WaitResult::SHUTDOWN)) {
491  assert(isShutdown());
492  throw ShutdownSemError("wait() would block but semaphore is shut down");
493  }
494 
495  if (rv == WaitResult::PUSH) {
// Our node is now on the wait list; block on its handoff until a poster
// (or shutdown) wakes us, or the deadline passes.
496  if (!node->handoff().try_wait_until(deadline)) {
497  if (tryRemoveNode(*node)) {
498  return false;
499  } else {
500  // We could not remove our node. Return to waiting.
501  //
502  // This only happens if we lose a removal race with post(),
503  // so we are not likely to wait long. This is only
504  // necessary to ensure we don't return node's memory back to
505  // IndexedMemPool before post() has had a chance to post to
506  // handoff(). In a stronger memory reclamation scheme, such
507  // as hazptr or rcu, this would not be necessary.
508  node->handoff().wait();
509  }
510  }
511  if (UNLIKELY(node->isShutdownNotice())) {
512  // this wait() didn't consume a value, it was triggered by shutdown
513  throw ShutdownSemError(
514  "blocking wait() interrupted by semaphore shutdown");
515  }
516 
517  // node->handoff().wait() can't return until after the node has
518  // been popped and post()ed, so it is okay for the UniquePtr to
519  // recycle the node now
520  }
521  // else node wasn't pushed, so it is safe to recycle
522  return true;
523  }
524 
529  // this is actually linearizable, but we don't promise that because
530  // we may want to add striping in the future to help under heavy
531  // contention
532  auto h = head_->load(std::memory_order_acquire);
533  return h.isNodeIdx() ? 0 : h.value();
534  }
535 
536  protected:
// Outcome of one decrOrPush() call.
//  - PUSH     : the value could not be decremented; the caller's node was
//               pushed onto the wait list, or no node was supplied
//               (index 0) and nothing was changed.
//  - DECR     : the value was decremented.
//  - SHUTDOWN : a node was supplied but the semaphore is shut down, so
//               nothing was pushed.
537  enum class WaitResult {
538  PUSH,
539  DECR,
540  SHUTDOWN,
541  };
542 
545  typedef std::
546  unique_ptr<LifoSemNode<Handoff, Atom>, LifoSemNodeRecycler<Handoff, Atom>>
548 
550  template <typename... Args>
552  auto idx = LifoSemRawNode<Atom>::pool().allocIndex();
553  if (idx != 0) {
554  auto& node = idxToNode(idx);
555  node.clearShutdownNotice();
556  try {
557  node.init(std::forward<Args>(args)...);
558  } catch (...) {
560  throw;
561  }
562  return UniquePtr(&node);
563  } else {
564  return UniquePtr();
565  }
566  }
567 
575  uint32_t n = 1;
576  return decrOrPush(n, nodeToIdx(waiterNode));
577  }
578 
579  private:
581 
583  auto raw = &LifoSemRawNode<Atom>::pool()[idx];
584  return *static_cast<LifoSemNode<Handoff, Atom>*>(raw);
585  }
586 
588  return LifoSemRawNode<Atom>::pool().locateElem(&node);
589  }
590 
591  // Locks the list head (blocking concurrent pushes and pops)
592  // and attempts to remove this node. Returns true if node was
593  // found and removed, false if not found.
//
// Also returns false, without taking the lock, when the head currently
// holds a value instead of a list (there is nothing to search). The lock
// is the lock flag in the head word: it is set with a CAS and cleared by
// the final store, which also increments the sequence counter.
594  bool tryRemoveNode(const LifoSemNode<Handoff, Atom>& removenode) {
595  auto removeidx = nodeToIdx(removenode);
596  auto head = head_->load(std::memory_order_acquire);
597  // Try to lock the head.
598  while (true) {
// Someone else holds the lock: reload and retry.
// NOTE(review): listing line 600 is absent from this view; presumably it
// yields or pauses before the reload -- confirm against the real header.
599  if (head.isLocked()) {
601  head = head_->load(std::memory_order_acquire);
602  continue;
603  }
604  if (!head.isNodeIdx()) {
605  return false;
606  }
// On failure compare_exchange_weak refreshes `head`, and the loop
// re-checks the lock and list conditions against the new value.
607  if (head_->compare_exchange_weak(
608  head,
609  head.withLock(),
610  std::memory_order_acquire,
611  std::memory_order_relaxed)) {
612  break;
613  }
614  }
615  // Update local var to what head_ is, for better assert() checking.
616  head = head.withLock();
617  bool result = false;
618  auto idx = head.idx();
619  if (idx == removeidx) {
620  // pop from head. Head seqno is updated.
621  head_->store(
622  head.withoutLock(removenode.next), std::memory_order_release);
623  return true;
624  }
// Walk the list. `node` trails one step behind `idx`, so a match can be
// unlinked by redirecting node->next past the removed node.
625  auto node = &idxToNode(idx);
626  idx = node->next;
627  while (idx) {
628  if (idx == removeidx) {
629  // Pop from mid-list.
630  node->next = removenode.next;
631  result = true;
632  break;
633  }
634  node = &idxToNode(idx);
635  idx = node->next;
636  }
637  // Unlock and return result
// The front node is unchanged on this path, so its own index is passed
// back as the new head index.
638  head_->store(head.withoutLock(head.idx()), std::memory_order_release);
639  return result;
640  }
641 
646  while (true) {
647  assert(n > 0);
648 
649  auto head = head_->load(std::memory_order_acquire);
650  if (head.isLocked()) {
652  continue;
653  }
654  if (head.isNodeIdx()) {
655  auto& node = idxToNode(head.idx());
656  if (head_->compare_exchange_strong(head, head.withPop(node.next))) {
657  // successful pop
658  return head.idx();
659  }
660  } else {
661  auto after = head.withValueIncr(n);
662  if (head_->compare_exchange_strong(head, after)) {
663  // successful incr
664  return 0;
665  }
666  }
667  // retry
668  }
669  }
670 
679  assert(n > 0);
680 
681  while (true) {
682  auto head = head_->load(std::memory_order_acquire);
683 
684  if (head.isLocked()) {
686  continue;
687  }
688 
689  if (!head.isNodeIdx() && head.value() > 0) {
690  // decr
691  auto delta = std::min(n, head.value());
692  if (head_->compare_exchange_strong(head, head.withValueDecr(delta))) {
693  n -= delta;
694  return WaitResult::DECR;
695  }
696  } else {
697  // push
698  if (idx == 0) {
699  return WaitResult::PUSH;
700  }
701 
702  if (UNLIKELY(head.isShutdown())) {
703  return WaitResult::SHUTDOWN;
704  }
705 
706  auto& node = idxToNode(idx);
707  node.next = head.isNodeIdx() ? head.idx() : 0;
708  if (head_->compare_exchange_strong(head, head.withPush(idx))) {
709  // push succeeded
710  return WaitResult::PUSH;
711  }
712  }
713  }
714  // retry
715  }
716 };
717 
718 } // namespace detail
719 
720 template <template <typename> class Atom, class BatonType>
721 struct LifoSemImpl : public detail::LifoSemBase<BatonType, Atom> {
722  constexpr explicit LifoSemImpl(uint32_t v = 0)
723  : detail::LifoSemBase<BatonType, Atom>(v) {}
724 };
725 
726 } // namespace folly
LifoSemImpl LifoSem
Definition: LifoSem.h:38
static constexpr LifoSemHead fresh(uint32_t value)
Definition: LifoSem.h:267
LifoSemHead withValueIncr(uint32_t delta) const
Definition: LifoSem.h:298
*than *hazptr_holder h
Definition: Hazptr.h:116
LogLevel max
Definition: LogLevel.cpp:31
static uint32_t nodeToIdx(const LifoSemNode< Handoff, Atom > &node)
Definition: LifoSem.h:587
void init(Args &&...args)
Definition: LifoSem.h:175
uint32_t tryWait(uint32_t n)
Definition: LifoSem.h:443
PskType type
constexpr bool isNodeIdx() const
Definition: LifoSem.h:250
std::chrono::steady_clock::time_point now()
std::unique_ptr< LifoSemNode< BatonType, Atom >, LifoSemNodeRecycler< BatonType, Atom > > UniquePtr
Definition: LifoSem.h:547
bool post()
Silently saturates if value is already 2^32-1.
Definition: LifoSem.h:361
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
constexpr LifoSemImpl(uint32_t v=0)
Definition: LifoSem.h:722
#define FOLLY_EXPORT
Definition: CPortability.h:133
LifoSemHead withShutdown() const
Definition: LifoSem.h:318
requires E e noexcept(noexcept(s.error(std::move(e))))
constexpr bool isShutdown() const
Definition: LifoSem.h:253
std::aligned_storage< sizeof(void *), alignof(void *)>::type raw
Definition: LifoSem.h:124
FOLLY_PUSH_WARNING RHS rhs
Definition: Traits.h:649
uint32_t locateElem(const T *elem) const
bool tryRemoveNode(const LifoSemNode< Handoff, Atom > &removenode)
Definition: LifoSem.h:594
const Handoff & handoff() const
Definition: LifoSem.h:190
uint32_t valueGuess() const
Definition: LifoSem.h:528
static LifoSemNode< Handoff, Atom > & idxToNode(uint32_t idx)
Definition: LifoSem.h:582
constexpr LifoSemBase(uint32_t initialValue=0)
Constructor.
Definition: LifoSem.h:354
LogLevel min
Definition: LogLevel.cpp:30
UniquePtr allocateNode(Args &&...args)
Returns a node that can be passed to decrOrLink.
Definition: LifoSem.h:551
bool tryWait()
Returns true iff value was decremented.
Definition: LifoSem.h:430
#define Atom
constexpr uint32_t seq() const
Definition: LifoSem.h:259
bool isShutdown() const
Returns true iff shutdown() has been called.
Definition: LifoSem.h:389
LifoSemHead withValueDecr(uint32_t delta) const
Returns the LifoSemHead that results from decrementing the value.
Definition: LifoSem.h:310
uint32_t incrOrPop(uint32_t n)
Definition: LifoSem.h:645
uint32_t allocIndex(Args &&...args)
constexpr bool operator!=(const LifoSemHead &rhs) const
Definition: LifoSem.h:339
LifoSemHead withoutLock(uint32_t idxNext) const
Definition: LifoSem.h:330
const char * string
Definition: Conv.cpp:212
bool try_wait_until(const std::chrono::time_point< Clock, Duration > &deadline)
Definition: LifoSem.h:477
constexpr bool isLocked() const
Definition: LifoSem.h:256
bool try_wait_for(const std::chrono::duration< Rep, Period > &timeout)
Definition: LifoSem.h:472
constexpr bool operator==(const LifoSemHead &rhs) const
Definition: LifoSem.h:336
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
WaitResult decrOrPush(uint32_t &n, uint32_t idx)
Definition: LifoSem.h:678
void recycleIndex(uint32_t idx)
Gives up ownership previously granted by alloc()
The exception thrown when wait()ing on an isShutdown() LifoSem.
Definition: LifoSem.h:97
LifoSemHead withPush(uint32_t _idx) const
Returns the LifoSemHead that results from pushing a new waiter node.
Definition: LifoSem.h:288
uint32_t value() const
Definition: LifoSem.h:246
LifoSemHead withPop(uint32_t idxNext) const
Definition: LifoSem.h:273
uint32_t idx() const
Definition: LifoSem.h:241
bool isShutdownNotice() const
Definition: LifoSem.h:131
#define UNLIKELY(x)
Definition: Likely.h:48
void operator()(LifoSemNode< Handoff, Atom > *elem) const
Definition: LifoSem.h:197
WaitResult tryWaitOrPush(LifoSemNode< Handoff, Atom > &waiterNode)
Definition: LifoSem.h:574
void post(uint32_t n)
Definition: LifoSem.h:379
CachelinePadded< folly::AtomicStruct< LifoSemHead, Atom > > head_
Definition: LifoSem.h:580
#define FOLLY_SAFE_DCHECK(expr, msg)
Definition: SafeAssert.h:42
LifoSemHead withLock() const
Definition: LifoSem.h:323
static Pool & pool()
Storage for all of the waiter nodes for LifoSem-s that use Atom.
folly::IndexedMemPool< LifoSemRawNode< Atom >, 32, 200, Atom > Pool
Definition: LifoSem.h:141