107 template <
typename>
class Atom = std::atomic,
152 return done_.
ready();
160 disconnected_.
post();
164 disconnected_.
reset();
168 return disconnected_.
ready();
199 template <
typename Func>
202 std::is_nothrow_constructible<
205 "Try using a smaller function object that can fit in folly::Function " 206 "without allocation, or use the custom interface of requestFC() to " 207 "manage the requested function's arguments and results explicitly " 208 "in a custom request structure without allocation.");
209 fn_ = std::forward<Func>(fn);
284 return m_.try_lock();
294 template <
typename LockHolder>
302 template <
typename LockHolder>
304 l = LockHolder(
m_, std::defer_lock);
308 template <
typename OpFunc>
310 std::lock_guard<Mutex>
guard(
m_);
336 template <
typename OpFunc>
337 void requestFC(OpFunc&& opFn,
Rec* rec =
nullptr,
bool syncop =
true) {
338 auto dummy = [](Req&) {};
340 std::forward<OpFunc>(opFn),
347 template <
typename OpFunc,
typename FillFunc>
350 const FillFunc& fillFn,
352 bool syncop =
true) {
353 auto dummy = [](Req&) {};
355 std::forward<OpFunc>(opFn),
362 template <
typename OpFunc,
typename FillFunc,
typename ResFn>
365 const FillFunc& fillFn,
367 Rec* rec =
nullptr) {
370 std::forward<OpFunc>(opFn),
391 if (rec ==
nullptr) {
394 auto idx = rec->getIndex();
442 template <
typename OpFunc,
typename FillFunc,
typename ResFn>
445 const FillFunc& fillFn,
450 std::unique_lock<Mutex> l(this->
m_, std::defer_lock);
460 bool tc = (rec !=
nullptr);
467 if (rec ==
nullptr) {
483 Req& req = rec->getReq();
487 rec->setFn(std::forward<OpFunc>(opFn));
490 assert(!rec->isValid());
495 std::atomic_thread_fence(std::memory_order_seq_cst);
496 if (rec->isDisconnected()) {
497 rec->clearDisconnected();
505 Req& req = rec->getReq();
515 Rec& rec = recsPool_[idx];
517 auto head = recs_.load(std::memory_order_acquire);
519 if (recs_.compare_exchange_weak(head, idx)) {
526 return recs_.load(std::memory_order_acquire);
530 return recsPool_[idx].getNext();
558 }
while (combined < this->maxOps_);
583 std::lock_guard<Mutex>
guard(
m_);
587 if (count < maxOps_) {
608 while (!rec.isDone()) {
610 std::unique_lock<Mutex> l(
m_, std::defer_lock);
617 if (++count == 1000) {
632 throw std::runtime_error(
633 "FlatCombining::combinedOp(Req&) must be overridden in the derived" 634 " class if called.");
644 Req& req = rec.getReq();
647 rec.setLast(passes_);
655 while (idx != NULL_INDEX) {
656 Rec& rec = recsPool_[idx];
657 auto next = rec.getNext();
658 bool valid = rec.isValid();
663 rec.setDisconnected();
665 std::atomic_thread_fence(std::memory_order_seq_cst);
666 valid = rec.isValid();
FOLLY_ALWAYS_INLINE bool ready() const noexcept
FOLLY_ALWAYS_INLINE void wait(const WaitOptions &opt=wait_options()) noexcept
folly::SaturatingSemaphore< true, Atom > pending_
FlatCombining(const bool dedicated=true, const uint32_t numRecs=0, const uint32_t maxOps=0)
void holdLock(LockHolder &l, std::defer_lock_t)
const uint32_t kDefaultMaxOps
void requestFC(OpFunc &&opFn, const FillFunc &fillFn, Rec *rec=nullptr, bool syncop=true)
uint64_t getNumPasses() const
void requestNoFC(OpFunc &opFn)
bool isDisconnected() const
—— Concurrent Priority Queue Implementation ——
folly::SaturatingSemaphore< false, Atom > valid_
size_t nextIndex(size_t idx)
folly::SaturatingSemaphore< false, Atom > disconnected_
FOLLY_ALWAYS_INLINE void post() noexcept
constexpr std::size_t hardware_destructive_interference_size
void holdLock(LockHolder &l)
folly::SaturatingSemaphore< false, Atom > done_
uint64_t getNumUncombined() const
const uint64_t kDefaultNumRecs
void requestFC(OpFunc &&opFn, const FillFunc &fillFn, const ResFn &resFn, Rec *rec=nullptr)
void awaitDoneTryLock(Rec &rec)
uint64_t getNumCombined() const
uint64_t combiningSession()
GuardImpl guard(ErrorHandler &&handler)
void processReq(Rec &rec)
void setNext(const size_t next)
uint32_t allocIndex(Args &&...args)
Combining request record.
void combinedOp(Req &)
The following member functions may be overridden for customization.
void setLast(const uint64_t pass)
const uint64_t kIdleThreshold
void dedicatedCombining()
uint64_t getNumSessions() const
void requestFC(OpFunc &&opFn, Rec *rec=nullptr, bool syncop=true)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
void recycleIndex(uint32_t idx)
Gives up ownership previously granted by alloc()
void requestOp(OpFunc &&opFn, const FillFunc &fillFn, const ResFn &resFn, Rec *rec, bool syncop, const bool custom)
void setIndex(const size_t index)
void asm_volatile_pause()