22 #include <glog/logging.h> 32 template <
typename T,
bool MayBlock>
35 template <
typename T,
bool MayBlock>
38 template <
typename T,
bool MayBlock>
41 template <
typename T,
bool MayBlock>
44 template <
template <
typename,
bool>
class Q,
bool MayBlock>
72 basic_test<USPSC, false>();
73 basic_test<UMPSC, false>();
74 basic_test<USPMC, false>();
75 basic_test<UMPMC, false>();
76 basic_test<USPSC, true>();
77 basic_test<UMPSC, true>();
78 basic_test<USPMC, true>();
79 basic_test<UMPMC, true>();
82 template <
template <
typename,
bool>
class Q,
bool MayBlock>
88 ASSERT_FALSE(q.try_dequeue_for(v, std::chrono::microseconds(1)));
95 TEST(UnboundedQueue, timeout) {
96 timeout_test<USPSC, false>();
97 timeout_test<UMPSC, false>();
98 timeout_test<USPMC, false>();
99 timeout_test<UMPMC, false>();
100 timeout_test<USPSC, true>();
101 timeout_test<UMPSC, true>();
102 timeout_test<USPMC, true>();
103 timeout_test<UMPMC, true>();
106 template <
template <
typename,
bool>
class Q,
bool MayBlock>
109 auto res = q.try_peek();
111 for (
int i = 0;
i < 1000; ++
i) {
114 for (
int i = 0;
i < 700; ++
i) {
124 peek_test<USPSC, false>();
125 peek_test<UMPSC, false>();
126 peek_test<USPSC, true>();
127 peek_test<UMPSC, true>();
130 TEST(UnboundedQueue, cleanup_on_destruction) {
133 explicit Foo(
int* p) : p_(p) {}
149 for (
int i = 0;
i < num; ++
i) {
157 template <
typename ProdFunc,
typename ConsFunc,
typename EndFunc>
161 const ProdFunc& prodFn,
162 const ConsFunc& consFn,
163 const EndFunc& endFn) {
164 std::atomic<bool>
start{
false};
165 std::atomic<int> ready{0};
168 std::vector<std::thread> prodThr(nprod);
169 for (
int tid = 0; tid < nprod; ++tid) {
170 prodThr[tid] = std::thread([&, tid] {
172 while (!
start.load()) {
180 std::vector<std::thread> consThr(ncons);
181 for (
int tid = 0; tid < ncons; ++tid) {
182 consThr[tid] = std::thread([&, tid] {
184 while (!
start.load()) {
192 while (ready.load() < (nprod + ncons)) {
201 for (
int i = 0;
i < nprod; ++
i) {
204 for (
int i = 0;
i < ncons; ++
i) {
211 return std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
215 template <
bool SingleProducer,
bool SingleConsumer,
bool MayBlock>
217 if (SingleProducer) {
220 if (SingleConsumer) {
226 std::atomic<uint64_t>
sum(0);
228 auto prod = [&](
int tid) {
229 for (
int i = tid;
i <
ops;
i += nprod) {
234 auto cons = [&](
int tid) {
236 for (
int i = tid;
i <
ops;
i += ncons) {
240 if (SingleConsumer) {
253 }
else if ((
i % 3) == 1) {
254 auto duration = std::chrono::milliseconds(1);
261 if (nprod == 1 && ncons == 1) {
264 if (SingleConsumer) {
269 sum.fetch_add(mysum);
278 run_once(nprod, ncons, prod, cons, endfn);
281 TEST(UnboundedQueue, enq_deq) {
283 enq_deq_test<true, true, false>(1, 1);
284 enq_deq_test<true, true, true>(1, 1);
286 enq_deq_test<false, true, false>(1, 1);
287 enq_deq_test<false, true, true>(1, 1);
288 enq_deq_test<false, true, false>(2, 1);
289 enq_deq_test<false, true, true>(2, 1);
290 enq_deq_test<false, true, false>(10, 1);
291 enq_deq_test<false, true, true>(10, 1);
293 enq_deq_test<true, false, false>(1, 1);
294 enq_deq_test<true, false, true>(1, 1);
295 enq_deq_test<true, false, false>(1, 2);
296 enq_deq_test<true, false, true>(1, 2);
297 enq_deq_test<true, false, false>(1, 10);
298 enq_deq_test<true, false, true>(1, 10);
300 enq_deq_test<false, false, false>(1, 1);
301 enq_deq_test<false, false, true>(1, 1);
302 enq_deq_test<false, false, false>(2, 1);
303 enq_deq_test<false, false, true>(2, 1);
304 enq_deq_test<false, false, false>(10, 1);
305 enq_deq_test<false, false, true>(10, 1);
306 enq_deq_test<false, false, false>(1, 2);
307 enq_deq_test<false, false, true>(1, 2);
308 enq_deq_test<false, false, false>(1, 10);
309 enq_deq_test<false, false, true>(1, 10);
310 enq_deq_test<false, false, false>(2, 2);
311 enq_deq_test<false, false, true>(2, 2);
312 enq_deq_test<false, false, false>(10, 10);
313 enq_deq_test<false, false, true>(10, 10);
316 template <
typename RepFunc>
318 int reps = FLAGS_reps;
324 for (
int r = 0; r < reps; ++r) {
330 const uint64_t minute = 60000000000UL;
331 if (sum > minute && r >= 2) {
341 std::cout <<
" " << std::setw(4) << max / ops <<
unit;
342 std::cout <<
" " << std::setw(4) << avg / ops <<
unit;
343 std::cout <<
" " << std::setw(4) << res / ops <<
unit;
344 std::cout << std::endl;
348 template <
template <
typename,
bool>
class Q,
typename T,
int Op>
352 Q<T, Op == 3 || Op == 4 || Op == 5> q;
353 std::atomic<uint64_t>
sum(0);
354 auto prod = [&](
int tid) {
355 for (
int i = tid;
i <
ops;
i += nprod) {
359 auto cons = [&](
int tid) {
361 for (
int i = tid;
i <
ops;
i += ncons) {
363 if (
Op == 0 ||
Op == 3) {
364 while (
UNLIKELY(!q.try_dequeue(v))) {
367 }
else if (
Op == 1 ||
Op == 4) {
368 auto duration = std::chrono::microseconds(1000);
369 while (
UNLIKELY(!q.try_dequeue_for(v, duration))) {
376 if (nprod == 1 && ncons == 1) {
377 DCHECK_EQ(
int(v),
i);
381 sum.fetch_add(mysum);
389 return run_once(nprod, ncons, prod, cons, endfn);
395 template <
typename T>
402 template <
typename...
Args>
404 q_.blockingWrite(std::forward<Args>(args)...);
408 q_.blockingRead(item);
412 return q_.read(item);
415 template <
typename Rep,
typename Period>
418 const std::chrono::duration<Rep, Period>& duration)
noexcept {
420 return q_.tryReadUntil(deadline, item);
424 template <
typename T,
bool ignore>
427 template <
typename T>
432 PCQ() : q_(FLAGS_capacity) {}
434 template <
typename...
Args>
436 while (!q_.
write(std::forward<Args>(args)...)) {
446 return q_.
read(item);
449 template <
typename Rep,
typename Period>
455 template <
typename T,
bool ignore>
463 for (
size_t i = 0;
i <
M; ++
i) {
473 std::cout <<
".............................................................." 477 template <
typename T>
480 <<
"===========================================" << std::endl;
481 if (np == 1 && nc == 1) {
482 bench<USPSC, T, 0>(1, 1,
"Unbounded SPSC try spin only ");
483 bench<USPSC, T, 1>(1, 1,
"Unbounded SPSC timed spin only ");
484 bench<USPSC, T, 2>(1, 1,
"Unbounded SPSC wait spin only ");
485 bench<USPSC, T, 3>(1, 1,
"Unbounded SPSC try may block ");
486 bench<USPSC, T, 4>(1, 1,
"Unbounded SPSC timed may block ");
487 bench<USPSC, T, 5>(1, 1,
"Unbounded SPSC wait may block ");
491 bench<UMPSC, T, 0>(np, 1,
"Unbounded MPSC try spin only ");
492 bench<UMPSC, T, 1>(np, 1,
"Unbounded MPSC timed spin only ");
493 bench<UMPSC, T, 2>(np, 1,
"Unbounded MPSC wait spin only ");
494 bench<UMPSC, T, 3>(np, 1,
"Unbounded MPSC try may block ");
495 bench<UMPSC, T, 4>(np, 1,
"Unbounded MPSC timed may block ");
496 bench<UMPSC, T, 5>(np, 1,
"Unbounded MPSC wait may block ");
500 bench<USPMC, T, 0>(1, nc,
"Unbounded SPMC try spin only ");
501 bench<USPMC, T, 1>(1, nc,
"Unbounded SPMC timed spin only ");
502 bench<USPMC, T, 2>(1, nc,
"Unbounded SPMC wait spin only ");
503 bench<USPMC, T, 3>(1, nc,
"Unbounded SPMC try may block ");
504 bench<USPMC, T, 4>(1, nc,
"Unbounded SPMC timed may block ");
505 bench<USPMC, T, 5>(1, nc,
"Unbounded SPMC wait may block ");
508 bench<UMPMC, T, 0>(np, nc,
"Unbounded MPMC try spin only ");
509 bench<UMPMC, T, 1>(np, nc,
"Unbounded MPMC timed spin only ");
510 bench<UMPMC, T, 2>(np, nc,
"Unbounded MPMC wait spin only ");
511 bench<UMPMC, T, 3>(np, nc,
"Unbounded MPMC try may block ");
512 bench<UMPMC, T, 4>(np, nc,
"Unbounded MPMC timed may block ");
513 bench<UMPMC, T, 5>(np, nc,
"Unbounded MPMC wait may block ");
515 if (np == 1 && nc == 1) {
516 bench<FPCQ, T, 0>(1, 1,
"folly::PCQ read ");
519 bench<FMPMC, T, 3>(np, nc,
"folly::MPMC read ");
520 bench<FMPMC, T, 4>(np, nc,
"folly::MPMC tryReadUntil ");
521 bench<FMPMC, T, 5>(np, nc,
"folly::MPMC blockingRead ");
522 std::cout <<
"==============================================================" 527 std::cout <<
"====================== " << std::setw(2) << np <<
" prod" 528 <<
" " << std::setw(2) << nc <<
" cons" 529 <<
" ======================" << std::endl;
530 type_benches<uint32_t>(np, nc,
"=== uint32_t ======");
539 std::cout <<
"==============================================================" 541 std::cout << std::setw(2) << FLAGS_reps <<
" reps of " << std::setw(8)
542 << FLAGS_ops <<
" handoffs\n";
544 std::cout <<
"$ numactl -N 1 $dir/unbounded_queue_test --bench\n";
546 std::cout <<
"Using capacity " << FLAGS_capacity
547 <<
" for folly::ProducerConsumerQueue and\n" 548 <<
"folly::MPMCQueue\n";
549 std::cout <<
"==============================================================" 551 std::cout <<
"Test name Max time Avg time Min time" 554 for (
int nc : {1, 2, 4, 8, 16, 32}) {
559 for (
int np : {1, 2, 4, 8, 16, 32}) {
564 for (
int np : {2, 4, 8, 16, 32}) {
565 for (
int nc : {2, 4, 8, 16, 32}) {
FOLLY_ALWAYS_INLINE const T * try_peek() noexcept
std::atomic< int64_t > sum(0)
#define ASSERT_EQ(val1, val2)
void type_benches(const int np, const int nc, const std::string &name)
void enqueue(Args &&...args)
#define EXPECT_EQ(val1, val2)
constexpr detail::Map< Move > move
std::chrono::steady_clock::time_point now()
void enq_deq_test(const int nprod, const int ncons)
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
void enqueue(Args &&...args)
requires E e noexcept(noexcept(s.error(std::move(e))))
DEFINE_bool(bench, false,"run benchmark")
DEFINE_int32(reps, 10,"number of reps")
bool try_dequeue_for(T &item, const std::chrono::duration< Rep, Period > &duration) noexcept
void benches(const int np, const int nc)
uint64_t runBench(const std::string &name, int ops, const RepFunc &repFn)
FOLLY_ALWAYS_INLINE bool try_dequeue_for(T &item, const std::chrono::duration< Rep, Period > &duration) noexcept
uint64_t run_once(int nprod, int ncons, const ProdFunc &prodFn, const ConsFunc &consFn, const EndFunc &endFn)
**Optimized Holders **The template hazptr_array< M > provides most of the functionality *of M hazptr_holder s but with faster construction destruction *for M
T exchange(T &obj, U &&new_value)
bool write(Args &&...recordArgs)
bool try_dequeue(T &item)
bool try_dequeue_for(T &, const std::chrono::duration< Rep, Period > &) noexcept
#define ASSERT_FALSE(condition)
TEST(UnboundedQueue, basic)
bool try_dequeue(T &item)
FOLLY_ALWAYS_INLINE void dequeue(T &item) noexcept
uint64_t bench(const int nprod, const int ncons, const std::string &name)
FOLLY_ALWAYS_INLINE void enqueue(const T &arg)
DEFINE_int64(capacity, 256 *1024,"capacity")
#define ASSERT_TRUE(condition)
FOLLY_ALWAYS_INLINE bool try_dequeue(T &item) noexcept