23 #include <system_error> 36 template <
typename>
class Atom = std::atomic,
37 class BatonType = SaturatingSemaphore<true, Atom>>
99 ~ShutdownSemError()
noexcept override;
122 template <
template <
typename>
class Atom>
149 #define LIFOSEM_DECLARE_POOL(Atom, capacity) \ 153 LifoSemRawNode<Atom>::Pool& LifoSemRawNode<Atom>::pool() { \ 154 static Pool* instance = new Pool((capacity)); \ 165 template <
typename Handoff,
template <
typename>
class Atom>
169 "Handoff too big for small-object optimization, use indirection");
172 "Handoff alignment constraint not satisfied");
174 template <
typename...
Args>
176 new (&this->raw) Handoff(std::forward<Args>(args)...);
180 handoff().~Handoff();
182 memset(&this->raw,
'F',
sizeof(this->raw));
187 return *
static_cast<Handoff*
>(
static_cast<void*
>(&this->raw));
191 return *
static_cast<const Handoff*
>(
static_cast<const void*
>(&this->raw));
195 template <
typename Handoff,
template <
typename>
class Atom>
224 IsShutdownShift = 33,
233 SeqMask = ~(SeqIncr - 1),
247 assert(!isNodeIdx());
251 return (bits & IsNodeIdxMask) != 0;
254 return (bits & IsShutdownMask) != 0;
257 return (bits & IsLockedMask) != 0;
278 return LifoSemHead{(bits & (SeqMask | IsShutdownMask)) + SeqIncr};
282 return LifoSemHead{(bits & (SeqMask | IsShutdownMask | IsNodeIdxMask)) +
290 assert(isNodeIdx() ||
value() == 0);
291 assert(!isShutdown());
293 return LifoSemHead{(bits & SeqMask) | IsNodeIdxMask | _idx};
300 assert(!isNodeIdx());
304 rv =
LifoSemHead{(rv.bits & ~IsNodeIdxMask) | (IsNodeIdxMask - 1)};
312 assert(delta > 0 && delta <=
value());
337 return bits == rhs.
bits;
340 return !(*
this ==
rhs);
351 template <
typename Handoff,
template <
typename>
class Atom = std::atomic>
362 auto idx = incrOrPop(1);
364 idxToNode(idx).handoff().post();
381 while (n > 0 && (idx = incrOrPop(n)) != 0) {
383 idxToNode(idx).handoff().post();
390 return UNLIKELY(head_->load(std::memory_order_acquire).isShutdown());
400 auto h = head_->load(std::memory_order_acquire);
401 while (!
h.isShutdown()) {
402 if (head_->compare_exchange_strong(
h,
h.withShutdown())) {
404 h =
h.withShutdown();
411 while (
h.isNodeIdx()) {
414 h = head_->load(std::memory_order_acquire);
417 auto& node = idxToNode(
h.idx());
418 auto repl =
h.withPop(node.next);
419 if (head_->compare_exchange_strong(
h, repl)) {
422 node.setShutdownNotice();
423 node.handoff().post();
432 auto rv = decrOrPush(n, 0);
434 (rv == WaitResult::DECR && n == 0) ||
435 (rv != WaitResult::DECR && n == 1));
437 return rv == WaitResult::DECR;
449 auto rv = decrOrPush(n, 0);
451 (rv == WaitResult::DECR && n < prev) ||
452 (rv != WaitResult::DECR && n == prev));
453 if (rv != WaitResult::DECR) {
467 auto res = try_wait_until(deadline);
471 template <
typename Rep,
typename Period>
476 template <
typename Clock,
typename Duration>
478 const std::chrono::time_point<Clock, Duration>& deadline) {
489 auto rv = tryWaitOrPush(*node);
490 if (
UNLIKELY(rv == WaitResult::SHUTDOWN)) {
491 assert(isShutdown());
495 if (rv == WaitResult::PUSH) {
496 if (!node->handoff().try_wait_until(deadline)) {
497 if (tryRemoveNode(*node)) {
508 node->handoff().wait();
511 if (
UNLIKELY(node->isShutdownNotice())) {
514 "blocking wait() interrupted by semaphore shutdown");
532 auto h = head_->load(std::memory_order_acquire);
533 return h.isNodeIdx() ? 0 :
h.value();
550 template <
typename...
Args>
554 auto& node = idxToNode(idx);
555 node.clearShutdownNotice();
557 node.init(std::forward<Args>(args)...);
576 return decrOrPush(n, nodeToIdx(waiterNode));
595 auto removeidx = nodeToIdx(removenode);
596 auto head = head_->load(std::memory_order_acquire);
599 if (head.isLocked()) {
601 head = head_->load(std::memory_order_acquire);
604 if (!head.isNodeIdx()) {
607 if (head_->compare_exchange_weak(
610 std::memory_order_acquire,
611 std::memory_order_relaxed)) {
616 head = head.withLock();
618 auto idx = head.idx();
619 if (idx == removeidx) {
622 head.withoutLock(removenode.
next), std::memory_order_release);
625 auto node = &idxToNode(idx);
628 if (idx == removeidx) {
630 node->next = removenode.
next;
634 node = &idxToNode(idx);
638 head_->store(head.withoutLock(head.idx()), std::memory_order_release);
649 auto head = head_->load(std::memory_order_acquire);
650 if (head.isLocked()) {
654 if (head.isNodeIdx()) {
655 auto& node = idxToNode(head.idx());
656 if (head_->compare_exchange_strong(head, head.withPop(node.next))) {
661 auto after = head.withValueIncr(n);
662 if (head_->compare_exchange_strong(head, after)) {
682 auto head = head_->load(std::memory_order_acquire);
684 if (head.isLocked()) {
689 if (!head.isNodeIdx() && head.value() > 0) {
691 auto delta =
std::min(n, head.value());
692 if (head_->compare_exchange_strong(head, head.withValueDecr(delta))) {
694 return WaitResult::DECR;
699 return WaitResult::PUSH;
703 return WaitResult::SHUTDOWN;
706 auto& node = idxToNode(idx);
707 node.next = head.isNodeIdx() ? head.idx() : 0;
708 if (head_->compare_exchange_strong(head, head.withPush(idx))) {
710 return WaitResult::PUSH;
720 template <
template <
typename>
class Atom,
class BatonType>
static constexpr LifoSemHead fresh(uint32_t value)
LifoSemHead withValueIncr(uint32_t delta) const
static uint32_t nodeToIdx(const LifoSemNode< Handoff, Atom > &node)
void init(Args &&...args)
uint32_t tryWait(uint32_t n)
constexpr bool isNodeIdx() const
std::chrono::steady_clock::time_point now()
std::unique_ptr< LifoSemNode< BatonType, Atom >, LifoSemNodeRecycler< BatonType, Atom > > UniquePtr
bool post()
Silently saturates if value is already 2^32-1.
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
—— Concurrent Priority Queue Implementation ——
constexpr LifoSemImpl(uint32_t v=0)
LifoSemHead withShutdown() const
requires E e noexcept(noexcept(s.error(std::move(e))))
constexpr bool isShutdown() const
std::aligned_storage< sizeof(void *), alignof(void *)>::type raw
FOLLY_PUSH_WARNING RHS rhs
uint32_t locateElem(const T *elem) const
bool tryRemoveNode(const LifoSemNode< Handoff, Atom > &removenode)
const Handoff & handoff() const
uint32_t valueGuess() const
static LifoSemNode< Handoff, Atom > & idxToNode(uint32_t idx)
constexpr LifoSemBase(uint32_t initialValue=0)
Constructor.
UniquePtr allocateNode(Args &&...args)
Returns a node that can be passed to tryWaitOrPush.
bool tryWait()
Returns true iff value was decremented.
constexpr uint32_t seq() const
bool isShutdown() const
Returns true iff shutdown() has been called.
LifoSemHead withValueDecr(uint32_t delta) const
Returns the LifoSemHead that results from decrementing the value.
uint32_t incrOrPop(uint32_t n)
uint32_t allocIndex(Args &&...args)
constexpr bool operator!=(const LifoSemHead &rhs) const
LifoSemHead withoutLock(uint32_t idxNext) const
bool try_wait_until(const std::chrono::time_point< Clock, Duration > &deadline)
constexpr bool isLocked() const
bool try_wait_for(const std::chrono::duration< Rep, Period > &timeout)
constexpr bool operator==(const LifoSemHead &rhs) const
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
WaitResult decrOrPush(uint32_t &n, uint32_t idx)
void recycleIndex(uint32_t idx)
Gives up ownership previously granted by allocIndex().
The exception thrown when wait()ing on an isShutdown() LifoSem.
LifoSemHead withPush(uint32_t _idx) const
Returns the LifoSemHead that results from pushing a new waiter node.
LifoSemHead withPop(uint32_t idxNext) const
bool isShutdownNotice() const
void operator()(LifoSemNode< Handoff, Atom > *elem) const
WaitResult tryWaitOrPush(LifoSemNode< Handoff, Atom > &waiterNode)
CachelinePadded< folly::AtomicStruct< LifoSemHead, Atom > > head_
#define FOLLY_SAFE_DCHECK(expr, msg)
LifoSemHead withLock() const
static Pool & pool()
Storage for all of the waiter nodes for LifoSem-s that use Atom.
void clearShutdownNotice()
folly::IndexedMemPool< LifoSemRawNode< Atom >, 32, 200, Atom > Pool