28 #include <glog/logging.h> 76 typename PriorityQueue = std::priority_queue<T>,
78 template <
typename>
class Atom = std::atomic>
79 class FlatCombiningPriorityQueue
81 FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>,
84 using FCPQ = FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>;
90 typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
91 explicit FlatCombiningPriorityQueue(
93 const size_t maxSize = 0,
102 pq_(
std::forward<PQArgs>(args)...) {}
107 auto fn = [&] { res = pq_.empty(); };
108 const_cast<FCPQ*
>(
this)->requestFC(fn);
113 size_t size()
const {
115 auto fn = [&] { res = pq_.size(); };
116 const_cast<FCPQ*
>(
this)->requestFC(fn);
125 bool try_push(
const T&
val) {
126 return try_push_impl(
134 bool try_pop(
T& val) {
143 bool try_peek(
T& val) {
144 return try_peek_impl(
151 void push(
const T& val) {
188 template <
typename Rep,
typename Period>
190 const std::chrono::duration<Rep, Period>& timeout) {
199 template <
typename Rep,
typename Period>
202 const std::chrono::duration<Rep, Period>& timeout) {
208 template <
typename Rep,
typename Period>
210 const std::chrono::duration<Rep, Period>& timeout) {
219 template <
typename Clock,
typename Duration>
221 const std::chrono::time_point<Clock, Duration>& deadline) {
223 if (try_pop_impl(val, deadline)) {
229 template <
typename Clock,
typename Duration>
232 const std::chrono::time_point<Clock, Duration>& deadline) {
233 return try_push_impl(val, deadline);
236 template <
typename Clock,
typename Duration>
238 const std::chrono::time_point<Clock, Duration>& deadline) {
240 if (try_peek_impl(val, deadline)) {
249 detail::Futex<Atom> empty_{};
250 detail::Futex<Atom> full_{};
252 bool isTrue(detail::Futex<Atom>& futex) {
253 return futex.load(std::memory_order_relaxed) != 0;
256 void setFutex(detail::Futex<Atom>& futex,
uint32_t val) {
257 futex.store(val, std::memory_order_relaxed);
260 bool futexSignal(detail::Futex<Atom>& futex) {
269 template <
typename Clock,
typename Duration>
272 const std::chrono::time_point<Clock, Duration>&
when);
274 template <
typename Clock,
typename Duration>
277 const std::chrono::time_point<Clock, Duration>&
when);
279 template <
typename Clock,
typename Duration>
282 const std::chrono::time_point<Clock, Duration>&
when);
289 typename PriorityQueue,
291 template <
typename>
class Atom>
292 template <
typename Clock,
typename Duration>
294 FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_push_impl(
296 const std::chrono::time_point<Clock, Duration>&
when) {
302 if (maxSize_ > 0 && pq_.size() == maxSize_) {
307 DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
310 wake = futexSignal(empty_);
313 }
catch (
const std::bad_alloc&) {
330 while (isTrue(full_)) {
346 typename PriorityQueue,
348 template <
typename>
class Atom>
349 template <
typename Clock,
typename Duration>
351 FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_pop_impl(
353 const std::chrono::time_point<Clock, Duration>& when) {
363 wake = futexSignal(full_);
376 while (isTrue(empty_)) {
392 typename PriorityQueue,
394 template <
typename>
class Atom>
395 template <
typename Clock,
typename Duration>
397 FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_peek_impl(
399 const std::chrono::time_point<Clock, Duration>& when) {
416 while (isTrue(empty_)) {
bool wake(bool publishing, Waiter &waiter, WakerMetadata metadata, Waiter *&sleepers)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
—— Concurrent Priority Queue Implementation ——
FutexResult futexWait(const Futex *futex, uint32_t expected, uint32_t waitMask)
constexpr auto size(C const &c) -> decltype(c.size())
constexpr auto empty(C const &c) -> decltype(c.empty())
folly::FlatCombiningPriorityQueue< int > FCPQ
FutexResult futexWaitUntil(const Futex *futex, uint32_t expected, std::chrono::time_point< Clock, Duration > const &deadline, uint32_t waitMask)
Future< Unit > when(bool p, F &&thunk)
int futexWake(const Futex *futex, int count, uint32_t wakeMask)