proxygen
RelaxedConcurrentPriorityQueueTest.cpp File Reference
#include <thread>
#include <boost/thread.hpp>
#include <folly/Random.h>
#include <folly/SpinLock.h>
#include <folly/experimental/FlatCombiningPriorityQueue.h>
#include <folly/experimental/RelaxedConcurrentPriorityQueue.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GTest.h>
#include <folly/test/DeterministicSchedule.h>
#include <glog/logging.h>

Go to the source code of this file.

Classes

class  Queue< E >
 
class  GlobalLockPQ< T >
 

Typedefs

using DSched = folly::test::DeterministicSchedule
 
using FCPQ = folly::FlatCombiningPriorityQueue< int >
 

Functions

 DEFINE_bool (bench, false,"run benchmark")
 
 DEFINE_int32 (reps, 1,"number of reps")
 
 DEFINE_int64 (ops, 32,"number of operations per rep")
 
 DEFINE_int64 (elems, 64,"number of elements")
 
template<class PriorityQueue >
void basicOpsTest ()
 
 TEST (CPQ, BasicOpsTest)
 
template<typename Func >
static uint64_t run_once (const Func &fn)
 execute the function for nthreads More...
 
template<class PriorityQueue >
void singleThreadTest ()
 
 TEST (CPQ, SingleThrStrictImplTest)
 
 TEST (CPQ, SingleThrRelaxedImplTest)
 
template<class PriorityQueue >
void concurrentPopforSharedBuffer ()
 
 TEST (CPQ, ConcurrentPopStrictImplTest)
 
 TEST (CPQ, ConcurrentPopRelaxedImplTest)
 
template<class PriorityQueue >
void concurrentPush ()
 
 TEST (CPQ, ConcurrentPushStrictImplTest)
 
 TEST (CPQ, ConcurrentPushRelaxedImplTest)
 
template<class PriorityQueue >
void concurrentOps (int ops)
 
template<class PriorityQueue >
void concurrentSizeTest (int ops)
 
 TEST (CPQ, ConcurrentMixedStrictImplTest)
 
 TEST (CPQ, ConcurrentMixedRelaxedImplTest)
 
 TEST (CPQ, StrictImplSizeTest)
 
 TEST (CPQ, RelaxedImplSizeTest)
 
template<class PriorityQueue >
void multiPusherPopper (int PushThr, int PopThr)
 
 TEST (CPQ, PusherPopperBlockingTest)
 
 TEST (CPQ, PusherPopperSpinningTest)
 
template<class PriorityQueue >
void blockingFirst ()
 
template<class PriorityQueue >
void concurrentBlocking ()
 
 TEST (CPQ, PopBlockingTest)
 
 TEST (CPQ, MixedBlockingTest)
 
template<class PriorityQueue , template< typename > class Atom = std::atomic>
static void DSchedMixedTest ()
 
 TEST (CPQ, DSchedMixedStrictTest)
 
 TEST (CPQ, DSchedMixedRelaxedTest)
 
template<class PriorityQueue >
static uint64_t producer_consumer_test (std::string name, uint32_t PushThr, uint32_t PopThr, uint64_t initial_size)
 
template<class PriorityQueue >
static uint64_t throughtput_test (std::string name, uint64_t initial_size)
 
template<class PriorityQueue >
static void accuracy_test (std::string name, uint64_t initial_size, uint32_t top_percent)
 
 TEST (CPQ, ThroughtputBench)
 
 TEST (CPQ, ProducerConsumerBench)
 
 TEST (CPQ, Accuracy)
 

Variables

static std::vector< int > nthr = {1, 2, 4, 8}
 
static uint32_t nthreads
 
static std::vector< int > sizes = {0, 1024}
 

Typedef Documentation

using FCPQ = folly::FlatCombiningPriorityQueue<int>

Definition at line 1052 of file RelaxedConcurrentPriorityQueueTest.cpp.

Function Documentation

template<class PriorityQueue >
static void accuracy_test ( std::string  name,
uint64_t  initial_size,
uint32_t  top_percent 
)
static

Definition at line 1004 of file RelaxedConcurrentPriorityQueueTest.cpp.

References counter, folly::pushmi::operators::filter, i, folly::Random::rand32(), rng, stop(), uint64_t, and val.

1004  {
1005  int avg = 0;
1006  int reps = 15;
1007  int valid = initial_size / top_percent;
1008  if (valid < 1) {
1009  return;
1010  }
1011  int target = initial_size - valid;
1012  for (int r = 0; r < reps; ++r) {
1013  PriorityQueue pq;
1014  std::unordered_set<int> filter;
1016  rng.seed(initial_size + r);
1017 
1018  // initialize the queue according to initial_size
1019  // eliminate repeated priorities
1020  for (uint64_t i = 0; i < initial_size; i++) {
1021  int val;
1022  do {
1023  val = folly::Random::rand32(rng) % initial_size;
1024  } while (filter.find(val) != filter.end());
1025  filter.insert(val);
1026  pq.push(val);
1027  }
1028 
1029  int counter = 0;
1030  int stop = valid;
1031  for (uint64_t i = 0; i < initial_size; i++) {
1032  int val;
1033  pq.pop(val);
1034  if (val >= target) {
1035  stop--;
1036  }
1037  if (stop > 0 && val < target) {
1038  counter++;
1039  }
1040  if (stop == 0) {
1041  break;
1042  }
1043  }
1044  avg += counter;
1045  }
1046  avg /= reps;
1047  std::cout << std::setw(16) << name << " ";
1048  std::cout << "Lower priority popped: " << avg;
1049  std::cout << std::endl;
1050 }
PUSHMI_INLINE_VAR constexpr detail::filter_fn filter
Definition: filter.h:75
double val
Definition: String.cpp:273
auto rng
Definition: CollectTest.cpp:31
const char * name
Definition: http_parser.c:437
static void stop()
std::mt19937 DefaultGenerator
Definition: Random.h:97
std::atomic< int > counter
static uint32_t rand32()
Definition: Random.h:213
template<class PriorityQueue >
void basicOpsTest ( )

Definition at line 40 of file RelaxedConcurrentPriorityQueueTest.cpp.

References EXPECT_EQ, EXPECT_FALSE, and EXPECT_TRUE.

40  {
41  int res;
42  PriorityQueue pq;
43 
44  EXPECT_TRUE(pq.empty());
45  EXPECT_EQ(pq.size(), 0);
46  pq.push(1);
47  pq.push(2);
48  pq.push(3);
49 
50  EXPECT_FALSE(pq.empty());
51  EXPECT_EQ(pq.size(), 3);
52  pq.pop(res);
53  EXPECT_EQ(res, 3);
54  pq.pop(res);
55  EXPECT_EQ(res, 2);
56  pq.pop(res);
57  EXPECT_EQ(res, 1);
58  EXPECT_TRUE(pq.empty());
59  EXPECT_EQ(pq.size(), 0);
60 
61  pq.push(3);
62  pq.push(2);
63  pq.push(1);
64 
65  pq.pop(res);
66  EXPECT_EQ(res, 3);
67  pq.pop(res);
68  EXPECT_EQ(res, 2);
69  pq.pop(res);
70  EXPECT_EQ(res, 1);
71  EXPECT_TRUE(pq.empty());
72  EXPECT_EQ(pq.size(), 0);
73 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
template<class PriorityQueue >
void blockingFirst ( )

Definition at line 523 of file RelaxedConcurrentPriorityQueueTest.cpp.

References b, c, EXPECT_EQ, folly::pushmi::detail::t, and val.

523  {
524  PriorityQueue pq;
525  int nPop = 16;
526 
527  boost::barrier b{static_cast<unsigned int>(nPop + 1)};
528  std::atomic<int> finished{0};
529 
530  std::vector<std::thread> threads_pop(nPop);
531  for (int tid = 0; tid < nPop; ++tid) {
532  threads_pop[tid] = std::thread([&] {
533  int val;
534  b.wait();
535  pq.pop(val);
536  finished.fetch_add(1, std::memory_order_acq_rel);
537  });
538  }
539 
540  b.wait();
541  int c = 0;
542  // push to Mound one by one
543  // the popping threads should wake up one by one
544  do {
545  pq.push(1);
546  c++;
547  while (finished.load(std::memory_order_acquire) != c)
548  ;
549  EXPECT_EQ(finished.load(std::memory_order_acquire), c);
550  } while (c < nPop);
551 
552  for (auto& t : threads_pop) {
553  t.join();
554  }
555 }
char b
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
double val
Definition: String.cpp:273
char c
template<class PriorityQueue >
void concurrentBlocking ( )

Definition at line 558 of file RelaxedConcurrentPriorityQueueTest.cpp.

References b, EXPECT_EQ, folly::pushmi::detail::t, uint32_t, and val.

Referenced by TEST().

558  {
559  uint32_t nThrs = 16;
560 
561  for (int iter = 0; iter < FLAGS_reps * 10; iter++) {
562  PriorityQueue pq;
563  boost::barrier b{static_cast<unsigned int>(nThrs + nThrs + 1)};
564  std::atomic<uint32_t> finished{0};
565  std::vector<std::thread> threads_pop(nThrs);
566  for (uint32_t tid = 0; tid < nThrs; ++tid) {
567  threads_pop[tid] = std::thread([&] {
568  b.wait();
569  int val;
570  pq.pop(val);
571  finished.fetch_add(1, std::memory_order_acq_rel);
572  });
573  }
574 
575  std::vector<std::thread> threads_push(nThrs);
576  for (uint32_t tid = 0; tid < nThrs; ++tid) {
577  threads_push[tid] = std::thread([&, tid] {
578  b.wait();
579  pq.push(tid);
580  while (finished.load(std::memory_order_acquire) != nThrs)
581  ;
582  EXPECT_EQ(finished.load(std::memory_order_acquire), nThrs);
583  });
584  }
585 
586  b.wait();
587  for (auto& t : threads_pop) {
588  t.join();
589  }
590  for (auto& t : threads_push) {
591  t.join();
592  }
593  }
594 }
char b
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
double val
Definition: String.cpp:273
template<class PriorityQueue >
void concurrentOps ( int  ops)

initialize the queue

operations

clean up work

Definition at line 291 of file RelaxedConcurrentPriorityQueueTest.cpp.

References i, nthreads, ops, folly::Random::rand32(), rng, run_once(), folly::pushmi::detail::t, uint32_t, uint64_t, and val.

291  {
292  for (int t : nthr) {
293  PriorityQueue pq;
294  std::atomic<uint64_t> counter_push(0);
295  std::atomic<uint64_t> counter_pop(0);
296  nthreads = t;
297 
298  boost::barrier rb0{nthreads};
299  boost::barrier rb1{nthreads};
300  boost::barrier rb2{nthreads};
301  std::atomic<int> to_end(0);
302 
303  auto fn = [&](uint32_t tid) {
305  rng.seed(tid);
306  uint64_t local_push = 0;
307  uint64_t local_pop = 0;
308  int res;
309 
311  for (int i = tid; i < FLAGS_elems; i += nthreads) {
312  int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
313  local_push++;
314  pq.push(val);
315  }
316  rb0.wait();
317 
319  for (int i = 0; i < ops; i++) {
320  if (ops % 2 == 0) {
321  int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
322  local_push++;
323  pq.push(val);
324  } else {
325  pq.pop(res);
326  local_pop++;
327  }
328  }
329  rb1.wait();
330  // collecting the ops info for checking purpose
331  counter_push.fetch_add(local_push, std::memory_order_seq_cst);
332  counter_pop.fetch_add(local_pop, std::memory_order_seq_cst);
333  rb2.wait();
335  uint64_t r = counter_push.load(std::memory_order_seq_cst) -
336  counter_pop.load(std::memory_order_seq_cst);
337  while (true) {
338  uint64_t index = to_end.fetch_add(1, std::memory_order_acq_rel);
339  if (index < r) {
340  pq.pop(res);
341  } else {
342  break;
343  }
344  }
345  };
346  run_once(fn);
347  // the total push and pop ops should be the same
348  }
349 }
static uint64_t run_once(const Func &fn)
execute the function for nthreads
double val
Definition: String.cpp:273
auto rng
Definition: CollectTest.cpp:31
const int ops
static uint32_t nthreads
std::mt19937 DefaultGenerator
Definition: Random.h:97
static uint32_t rand32()
Definition: Random.h:213
static std::vector< int > nthr
template<class PriorityQueue >
void concurrentPopforSharedBuffer ( )

Concurrent pops should leave the queue empty once the number of pop operations executed equals the number of elements pushed.

Definition at line 171 of file RelaxedConcurrentPriorityQueueTest.cpp.

References EXPECT_EQ, i, nthreads, folly::Random::rand32(), rng, run_once(), folly::pushmi::detail::t, uint32_t, uint64_t, and val.

Referenced by TEST().

171  {
172  for (int t : nthr) {
173  PriorityQueue pq;
174 
176  rng.seed(FLAGS_elems);
177  uint64_t check_sum = 0;
178  // random push
179  for (int i = 0; i < FLAGS_elems; i++) {
180  int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
181  pq.push(val);
182  check_sum += val;
183  }
184 
185  std::atomic<uint64_t> pop_sum(0);
186  std::atomic<int> to_end(0);
187 
188  nthreads = t;
189  auto fn = [&](uint32_t tid) {
190  int val = tid;
191  while (true) {
192  int index = to_end.fetch_add(1, std::memory_order_acq_rel);
193  if (index < FLAGS_elems) {
194  pq.pop(val);
195  } else {
196  break;
197  }
198  pop_sum.fetch_add(val, std::memory_order_acq_rel);
199  }
200  };
201  run_once(fn);
202  // check the sum of returned values of successful pop
203  EXPECT_EQ(pop_sum, check_sum);
204  }
205 }
static uint64_t run_once(const Func &fn)
execute the function for nthreads
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
double val
Definition: String.cpp:273
auto rng
Definition: CollectTest.cpp:31
static uint32_t nthreads
std::mt19937 DefaultGenerator
Definition: Random.h:97
static uint32_t rand32()
Definition: Random.h:213
static std::vector< int > nthr
template<class PriorityQueue >
void concurrentPush ( )

Executes a fixed number of concurrent pushes, then pops the same number of elements and checks that their total value matches the total pushed.

Definition at line 246 of file RelaxedConcurrentPriorityQueueTest.cpp.

References EXPECT_EQ, i, nthreads, folly::Random::rand32(), rng, run_once(), folly::pushmi::detail::t, uint32_t, uint64_t, and val.

246  {
247  for (int t : nthr) {
248  PriorityQueue pq;
249  std::atomic<uint64_t> counter_sum(0);
250  nthreads = t;
251 
252  auto fn = [&](uint32_t tid) {
254  rng.seed(tid);
255  uint64_t local_sum = 0;
256  for (int i = tid; i < FLAGS_elems; i += nthreads) {
257  int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
258  local_sum += val;
259  pq.push(val);
260  }
261  counter_sum.fetch_add(local_sum, std::memory_order_acq_rel);
262  };
263  run_once(fn);
264 
265  // check the total number of elements
266  uint64_t actual_sum = 0;
267  for (int i = 0; i < FLAGS_elems; i++) {
268  int res = 0;
269  pq.pop(res);
270  actual_sum += res;
271  }
272  EXPECT_EQ(actual_sum, counter_sum);
273  }
274 }
static uint64_t run_once(const Func &fn)
execute the function for nthreads
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
double val
Definition: String.cpp:273
auto rng
Definition: CollectTest.cpp:31
static uint32_t nthreads
std::mt19937 DefaultGenerator
Definition: Random.h:97
static uint32_t rand32()
Definition: Random.h:213
static std::vector< int > nthr
template<class PriorityQueue >
void concurrentSizeTest ( int  ops)

initialize the queue

operations

Definition at line 352 of file RelaxedConcurrentPriorityQueueTest.cpp.

References EXPECT_EQ, EXPECT_TRUE, i, nthreads, ops, folly::Random::rand32(), rng, run_once(), folly::pushmi::detail::t, uint32_t, uint64_t, and val.

352  {
353  for (int t : nthr) {
354  PriorityQueue pq;
355  nthreads = t;
356  EXPECT_TRUE(pq.empty());
357  auto fn = [&](uint32_t tid) {
359  rng.seed(tid);
360  uint64_t local_push = 0;
361  int res;
362 
364  for (int i = tid; i < FLAGS_elems; i += nthreads) {
365  int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
366  local_push++;
367  pq.push(val);
368  }
369 
371  for (int i = 0; i < ops; i++) {
372  int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
373  pq.push(val);
374  pq.pop(res);
375  }
376  };
377  run_once(fn);
378  // the total push and pop ops should be the same
379  EXPECT_EQ(pq.size(), FLAGS_elems);
380  }
381 }
static uint64_t run_once(const Func &fn)
execute the function for nthreads
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
double val
Definition: String.cpp:273
auto rng
Definition: CollectTest.cpp:31
const int ops
static uint32_t nthreads
std::mt19937 DefaultGenerator
Definition: Random.h:97
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
static uint32_t rand32()
Definition: Random.h:213
static std::vector< int > nthr
DEFINE_bool ( bench  ,
false  ,
"run benchmark"   
)
DEFINE_int32 ( reps  ,
1  ,
"number of reps"   
)
DEFINE_int64 ( ops  ,
32  ,
"number of operations per rep"   
)
DEFINE_int64 ( elems  ,
64  ,
"number of elements"   
)
template<class PriorityQueue , template< typename > class Atom = std::atomic>
static void DSchedMixedTest ( )
static

Definition at line 630 of file RelaxedConcurrentPriorityQueueTest.cpp.

References EXPECT_EQ, i, folly::test::DeterministicSchedule::join(), folly::Random::rand32(), rng, folly::pushmi::detail::t, folly::test::DeterministicSchedule::thread(), threads, uint64_t, and val.

Referenced by TEST().

630  {
631  for (int r = 0; r < FLAGS_reps; r++) {
632  // the thread number is 32
633  int thr = 8;
634  PriorityQueue pq;
636  rng.seed(thr);
637  uint64_t pre_sum = 0;
638  uint64_t pre_size = 0;
639  for (int i = 0; i < FLAGS_elems; i++) {
640  int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
641  pq.push(val);
642  pre_sum += val;
643  }
644  pre_size = FLAGS_elems;
645  Atom<uint64_t> atom_push_sum{0};
646  Atom<uint64_t> atom_pop_sum{0};
647  std::vector<std::thread> threads(thr);
648  for (int tid = 0; tid < thr; ++tid) {
649  threads[tid] = DSched::thread([&]() {
651  tl_rng.seed(thr);
652  uint64_t pop_sum = 0;
653  uint64_t push_sum = 0;
654  for (int i = 0; i < FLAGS_ops; i++) {
655  int val = folly::Random::rand32(tl_rng) % FLAGS_elems + 1;
656  pq.push(val);
657  push_sum += val;
658  pq.pop(val);
659  pop_sum += val;
660  }
661  atom_push_sum.fetch_add(push_sum, std::memory_order_acq_rel);
662  atom_pop_sum.fetch_add(pop_sum, std::memory_order_acq_rel);
663  });
664  }
665 
666  for (auto& t : threads) {
667  DSched::join(t);
668  }
669 
670  // It checks the number of elements remain in Mound
671  while (pre_size > 0) {
672  pre_size--;
673  int val = -1;
674  pq.pop(val);
675  atom_pop_sum += val;
676  }
677  // Check the accumulation of popped and pushed priorities
678  EXPECT_EQ(atom_pop_sum, pre_sum + atom_push_sum);
679  }
680 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
double val
Definition: String.cpp:273
static std::thread thread(Func &&func, Args &&...args)
std::vector< std::thread::id > threads
auto rng
Definition: CollectTest.cpp:31
std::mt19937 DefaultGenerator
Definition: Random.h:97
static void join(std::thread &child)
static uint32_t rand32()
Definition: Random.h:213
template<class PriorityQueue >
void multiPusherPopper ( int  PushThr,
int  PopThr 
)

Definition at line 428 of file RelaxedConcurrentPriorityQueueTest.cpp.

References EXPECT_EQ, i, ops, folly::Random::rand32(), folly::pushmi::detail::t, uint32_t, and val.

Referenced by TEST().

428  {
429  int ops = FLAGS_ops;
430  uint32_t total_threads = PushThr + PopThr;
431 
432  PriorityQueue pq;
433  std::atomic<uint64_t> sum_push(0);
434  std::atomic<uint64_t> sum_pop(0);
435 
436  auto fn_popthr = [&](uint32_t tid) {
437  for (int i = tid; i < ops; i += PopThr) {
438  int val;
439  pq.pop(val);
440  sum_pop.fetch_add(val, std::memory_order_acq_rel);
441  }
442  };
443 
444  auto fn_pushthr = [&](uint32_t tid) {
446  rng_t.seed(tid);
447  for (int i = tid; i < ops; i += PushThr) {
448  int val = folly::Random::rand32(rng_t);
449  pq.push(val);
450  sum_push.fetch_add(val, std::memory_order_acq_rel);
451  }
452  };
453  boost::barrier barrier_start{total_threads + 1};
454 
455  std::vector<std::thread> threads_push(PushThr);
456  for (int tid = 0; tid < PushThr; ++tid) {
457  threads_push[tid] = std::thread([&, tid] {
458  barrier_start.wait();
459  fn_pushthr(tid);
460  });
461  }
462  std::vector<std::thread> threads_pop(PopThr);
463  for (int tid = 0; tid < PopThr; ++tid) {
464  threads_pop[tid] = std::thread([&, tid] {
465  barrier_start.wait();
466  fn_popthr(tid);
467  });
468  }
469 
470  barrier_start.wait(); // start the execution
471  // begin time measurement
472  for (auto& t : threads_push) {
473  t.join();
474  }
475  for (auto& t : threads_pop) {
476  t.join();
477  }
478  EXPECT_EQ(sum_pop, sum_push);
479 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
double val
Definition: String.cpp:273
const int ops
std::mt19937 DefaultGenerator
Definition: Random.h:97
static uint32_t rand32()
Definition: Random.h:213
template<class PriorityQueue >
static uint64_t producer_consumer_test ( std::string  name,
uint32_t  PushThr,
uint32_t  PopThr,
uint64_t  initial_size 
)
static

Definition at line 868 of file RelaxedConcurrentPriorityQueueTest.cpp.

References count, i, max, min, name, now(), ops, folly::Random::rand32(), rng, sum(), folly::pushmi::detail::t, uint32_t, uint64_t, and val.

Referenced by TEST().

872  {
873  int ops = 1 << 18;
874  int reps = 15;
875  if (name.find("RCPQ") != std::string::npos) {
876  ops <<= 3;
877  }
878  uint64_t min = UINTMAX_MAX;
879  uint64_t max = 0;
880  uint64_t sum = 0;
881  uint32_t total_threads = PushThr + PopThr;
882 
883  for (int r = 0; r < reps; ++r) {
884  uint64_t dur;
885  PriorityQueue pq;
886 
888  rng.seed(initial_size);
889  // initialize the queue according to initial_size
890  for (uint64_t i = 0; i < initial_size; i++) {
891  int val = folly::Random::rand32(rng) % ops;
892  pq.push(val);
893  }
894 
895  auto fn_popthr = [&](uint32_t tid) {
896  for (int i = tid; i < ops; i += PopThr) {
897  int val;
898  pq.pop(val);
899  }
900  };
901 
902  auto fn_pushthr = [&](uint32_t tid) {
904  rng_t.seed(tid);
905  for (int i = tid; i < ops; i += PushThr) {
906  int val = folly::Random::rand32(rng_t) % ops;
907  pq.push(val);
908  }
909  };
910  boost::barrier barrier_start{total_threads + 1};
911  std::vector<std::thread> threads_push(PushThr);
912  for (uint32_t tid = 0; tid < PushThr; ++tid) {
913  threads_push[tid] = std::thread([&, tid] {
914  barrier_start.wait();
915  fn_pushthr(tid);
916  });
917  }
918  std::vector<std::thread> threads_pop(PopThr);
919  for (uint32_t tid = 0; tid < PopThr; ++tid) {
920  threads_pop[tid] = std::thread([&, tid] {
921  barrier_start.wait();
922  fn_popthr(tid);
923  });
924  }
925  barrier_start.wait(); // start the execution
926  // begin time measurement
927  auto tbegin = std::chrono::steady_clock::now();
928  for (auto& t : threads_push) {
929  t.join();
930  }
931  for (auto& t : threads_pop) {
932  t.join();
933  }
934  // end time measurement
935  auto tend = std::chrono::steady_clock::now();
936  dur = std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
937  .count();
938  sum += dur;
939  min = std::min(min, dur);
940  max = std::max(max, dur);
941  }
942  uint64_t avg = sum / reps;
943  std::cout << std::setw(12) << name;
944  std::cout << " " << std::setw(8) << max / ops << " ns";
945  std::cout << " " << std::setw(8) << avg / ops << " ns";
946  std::cout << " " << std::setw(8) << min / ops << " ns";
947  std::cout << std::endl;
948  return min;
949 }
std::atomic< int64_t > sum(0)
LogLevel max
Definition: LogLevel.cpp:31
std::chrono::steady_clock::time_point now()
double val
Definition: String.cpp:273
auto rng
Definition: CollectTest.cpp:31
const char * name
Definition: http_parser.c:437
LogLevel min
Definition: LogLevel.cpp:30
const int ops
std::mt19937 DefaultGenerator
Definition: Random.h:97
int * count
static uint32_t rand32()
Definition: Random.h:213
template<typename Func >
static uint64_t run_once ( const Func &  fn)
static

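Executes the function fn(tid) on nthreads threads, released together by a start barrier, and returns the elapsed time in nanoseconds.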

Definition at line 94 of file RelaxedConcurrentPriorityQueueTest.cpp.

References count, now(), nthreads, folly::pushmi::detail::t, threads, uint32_t, and uint64_t.

Referenced by concurrentOps(), concurrentPopforSharedBuffer(), concurrentPush(), concurrentSizeTest(), and throughtput_test().

94  {
95  boost::barrier barrier_start{nthreads + 1};
96  std::vector<std::thread> threads(nthreads);
97  for (uint32_t tid = 0; tid < nthreads; ++tid) {
98  threads[tid] = std::thread([&, tid] {
99  barrier_start.wait();
100  fn(tid);
101  });
102  }
103 
104  barrier_start.wait(); // start the execution
105  auto tbegin = std::chrono::steady_clock::now();
106  for (auto& t : threads) {
107  t.join();
108  }
109 
110  // end time measurement
111  uint64_t duration = 0;
112  auto tend = std::chrono::steady_clock::now();
113  duration = std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
114  .count();
115  return duration;
116 }
std::chrono::steady_clock::time_point now()
std::vector< std::thread::id > threads
static uint32_t nthreads
int * count
template<class PriorityQueue >
void singleThreadTest ( )

Definition at line 119 of file RelaxedConcurrentPriorityQueueTest.cpp.

References counter, EXPECT_EQ, i, folly::Random::rand32(), rng, sum(), uint64_t, and val.

119  {
120  PriorityQueue pq;
121 
123  rng.seed(FLAGS_elems);
124  uint64_t expect_sum = 0;
125  // random push
126  for (int i = 0; i < FLAGS_elems; i++) {
127  int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
128  expect_sum += val;
129  pq.push(val);
130  }
131 
132  int val = 0;
133  int counter = 0;
134  uint64_t sum = 0;
135  while (counter < FLAGS_elems) {
136  pq.pop(val);
137  sum += val;
138  counter++;
139  }
140 
141  EXPECT_EQ(sum, expect_sum);
142 }
std::atomic< int64_t > sum(0)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
double val
Definition: String.cpp:273
auto rng
Definition: CollectTest.cpp:31
std::mt19937 DefaultGenerator
Definition: Random.h:97
std::atomic< int > counter
static uint32_t rand32()
Definition: Random.h:213
TEST ( CPQ  ,
BasicOpsTest   
)

Definition at line 75 of file RelaxedConcurrentPriorityQueueTest.cpp.

75  {
76  // Spinning
77  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true>>();
78  // Strict
79  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0>>();
80  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0, 1>>();
81  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0, 3>>();
82  // Relaxed
83  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 1, 1>>();
84  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 2, 1>>();
85  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 3, 3>>();
86  // Blocking
87  basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true>>();
88  basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true, 0>>();
89  basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true, 2>>();
90 }
TEST ( CPQ  ,
SingleThrStrictImplTest   
)

Definition at line 144 of file RelaxedConcurrentPriorityQueueTest.cpp.

144  {
145  // spinning
146  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0>>();
147  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0, 1>>();
148  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0, 8>>();
149  // blocking
150  singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0>>();
151  singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
152  singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0, 8>>();
153 }
TEST ( CPQ  ,
SingleThrRelaxedImplTest   
)

Definition at line 155 of file RelaxedConcurrentPriorityQueueTest.cpp.

155  {
156  // spinning
157  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 1>>();
158  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8>>();
159  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8, 2>>();
160  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
161  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 2, 128>>();
162  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 100, 8>>();
163  // blocking
164  singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 1>>();
165  singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 8>>();
166 }
TEST ( CPQ  ,
ConcurrentPushStrictImplTest   
)

Definition at line 276 of file RelaxedConcurrentPriorityQueueTest.cpp.

276  {
277  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0>>();
278  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0, 8>>();
279  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0, 128>>();
280 }
TEST ( CPQ  ,
ConcurrentPushRelaxedImplTest   
)

Definition at line 282 of file RelaxedConcurrentPriorityQueueTest.cpp.

282  {
283  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false>>();
284  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 1, 8>>();
285  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 2, 128>>();
286  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 128, 8>>();
287  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
288 }
TEST ( CPQ  ,
ConcurrentMixedStrictImplTest   
)

Definition at line 385 of file RelaxedConcurrentPriorityQueueTest.cpp.

References folly::size().

385  {
386  for (auto size : sizes) {
387  concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0>>(size);
388  concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0, 1>>(
389  size);
390  concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0, 32>>(
391  size);
392  }
393 }
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
static std::vector< int > sizes
TEST ( CPQ  ,
ConcurrentMixedRelaxedImplTest   
)

Definition at line 395 of file RelaxedConcurrentPriorityQueueTest.cpp.

References folly::size().

395  {
396  for (auto size : sizes) {
397  concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false>>(size);
398  concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 1, 32>>(
399  size);
400  concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 32, 1>>(
401  size);
402  concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>(
403  size);
404  concurrentOps<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>(size);
405  }
406 }
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
static std::vector< int > sizes
TEST ( CPQ  ,
StrictImplSizeTest   
)

Definition at line 408 of file RelaxedConcurrentPriorityQueueTest.cpp.

References folly::size().

408  {
409  for (auto size : sizes) {
410  concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true, 0>>(
411  size);
412  concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, true, true, 0>>(
413  size);
414  }
415 }
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
static std::vector< int > sizes
TEST ( CPQ  ,
RelaxedImplSizeTest   
)

Definition at line 417 of file RelaxedConcurrentPriorityQueueTest.cpp.

References folly::size().

417  {
418  for (auto size : sizes) {
419  concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true>>(size);
420  concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, true, true, 2, 8>>(
421  size);
422  concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true, 8, 2>>(
423  size);
424  }
425 }
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
static std::vector< int > sizes
TEST ( CPQ  ,
PusherPopperBlockingTest   
)

Definition at line 481 of file RelaxedConcurrentPriorityQueueTest.cpp.

References i, and multiPusherPopper().

481  {
482  for (auto i : nthr) {
483  for (auto j : nthr) {
484  // Original
485  multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>(
486  i, j);
487  // Relaxed
488  multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false, 1, 8>>(
489  i, j);
494  multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false>>(i, j);
497  }
498  }
499 }
void multiPusherPopper(int PushThr, int PopThr)
static std::vector< int > nthr
TEST ( CPQ  ,
PusherPopperSpinningTest   
)

Definition at line 501 of file RelaxedConcurrentPriorityQueueTest.cpp.

References i, and multiPusherPopper().

501  {
502  for (auto i : nthr) {
503  for (auto j : nthr) {
504  // Original
507  // Relaxed
514  multiPusherPopper<RelaxedConcurrentPriorityQueue<int, false, false>>(
515  i, j);
518  }
519  }
520 }
void multiPusherPopper(int PushThr, int PopThr)
static std::vector< int > nthr
TEST ( CPQ  ,
PopBlockingTest   
)

Definition at line 596 of file RelaxedConcurrentPriorityQueueTest.cpp.

596  {
597  // strict
598  blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
599  blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 0, 16>>();
600  // relaxed
601  blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false>>();
602  blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>();
603  blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 16, 1>>();
604  // Spinning
605  blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false>>();
606  blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
607  blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false, 16, 1>>();
608 }
TEST ( CPQ  ,
MixedBlockingTest   
)

Definition at line 610 of file RelaxedConcurrentPriorityQueueTest.cpp.

References concurrentBlocking().

610  {
611  // strict
612  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
613  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 0, 16>>();
614  // relaxed
615  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false>>();
616  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>();
617  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 16, 1>>();
618  // Spinning
619  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, false, false>>();
620  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
623 }
TEST ( CPQ  ,
DSchedMixedStrictTest   
)

Definition at line 682 of file RelaxedConcurrentPriorityQueueTest.cpp.

References DSchedMixedTest(), and folly::test::DeterministicSchedule::uniform().

682  {
683  DSched sched(DSched::uniform(0));
686  int,
687  false,
688  false,
689  0,
690  25,
693  DeterministicAtomic>();
696  int,
697  true,
698  false,
699  0,
700  25,
701  DeterministicMutex,
702  DeterministicAtomic>,
703  DeterministicAtomic>();
706  int,
707  false,
708  false,
709  0,
710  1,
711  DeterministicMutex,
712  DeterministicAtomic>,
713  DeterministicAtomic>();
716  int,
717  true,
718  false,
719  0,
720  1,
721  DeterministicMutex,
722  DeterministicAtomic>,
723  DeterministicAtomic>();
726  int,
727  false,
728  false,
729  0,
730  128,
731  DeterministicMutex,
732  DeterministicAtomic>,
733  DeterministicAtomic>();
736  int,
737  true,
738  false,
739  0,
740  128,
741  DeterministicMutex,
742  DeterministicAtomic>,
743  DeterministicAtomic>();
744 }
DeterministicAtomicImpl< T, DeterministicSchedule > DeterministicAtomic
static std::function< size_t(size_t)> uniform(uint64_t seed)
static void DSchedMixedTest()
TEST ( CPQ  ,
DSchedMixedRelaxedTest   
)

Definition at line 746 of file RelaxedConcurrentPriorityQueueTest.cpp.

References DSchedMixedTest(), and folly::test::DeterministicSchedule::uniform().

746  {
747  DSched sched(DSched::uniform(0));
750  int,
751  false,
752  false,
753  16,
754  25,
757  DeterministicAtomic>();
760  int,
761  true,
762  false,
763  16,
764  25,
765  DeterministicMutex,
766  DeterministicAtomic>,
767  DeterministicAtomic>();
770  int,
771  false,
772  false,
773  1,
774  16,
775  DeterministicMutex,
776  DeterministicAtomic>,
777  DeterministicAtomic>();
780  int,
781  true,
782  false,
783  1,
784  16,
785  DeterministicMutex,
786  DeterministicAtomic>,
787  DeterministicAtomic>();
790  int,
791  false,
792  false,
793  16,
794  1,
795  DeterministicMutex,
796  DeterministicAtomic>,
797  DeterministicAtomic>();
800  int,
801  true,
802  false,
803  16,
804  1,
805  DeterministicMutex,
806  DeterministicAtomic>,
807  DeterministicAtomic>();
810  int,
811  false,
812  false,
813  16,
814  16,
815  DeterministicMutex,
816  DeterministicAtomic>,
817  DeterministicAtomic>();
820  int,
821  true,
822  false,
823  16,
824  16,
825  DeterministicMutex,
826  DeterministicAtomic>,
827  DeterministicAtomic>();
828 }
DeterministicAtomicImpl< T, DeterministicSchedule > DeterministicAtomic
static std::function< size_t(size_t)> uniform(uint64_t seed)
static void DSchedMixedTest()
TEST ( CPQ  ,
ThroughtputBench   
)

Definition at line 1054 of file RelaxedConcurrentPriorityQueueTest.cpp.

References i, nthreads, nthrs, and s.

1054  {
1055  if (!FLAGS_bench) {
1056  return;
1057  }
1058  std::vector<int> test_sizes = {64, 512, 65536};
1059  std::vector<int> nthrs = {1, 2, 4, 8, 12, 14, 16, 28, 32, 56};
1060 
1061  std::cout << "Threads have equal chance to push and pop. \n"
1062  << "The bench caculates the avg execution time for\n"
1063  << "one operation (push OR pop).\n"
1064  << "GL : std::priority_queue protected by global lock\n"
1065  << "FL : flatcombinning priority queue\n"
1066  << "RCPQ: the relaxed concurrent priority queue\n"
1067  << std::endl;
1068  std::cout << "\nTest_name, Max time, Avg time, Min time" << std::endl;
1069  for (auto s : test_sizes) {
1070  std::cout << "\n ------ Initial size: " << s << " ------" << std::endl;
1071  for (int i : nthrs) {
1072  nthreads = i;
1073  std::cout << "Thread number: " << i << std::endl;
1074  throughtput_test<GlobalLockPQ<int>>("GL", s);
1075  throughtput_test<FCPQ>("FC", s);
1076  throughtput_test<RelaxedConcurrentPriorityQueue<int>>("RCPQ", s);
1077  }
1078  }
1079 }
const int nthrs
static uint32_t nthreads
static set< string > s
TEST ( CPQ  ,
ProducerConsumerBench   
)

Definition at line 1081 of file RelaxedConcurrentPriorityQueueTest.cpp.

References m, nthrs, producer_consumer_test(), and s.

1081  {
1082  if (!FLAGS_bench) {
1083  return;
1084  }
1085  std::vector<int> test_sizes = {0, 512, 65536};
1086  std::vector<int> nthrs = {1, 2, 4, 8, 12, 16, 24};
1087 
1088  std::cout << "<Producer, Consumer> pattern \n"
1089  << "The bench caculates the avg execution time for\n"
1090  << "push AND pop pair(two operations).\n"
1091  << "GL : std::priority_queue protected by global lock\n"
1092  << "FL : flatcombinning priority queue\n"
1093  << "RCPQ SPN: RCPQ spinning\n"
1094  << "RCPQ BLK: RCPQ blocking\n"
1095  << std::endl;
1096  for (int s : test_sizes) {
1097  std::cout << "\n ------ Scalability ------" << std::endl;
1098  for (int m : nthrs) {
1099  for (int n : nthrs) {
1100  if (m != n) {
1101  continue;
1102  }
1103  std::cout << "<" << m << " , " << n << "> , size = " << s << ":"
1104  << std::endl;
1105  producer_consumer_test<GlobalLockPQ<int>>("GL", m, n, s);
1106  producer_consumer_test<FCPQ>("FC", m, n, s);
1109  "RCPQ SPN", m, n, s);
1112  "RCPQ BLK", m, n, s);
1113  }
1114  }
1115  std::cout << "\n ------ Unbalanced(Producer<Consumer) ------" << std::endl;
1116  for (int m : nthrs) {
1117  for (int n : nthrs) {
1118  if (m > 4 || n - 4 <= m) {
1119  continue;
1120  }
1121  std::cout << "<" << m << " , " << n << "> , size = " << s << ":"
1122  << std::endl;
1123  producer_consumer_test<GlobalLockPQ<int>>("GL", m, n, s);
1124  producer_consumer_test<FCPQ>("FC", m, n, s);
1127  "RCPQ SPN", m, n, s);
1130  "RCPQ BLK", m, n, s);
1131  }
1132  }
1133 
1134  std::cout << "\n ------ Unbalanced(Producer>Consumer) ------" << std::endl;
1135  for (int m : nthrs) {
1136  for (int n : nthrs) {
1137  if (m <= 8 || n > m - 4 || n % 4 != 0) {
1138  continue;
1139  }
1140  std::cout << "<" << m << " , " << n << "> , size = " << s << ":"
1141  << std::endl;
1142  producer_consumer_test<GlobalLockPQ<int>>("GL", m, n, s);
1143  producer_consumer_test<FCPQ>("FC", m, n, s);
1146  "RCPQ SPN", m, n, s);
1149  "RCPQ BLK", m, n, s);
1150  }
1151  }
1152  }
1153 }
static uint64_t producer_consumer_test(std::string name, uint32_t PushThr, uint32_t PopThr, uint64_t initial_size)
const int nthrs
static map< string, int > m
static set< string > s
TEST ( CPQ  ,
Accuracy   
)

Definition at line 1155 of file RelaxedConcurrentPriorityQueueTest.cpp.

References s.

1155  {
1156  if (!FLAGS_bench) {
1157  return;
1158  }
1159  std::vector<int> test_sizes = {512, 65536, 1 << 20};
1160  std::vector<int> rates = {1000, 100, 10};
1161  for (auto s : test_sizes) {
1162  for (auto p : rates) {
1163  std::cout << "\n------ Size: " << s << " Get top: " << 100. / p << "%"
1164  << " (Num: " << s / p << ")"
1165  << " ------" << std::endl;
1166  accuracy_test<Queue<int>>("FIFO Q", s, p);
1167  accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 0>>(
1168  "RCPQ(strict)", s, p);
1169  accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 2>>(
1170  "RCPQ(batch=2)", s, p);
1171  accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 8>>(
1172  "RCPQ(batch=8)", s, p);
1173  accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 16>>(
1174  "RCPQ(batch=16)", s, p);
1175  accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 50>>(
1176  "RCPQ(batch=50)", s, p);
1177  }
1178  }
1179 }
static set< string > s
template<class PriorityQueue >
static uint64_t throughtput_test ( std::string  name,
uint64_t  initial_size 
)
static

Definition at line 952 of file RelaxedConcurrentPriorityQueueTest.cpp.

References counter, i, max, min, name, nthreads, ops, folly::Random::rand32(), rng, run_once(), sum(), uint32_t, uint64_t, and val.

952  {
953  int ops = 1 << 18;
954  int reps = 15;
955  uint64_t min = UINTMAX_MAX;
956  uint64_t max = 0;
957  uint64_t sum = 0;
958 
959  for (int r = 0; r < reps; ++r) {
960  uint64_t dur;
961  PriorityQueue pq;
962 
964  rng.seed(initial_size);
965  // initialize the queue according to initial_size
966  for (uint64_t i = 0; i < initial_size; i++) {
967  int val = folly::Random::rand32(rng) % (ops + 1);
968  pq.push(val);
969  }
970 
971  auto fn = [&](uint32_t tid) {
973  rng_tl.seed(tid);
974  uint32_t counter = 0;
975  for (int i = tid; i < ops; i += nthreads) {
976  int val;
977  counter++;
978  if (counter % 2) {
979  val = folly::Random::rand32(rng_tl) % (ops + 1);
980  pq.push(val);
981  } else {
982  pq.pop(val);
983  }
984  }
985  };
986 
987  dur = run_once(fn);
988  sum += dur;
989  min = std::min(min, dur);
990  max = std::max(max, dur);
991  }
992 
993  uint64_t avg = sum / reps;
994  std::cout << std::setw(12) << name;
995  std::cout << " " << std::setw(8) << max / ops << " ns";
996  std::cout << " " << std::setw(8) << avg / ops << " ns";
997  std::cout << " " << std::setw(8) << min / ops << " ns";
998  std::cout << std::endl;
999  return min;
1000 }
static uint64_t run_once(const Func &fn)
Execute the function for nthreads threads.
std::atomic< int64_t > sum(0)
LogLevel max
Definition: LogLevel.cpp:31
double val
Definition: String.cpp:273
auto rng
Definition: CollectTest.cpp:31
const char * name
Definition: http_parser.c:437
LogLevel min
Definition: LogLevel.cpp:30
const int ops
static uint32_t nthreads
std::mt19937 DefaultGenerator
Definition: Random.h:97
std::atomic< int > counter
static uint32_t rand32()
Definition: Random.h:213

Variable Documentation

std::vector<int> nthr = {1, 2, 4, 8}
static

Definition at line 35 of file RelaxedConcurrentPriorityQueueTest.cpp.

std::vector<int> sizes = {0, 1024}
static

Definition at line 383 of file RelaxedConcurrentPriorityQueueTest.cpp.