22 #include <glog/logging.h> 32 template <
typename T,
bool MayBlock,
typename WeightFn>
35 template <
typename T,
bool MayBlock,
typename WeightFn>
38 template <
typename T,
bool MayBlock,
typename WeightFn>
41 template <
typename T,
bool MayBlock,
typename WeightFn>
44 template <
template <
typename,
bool,
typename>
class Q,
bool MayBlock>
46 auto dur = std::chrono::microseconds(100);
49 struct CustomWeightFn {
55 Q<int, MayBlock, CustomWeightFn> q(10000);
86 basic_test<DSPSC, false>();
87 basic_test<DMPSC, false>();
88 basic_test<DSPMC, false>();
89 basic_test<DMPMC, false>();
90 basic_test<DSPSC, true>();
91 basic_test<DMPSC, true>();
92 basic_test<DSPMC, true>();
93 basic_test<DMPMC, true>();
96 template <
template <
typename,
bool,
typename>
class Q,
bool MayBlock>
101 Foo(
const Foo&) =
delete;
102 Foo& operator=(
const Foo&) =
delete;
103 Foo(Foo&& other)
noexcept : v_(other.v_) {}
104 Foo& operator=(Foo&& other)
noexcept {
110 struct CustomWeightFn {
116 auto dur = std::chrono::microseconds(100);
119 Q<Foo, MayBlock, CustomWeightFn> q(100);
131 move_test<DSPSC, false>();
132 move_test<DMPSC, false>();
133 move_test<DSPMC, false>();
134 move_test<DMPMC, false>();
135 move_test<DSPSC, true>();
136 move_test<DMPSC, true>();
137 move_test<DSPMC, true>();
138 move_test<DMPMC, true>();
141 template <
template <
typename,
bool,
typename>
class Q,
bool MayBlock>
143 struct CustomWeightFn {
149 Q<int, MayBlock, CustomWeightFn> q(1000);
157 q.reset_capacity(2000);
161 q.reset_capacity(1000);
171 TEST(DynamicBoundedQueue, capacity) {
172 capacity_test<DSPSC, false>();
173 capacity_test<DMPSC, false>();
174 capacity_test<DSPMC, false>();
175 capacity_test<DMPMC, false>();
176 capacity_test<DSPSC, true>();
177 capacity_test<DMPSC, true>();
178 capacity_test<DSPMC, true>();
179 capacity_test<DMPMC, true>();
182 template <
typename ProdFunc,
typename ConsFunc,
typename EndFunc>
186 const ProdFunc& prodFn,
187 const ConsFunc& consFn,
188 const EndFunc& endFn) {
189 std::atomic<bool>
start{
false};
190 std::atomic<int> ready{0};
193 std::vector<std::thread> prodThr(nprod);
194 for (
int tid = 0; tid < nprod; ++tid) {
195 prodThr[tid] = std::thread([&, tid] {
197 while (!
start.load()) {
205 std::vector<std::thread> consThr(ncons);
206 for (
int tid = 0; tid < ncons; ++tid) {
207 consThr[tid] = std::thread([&, tid] {
209 while (!
start.load()) {
217 while (ready.load() < (nprod + ncons)) {
226 for (
int i = 0;
i < nprod; ++
i) {
229 for (
int i = 0;
i < ncons; ++
i) {
236 return std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
240 template <
bool SingleProducer,
bool SingleConsumer,
bool MayBlock>
242 if (SingleProducer) {
245 if (SingleConsumer) {
252 std::atomic<uint64_t>
sum(0);
254 auto prod = [&](
int tid) {
255 for (
int i = tid;
i <
ops;
i += nprod) {
260 }
else if ((
i % 3) == 1) {
261 auto dur = std::chrono::microseconds(100);
271 auto cons = [&](
int tid) {
273 for (
int i = tid;
i <
ops;
i += ncons) {
279 }
else if ((
i % 3) == 1) {
280 auto dur = std::chrono::microseconds(100);
287 if (nprod == 1 && ncons == 1) {
292 sum.fetch_add(mysum);
301 run_once(nprod, ncons, prod, cons, endfn);
304 TEST(DynamicBoundedQueue, enq_deq) {
306 enq_deq_test<true, true, false>(1, 1);
307 enq_deq_test<true, true, true>(1, 1);
309 enq_deq_test<false, true, false>(1, 1);
310 enq_deq_test<false, true, true>(1, 1);
311 enq_deq_test<false, true, false>(2, 1);
312 enq_deq_test<false, true, true>(2, 1);
313 enq_deq_test<false, true, false>(10, 1);
314 enq_deq_test<false, true, true>(10, 1);
316 enq_deq_test<true, false, false>(1, 1);
317 enq_deq_test<true, false, true>(1, 1);
318 enq_deq_test<true, false, false>(1, 2);
319 enq_deq_test<true, false, true>(1, 2);
320 enq_deq_test<true, false, false>(1, 10);
321 enq_deq_test<true, false, true>(1, 10);
323 enq_deq_test<false, false, false>(1, 1);
324 enq_deq_test<false, false, true>(1, 1);
325 enq_deq_test<false, false, false>(2, 1);
326 enq_deq_test<false, false, true>(2, 1);
327 enq_deq_test<false, false, false>(10, 1);
328 enq_deq_test<false, false, true>(10, 1);
329 enq_deq_test<false, false, false>(1, 2);
330 enq_deq_test<false, false, true>(1, 2);
331 enq_deq_test<false, false, false>(1, 10);
332 enq_deq_test<false, false, true>(1, 10);
333 enq_deq_test<false, false, false>(2, 2);
334 enq_deq_test<false, false, true>(2, 2);
335 enq_deq_test<false, false, false>(10, 10);
336 enq_deq_test<false, false, true>(10, 10);
339 template <
typename RepFunc>
341 int reps = FLAGS_reps;
347 for (
int r = 0; r < reps; ++r) {
353 const uint64_t minute = 60000000000ULL;
354 if (sum > minute && r >= 1) {
364 std::cout <<
" " << std::setw(4) << max / ops <<
unit;
365 std::cout <<
" " << std::setw(4) << avg / ops <<
unit;
366 std::cout <<
" " << std::setw(4) << res / ops <<
unit;
367 std::cout << std::endl;
371 template <
template <
typename,
bool,
typename>
class Q,
typename T,
int Op>
375 Q<T, Op == 3 || Op == 4 || Op == 5, folly::DefaultWeightFn<T>> q(
377 std::atomic<uint64_t>
sum(0);
378 auto prod = [&](
int tid) {
379 for (
int i = tid;
i <
ops;
i += nprod) {
380 if (
Op == 0 ||
Op == 3) {
381 while (!q.try_enqueue(
i)) {
384 }
else if (
Op == 1 ||
Op == 4) {
385 while (!q.try_enqueue_for(
i, std::chrono::microseconds(1000))) {
393 auto cons = [&](
int tid) {
396 for (
int i = tid;
i <
ops;
i += ncons) {
397 if (
Op == 0 ||
Op == 3) {
398 while (!q.try_dequeue(v)) {
401 }
else if (
Op == 1 ||
Op == 4) {
402 while (!q.try_dequeue_for(v, std::chrono::microseconds(1000))) {
408 if (nprod == 1 && ncons == 1) {
409 DCHECK_EQ(
int(v),
i);
413 sum.fetch_add(mysum);
421 return run_once(nprod, ncons, prod, cons, endfn);
427 template <
typename T>
450 template <
typename Rep,
typename Period>
453 const std::chrono::duration<Rep, Period>& duration) {
458 q_.blockingRead(item);
462 return q_.read(item);
465 template <
typename Rep,
typename Period>
468 const std::chrono::duration<Rep, Period>& duration) {
473 template <
typename T,
bool,
typename>
476 template <
typename T>
495 template <
typename Rep,
typename Period>
505 return q_.
read(item);
508 template <
typename Rep,
typename Period>
514 template <
typename T,
bool,
typename>
522 for (
size_t i = 0;
i <
M; ++
i) {
532 std::cout <<
".............................................................." 536 template <
typename T>
539 <<
"===========================================" << std::endl;
540 if (np == 1 && nc == 1) {
541 bench<DSPSC, T, 0>(1, 1,
"DSPSC try spin only ");
542 bench<DSPSC, T, 1>(1, 1,
"DSPSC timed spin only ");
543 bench<DSPSC, T, 2>(1, 1,
"DSPSC wait spin only ");
544 bench<DSPSC, T, 3>(1, 1,
"DSPSC try may block ");
545 bench<DSPSC, T, 4>(1, 1,
"DSPSC timed may block ");
546 bench<DSPSC, T, 5>(1, 1,
"DSPSC wait may block ");
550 bench<DMPSC, T, 0>(np, 1,
"DMPSC try spin only ");
551 bench<DMPSC, T, 1>(np, 1,
"DMPSC timed spin only ");
552 bench<DMPSC, T, 2>(np, 1,
"DMPSC wait spin only ");
553 bench<DMPSC, T, 3>(np, 1,
"DMPSC try may block ");
554 bench<DMPSC, T, 4>(np, 1,
"DMPSC timed may block ");
555 bench<DMPSC, T, 5>(np, 1,
"DMPSC wait may block ");
559 bench<DSPMC, T, 0>(1, nc,
"DSPMC try spin only ");
560 bench<DSPMC, T, 1>(1, nc,
"DSPMC timed spin only ");
561 bench<DSPMC, T, 2>(1, nc,
"DSPMC wait spin only ");
562 bench<DSPMC, T, 3>(1, nc,
"DSPMC try may block ");
563 bench<DSPMC, T, 4>(1, nc,
"DSPMC timed may block ");
564 bench<DSPMC, T, 5>(1, nc,
"DSPMC wait may block ");
567 bench<DMPMC, T, 0>(np, nc,
"DMPMC try spin only ");
568 bench<DMPMC, T, 1>(np, nc,
"DMPMC timed spin only ");
569 bench<DMPMC, T, 2>(np, nc,
"DMPMC wait spin only ");
570 bench<DMPMC, T, 3>(np, nc,
"DMPMC try may block ");
571 bench<DMPMC, T, 4>(np, nc,
"DMPMC timed may block ");
572 bench<DMPMC, T, 5>(np, nc,
"DMPMC wait may block ");
574 if (np == 1 && nc == 1) {
575 bench<FPCQ, T, 0>(1, 1,
"folly::PCQ read ");
578 bench<FMPMC, T, 3>(np, nc,
"folly::MPMC read ");
579 bench<FMPMC, T, 4>(np, nc,
"folly::MPMC tryReadUntil ");
580 bench<FMPMC, T, 5>(np, nc,
"folly::MPMC blockingRead ");
581 std::cout <<
"==============================================================" 586 std::cout <<
"====================== " << std::setw(2) << np <<
" prod" 587 <<
" " << std::setw(2) << nc <<
" cons" 588 <<
" ======================" << std::endl;
589 type_benches<uint32_t>(np, nc,
"=== uint32_t ======");
598 std::cout <<
"==============================================================" 600 std::cout << std::setw(2) << FLAGS_reps <<
" reps of " << std::setw(8)
601 << FLAGS_ops <<
" handoffs\n";
603 std::cout <<
"Using capacity " << FLAGS_capacity <<
" for all queues\n";
604 std::cout <<
"==============================================================" 606 std::cout <<
"Test name Max time Avg time Min time" 609 for (
int np : {1, 8, 32}) {
610 for (
int nc : {1, 8, 32}) {
bool try_enqueue_for(const T &, const std::chrono::duration< Rep, Period > &)
std::atomic< int64_t > sum(0)
#define ASSERT_EQ(val1, val2)
FOLLY_ALWAYS_INLINE void dequeue(T &elem)
Dequeue functions.
FOLLY_ALWAYS_INLINE void enqueue(const T &v)
Enqueue functions.
bool try_enqueue(const T &v)
folly::ProducerConsumerQueue< T > q_
uint64_t run_once(int nprod, int ncons, const ProdFunc &prodFn, const ConsFunc &consFn, const EndFunc &endFn)
constexpr detail::Map< Move > move
std::chrono::steady_clock::time_point now()
TEST(DynamicBoundedQueue, basic)
FOLLY_ALWAYS_INLINE bool try_dequeue(T &elem)
FOLLY_ALWAYS_INLINE bool try_enqueue_for(const T &v, const std::chrono::duration< Rep, Period > &duration)
void enq_deq_test(const int nprod, const int ncons)
requires E e noexcept(noexcept(s.error(std::move(e))))
DEFINE_bool(bench, false,"run benchmark")
uint64_t runBench(const std::string &name, int ops, const RepFunc &repFn)
DEFINE_int32(reps, 10,"number of reps")
DEFINE_int64(capacity, 1000000,"capacity")
bool try_dequeue_for(T &, const std::chrono::duration< Rep, Period > &)
bool try_enqueue_for(const T &v, const std::chrono::duration< Rep, Period > &duration)
void type_benches(const int np, const int nc, const std::string &name)
**Optimized Holders **The template hazptr_array< M > provides most of the functionality *of M hazptr_holder s but with faster construction destruction *for M
bool try_dequeue_for(T &item, const std::chrono::duration< Rep, Period > &duration)
bool try_enqueue(const T &&v)
void benches(const int np, const int nc)
bool write(Args &&...recordArgs)
bool try_dequeue(T &item)
bool try_enqueue(const T &v)
#define ASSERT_FALSE(condition)
bool try_dequeue(T &item)
FOLLY_ALWAYS_INLINE bool try_enqueue(const T &v)
#define ASSERT_TRUE(condition)
FOLLY_ALWAYS_INLINE bool try_dequeue_for(T &elem, const std::chrono::duration< Rep, Period > &duration)
uint64_t bench(const int nprod, const int ncons, const std::string &name)