proxygen
RelaxedConcurrentPriorityQueueTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <thread>
17 
18 #include <boost/thread.hpp>
19 #include <folly/Random.h>
20 #include <folly/SpinLock.h>
26 #include <glog/logging.h>
27 
using namespace folly;

// gflags knobs shared by the tests and benchmarks below.
DEFINE_bool(bench, false, "run benchmark");
DEFINE_int32(reps, 1, "number of reps");
DEFINE_int64(ops, 32, "number of operations per rep");
DEFINE_int64(elems, 64, "number of elements");

// Thread counts exercised by the multi-threaded tests.
static std::vector<int> nthr = {1, 2, 4, 8};
// threads number — labels the global `nthreads` used throughout this file;
// NOTE(review): its declaration line is missing from this capture.
38 
39 template <class PriorityQueue>
40 void basicOpsTest() {
41  int res;
42  PriorityQueue pq;
43 
44  EXPECT_TRUE(pq.empty());
45  EXPECT_EQ(pq.size(), 0);
46  pq.push(1);
47  pq.push(2);
48  pq.push(3);
49 
50  EXPECT_FALSE(pq.empty());
51  EXPECT_EQ(pq.size(), 3);
52  pq.pop(res);
53  EXPECT_EQ(res, 3);
54  pq.pop(res);
55  EXPECT_EQ(res, 2);
56  pq.pop(res);
57  EXPECT_EQ(res, 1);
58  EXPECT_TRUE(pq.empty());
59  EXPECT_EQ(pq.size(), 0);
60 
61  pq.push(3);
62  pq.push(2);
63  pq.push(1);
64 
65  pq.pop(res);
66  EXPECT_EQ(res, 3);
67  pq.pop(res);
68  EXPECT_EQ(res, 2);
69  pq.pop(res);
70  EXPECT_EQ(res, 1);
71  EXPECT_TRUE(pq.empty());
72  EXPECT_EQ(pq.size(), 0);
73 }
74 
// Instantiates the basic single-threaded checks across representative
// template configurations (spinning vs. blocking; strict vs. relaxed
// numeric parameters — see the comments below).
TEST(CPQ, BasicOpsTest) {
  // Spinning
  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true>>();
  // Strict
  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0>>();
  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0, 1>>();
  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0, 3>>();
  // Relaxed
  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 1, 1>>();
  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 2, 1>>();
  basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 3, 3>>();
  // Blocking
  basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true>>();
  basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true, 0>>();
  basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true, 2>>();
}
91 
93 template <typename Func>
94 static uint64_t run_once(const Func& fn) {
95  boost::barrier barrier_start{nthreads + 1};
96  std::vector<std::thread> threads(nthreads);
97  for (uint32_t tid = 0; tid < nthreads; ++tid) {
98  threads[tid] = std::thread([&, tid] {
99  barrier_start.wait();
100  fn(tid);
101  });
102  }
103 
104  barrier_start.wait(); // start the execution
105  auto tbegin = std::chrono::steady_clock::now();
106  for (auto& t : threads) {
107  t.join();
108  }
109 
110  // end time measurement
111  uint64_t duration = 0;
112  auto tend = std::chrono::steady_clock::now();
113  duration = std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
114  .count();
115  return duration;
116 }
117 
// Single-threaded sum check: push FLAGS_elems pseudo-random priorities
// (seeded with FLAGS_elems for reproducibility), pop them all back, and
// verify the popped priorities sum to the pushed sum.
// NOTE(review): the function-head line (singleThreadTest) and the `rng`
// declaration are missing from this capture.
template <class PriorityQueue>
  PriorityQueue pq;

  rng.seed(FLAGS_elems);
  uint64_t expect_sum = 0;
  // random push
  for (int i = 0; i < FLAGS_elems; i++) {
    int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
    expect_sum += val;
    pq.push(val);
  }

  int val = 0;
  int counter = 0;
  uint64_t sum = 0;
  // drain the queue; order is irrelevant here, only the sum is checked
  while (counter < FLAGS_elems) {
    pq.pop(val);
    sum += val;
    counter++;
  }

  EXPECT_EQ(sum, expect_sum);
}
143 
// Single-threaded coverage of "strict" configurations (third numeric
// template argument 0), spinning and blocking variants.
TEST(CPQ, SingleThrStrictImplTest) {
  // spinning
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0>>();
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0, 1>>();
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0, 8>>();
  // blocking
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0>>();
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0, 8>>();
}
154 
// Single-threaded coverage of "relaxed" configurations (non-zero third
// numeric template argument), spinning and blocking variants.
TEST(CPQ, SingleThrRelaxedImplTest) {
  // spinning
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 1>>();
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8>>();
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8, 2>>();
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 2, 128>>();
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 100, 8>>();
  // blocking
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 1>>();
  singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 8>>();
}
167 
// Pre-fills the queue with FLAGS_elems random values single-threaded, then
// has t threads (for each t in `nthr`) concurrently pop everything; the sum
// of popped priorities must equal the pushed sum.
// NOTE(review): the function-head line (concurrentPop) and the `rng`
// declaration are missing from this capture.
template <class PriorityQueue>
  for (int t : nthr) {
    PriorityQueue pq;

    rng.seed(FLAGS_elems);
    uint64_t check_sum = 0;
    // random push
    for (int i = 0; i < FLAGS_elems; i++) {
      int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
      pq.push(val);
      check_sum += val;
    }

    std::atomic<uint64_t> pop_sum(0);
    std::atomic<int> to_end(0);

    nthreads = t;
    auto fn = [&](uint32_t tid) {
      int val = tid;
      while (true) {
        // claim one pop slot; stop once FLAGS_elems pops have been claimed
        int index = to_end.fetch_add(1, std::memory_order_acq_rel);
        if (index < FLAGS_elems) {
          pq.pop(val);
        } else {
          break;
        }
        pop_sum.fetch_add(val, std::memory_order_acq_rel);
      }
    };
    run_once(fn);
    // check the sum of returned values of successful pop
    EXPECT_EQ(pop_sum, check_sum);
  }
}
206 
// Concurrent-pop coverage for strict configurations.
// NOTE(review): the concurrentPop<...>() instantiation lines of this test
// body are missing from this capture; only the section comments remain.
TEST(CPQ, ConcurrentPopStrictImplTest) {
  // spinning
  // blocking
}
223 
// Concurrent-pop coverage for relaxed configurations.
// NOTE(review): the concurrentPop<...>() instantiation lines of this test
// body are missing from this capture; only the section comments remain.
TEST(CPQ, ConcurrentPopRelaxedImplTest) {
  // spinning
  // blocking
}
242 
// t threads concurrently push disjoint slices of FLAGS_elems random values;
// afterwards a single thread pops everything and the popped sum must equal
// the sum accumulated by the pushers.
// NOTE(review): the function-head line (concurrentPush) and the `rng`
// declaration inside the lambda are missing from this capture.
template <class PriorityQueue>
  for (int t : nthr) {
    PriorityQueue pq;
    std::atomic<uint64_t> counter_sum(0);
    nthreads = t;

    auto fn = [&](uint32_t tid) {
      rng.seed(tid);
      uint64_t local_sum = 0;
      // each thread pushes indices tid, tid+nthreads, ... (disjoint slices)
      for (int i = tid; i < FLAGS_elems; i += nthreads) {
        int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
        local_sum += val;
        pq.push(val);
      }
      counter_sum.fetch_add(local_sum, std::memory_order_acq_rel);
    };
    run_once(fn);

    // check the total number of elements
    uint64_t actual_sum = 0;
    for (int i = 0; i < FLAGS_elems; i++) {
      int res = 0;
      pq.pop(res);
      actual_sum += res;
    }
    EXPECT_EQ(actual_sum, counter_sum);
  }
}
275 
// Concurrent-push coverage for strict configurations.
TEST(CPQ, ConcurrentPushStrictImplTest) {
  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0>>();
  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0, 8>>();
  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0, 128>>();
}
281 
// Concurrent-push coverage for relaxed configurations.
TEST(CPQ, ConcurrentPushRelaxedImplTest) {
  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false>>();
  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 1, 8>>();
  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 2, 128>>();
  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 128, 8>>();
  concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
}
289 
// Mixed concurrent workload: each of t threads pushes its slice of
// FLAGS_elems elements, performs `ops` further operations, publishes its
// push/pop counts, and finally all threads cooperatively drain the queue
// back to empty (pushes minus pops elements remain).
// NOTE(review): the `rng` declaration inside the lambda is missing from
// this capture.
template <class PriorityQueue>
void concurrentOps(int ops) {
  for (int t : nthr) {
    PriorityQueue pq;
    std::atomic<uint64_t> counter_push(0);
    std::atomic<uint64_t> counter_pop(0);
    nthreads = t;

    // three rendezvous points: after pre-fill, after the ops phase, and
    // after counters are published
    boost::barrier rb0{nthreads};
    boost::barrier rb1{nthreads};
    boost::barrier rb2{nthreads};
    std::atomic<int> to_end(0);

    auto fn = [&](uint32_t tid) {
      rng.seed(tid);
      uint64_t local_push = 0;
      uint64_t local_pop = 0;
      int res;

      for (int i = tid; i < FLAGS_elems; i += nthreads) {
        int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
        local_push++;
        pq.push(val);
      }
      rb0.wait();

      for (int i = 0; i < ops; i++) {
        // NOTE(review): `ops % 2` is loop-invariant, so a given call does
        // either all pushes or all pops in this phase — presumably `i % 2`
        // was intended; confirm against upstream before changing.
        if (ops % 2 == 0) {
          int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
          local_push++;
          pq.push(val);
        } else {
          pq.pop(res);
          local_pop++;
        }
      }
      rb1.wait();
      // collecting the ops info for checking purpose
      counter_push.fetch_add(local_push, std::memory_order_seq_cst);
      counter_pop.fetch_add(local_pop, std::memory_order_seq_cst);
      rb2.wait();
      // cooperatively drain the remaining (pushes - pops) elements
      uint64_t r = counter_push.load(std::memory_order_seq_cst) -
          counter_pop.load(std::memory_order_seq_cst);
      while (true) {
        uint64_t index = to_end.fetch_add(1, std::memory_order_acq_rel);
        if (index < r) {
          pq.pop(res);
        } else {
          break;
        }
      }
    };
    run_once(fn);
    // the total push and pop ops should be the same
  }
}
350 
// Each of t threads pushes its slice of FLAGS_elems elements, then performs
// `ops` balanced push/pop pairs; afterwards size() must be exactly
// FLAGS_elems again.
// NOTE(review): the function-head line (concurrentSizeTest, taking `ops`)
// and the `rng` declaration are missing from this capture.
template <class PriorityQueue>
  for (int t : nthr) {
    PriorityQueue pq;
    nthreads = t;
    EXPECT_TRUE(pq.empty());
    auto fn = [&](uint32_t tid) {
      rng.seed(tid);
      uint64_t local_push = 0;
      int res;

      for (int i = tid; i < FLAGS_elems; i += nthreads) {
        int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
        local_push++;
        pq.push(val);
      }

      // balanced push/pop pairs keep the net size unchanged
      for (int i = 0; i < ops; i++) {
        int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
        pq.push(val);
        pq.pop(res);
      }
    };
    run_once(fn);
    // the total push and pop ops should be the same
    EXPECT_EQ(pq.size(), FLAGS_elems);
  }
}
382 
// Per-thread operation counts passed as `ops` to the Mixed/Size tests.
static std::vector<int> sizes = {0, 1024};
384 
// Mixed push/pop workload over strict configurations.
TEST(CPQ, ConcurrentMixedStrictImplTest) {
  for (auto size : sizes) {
    concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0>>(size);
    concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0, 1>>(
        size);
    concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0, 32>>(
        size);
  }
}
394 
// Mixed push/pop workload over relaxed configurations (plus one blocking
// variant).
TEST(CPQ, ConcurrentMixedRelaxedImplTest) {
  for (auto size : sizes) {
    concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false>>(size);
    concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 1, 32>>(
        size);
    concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 32, 1>>(
        size);
    concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>(
        size);
    concurrentOps<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>(size);
  }
}
407 
// size() accounting under concurrency, strict configurations
// (SupportsSize = true).
TEST(CPQ, StrictImplSizeTest) {
  for (auto size : sizes) {
    concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true, 0>>(
        size);
    concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, true, true, 0>>(
        size);
  }
}
416 
// size() accounting under concurrency, relaxed configurations
// (SupportsSize = true).
TEST(CPQ, RelaxedImplSizeTest) {
  for (auto size : sizes) {
    concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true>>(size);
    concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, true, true, 2, 8>>(
        size);
    concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true, 8, 2>>(
        size);
  }
}
426 
// PushThr producer threads push FLAGS_ops random values in total while
// PopThr consumer threads pop FLAGS_ops values in total; all start together
// behind a barrier.  Verifies the popped sum equals the pushed sum (pops
// wait until an element is available, so every pop eventually succeeds).
// NOTE(review): the `rng_t` declaration in the pusher lambda is missing
// from this capture.
template <class PriorityQueue>
void multiPusherPopper(int PushThr, int PopThr) {
  int ops = FLAGS_ops;
  uint32_t total_threads = PushThr + PopThr;

  PriorityQueue pq;
  std::atomic<uint64_t> sum_push(0);
  std::atomic<uint64_t> sum_pop(0);

  auto fn_popthr = [&](uint32_t tid) {
    for (int i = tid; i < ops; i += PopThr) {
      int val;
      pq.pop(val);
      sum_pop.fetch_add(val, std::memory_order_acq_rel);
    }
  };

  auto fn_pushthr = [&](uint32_t tid) {
    rng_t.seed(tid);
    for (int i = tid; i < ops; i += PushThr) {
      int val = folly::Random::rand32(rng_t);
      pq.push(val);
      sum_push.fetch_add(val, std::memory_order_acq_rel);
    }
  };
  // +1 participant: this thread releases everyone at once below
  boost::barrier barrier_start{total_threads + 1};

  std::vector<std::thread> threads_push(PushThr);
  for (int tid = 0; tid < PushThr; ++tid) {
    threads_push[tid] = std::thread([&, tid] {
      barrier_start.wait();
      fn_pushthr(tid);
    });
  }
  std::vector<std::thread> threads_pop(PopThr);
  for (int tid = 0; tid < PopThr; ++tid) {
    threads_pop[tid] = std::thread([&, tid] {
      barrier_start.wait();
      fn_popthr(tid);
    });
  }

  barrier_start.wait(); // start the execution
  // begin time measurement
  for (auto& t : threads_push) {
    t.join();
  }
  for (auto& t : threads_pop) {
    t.join();
  }
  EXPECT_EQ(sum_pop, sum_push);
}
480 
// Producer/consumer matrix over blocking configurations, for every
// (pusher-count, popper-count) pair in `nthr` x `nthr`.
// NOTE(review): several instantiation lines of this test body are missing
// from this capture.
TEST(CPQ, PusherPopperBlockingTest) {
  for (auto i : nthr) {
    for (auto j : nthr) {
      // Original
      multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>(
          i, j);
      // Relaxed
      multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false, 1, 8>>(
          i, j);
      multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false>>(i, j);
    }
  }
}
500 
// Producer/consumer matrix over spinning configurations.
// NOTE(review): several instantiation lines of this test body are missing
// from this capture; only section comments and one instantiation remain.
TEST(CPQ, PusherPopperSpinningTest) {
  for (auto i : nthr) {
    for (auto j : nthr) {
      // Original
      // Relaxed
      multiPusherPopper<RelaxedConcurrentPriorityQueue<int, false, false>>(
          i, j);
    }
  }
}
521 
// Starts nPop threads that all wait in pop() on an empty queue, then pushes
// one element at a time; after each push exactly one more popper must
// complete, verifying that waiting poppers wake up one by one.
// NOTE(review): the function-head line (blockingFirst) is missing from
// this capture.
template <class PriorityQueue>
  PriorityQueue pq;
  int nPop = 16;

  // +1 participant: the main (pushing) thread
  boost::barrier b{static_cast<unsigned int>(nPop + 1)};
  std::atomic<int> finished{0};

  std::vector<std::thread> threads_pop(nPop);
  for (int tid = 0; tid < nPop; ++tid) {
    threads_pop[tid] = std::thread([&] {
      int val;
      b.wait();
      pq.pop(val);
      finished.fetch_add(1, std::memory_order_acq_rel);
    });
  }

  b.wait();
  int c = 0;
  // push to Mound one by one
  // the popping threads should wake up one by one
  do {
    pq.push(1);
    c++;
    // spin until exactly c poppers have finished
    while (finished.load(std::memory_order_acquire) != c)
      ;
    EXPECT_EQ(finished.load(std::memory_order_acquire), c);
  } while (c < nPop);

  for (auto& t : threads_pop) {
    t.join();
  }
}
556 
// nThrs poppers and nThrs pushers start together; each pusher pushes one
// element and then spins until all nThrs pops have completed, so every
// waiting pop must eventually be satisfied.  Repeated FLAGS_reps * 10 times
// to shake out lost-wakeup races.
// NOTE(review): the function-head line (concurrentBlocking) is missing
// from this capture.
template <class PriorityQueue>
  uint32_t nThrs = 16;

  for (int iter = 0; iter < FLAGS_reps * 10; iter++) {
    PriorityQueue pq;
    // participants: nThrs poppers + nThrs pushers + the main thread
    boost::barrier b{static_cast<unsigned int>(nThrs + nThrs + 1)};
    std::atomic<uint32_t> finished{0};
    std::vector<std::thread> threads_pop(nThrs);
    for (uint32_t tid = 0; tid < nThrs; ++tid) {
      threads_pop[tid] = std::thread([&] {
        b.wait();
        int val;
        pq.pop(val);
        finished.fetch_add(1, std::memory_order_acq_rel);
      });
    }

    std::vector<std::thread> threads_push(nThrs);
    for (uint32_t tid = 0; tid < nThrs; ++tid) {
      threads_push[tid] = std::thread([&, tid] {
        b.wait();
        pq.push(tid);
        // wait until every popper has been served
        while (finished.load(std::memory_order_acquire) != nThrs)
          ;
        EXPECT_EQ(finished.load(std::memory_order_acquire), nThrs);
      });
    }

    b.wait();
    for (auto& t : threads_pop) {
      t.join();
    }
    for (auto& t : threads_push) {
      t.join();
    }
  }
}
595 
// One-by-one wake-up semantics across strict, relaxed, and spinning
// configurations.
TEST(CPQ, PopBlockingTest) {
  // strict
  blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
  blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 0, 16>>();
  // relaxed
  blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false>>();
  blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>();
  blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 16, 1>>();
  // Spinning
  blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false>>();
  blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
  blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false, 16, 1>>();
}
609 
// Simultaneous pushers/poppers wake-up test across configurations.
// NOTE(review): the final spinning instantiation lines are missing from
// this capture.
TEST(CPQ, MixedBlockingTest) {
  // strict
  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 0, 16>>();
  // relaxed
  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false>>();
  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>();
  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 16, 1>>();
  // Spinning
  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, false, false>>();
  concurrentBlocking<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
}
624 
628 
// Deterministic-schedule mixed test: pre-fill with FLAGS_elems random
// values, run 8 DSched threads each doing FLAGS_ops push/pop pairs, then
// drain the remaining elements and check that total pops equal the
// pre-filled sum plus the threads' pushed sum.
// NOTE(review): the `rng` and `tl_rng` declarations are missing from this
// capture.
template <class PriorityQueue, template <typename> class Atom = std::atomic>
static void DSchedMixedTest() {
  for (int r = 0; r < FLAGS_reps; r++) {
    // 8 worker threads per rep (an earlier comment here said 32 — stale)
    int thr = 8;
    PriorityQueue pq;
    rng.seed(thr);
    uint64_t pre_sum = 0;
    uint64_t pre_size = 0;
    for (int i = 0; i < FLAGS_elems; i++) {
      int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
      pq.push(val);
      pre_sum += val;
    }
    pre_size = FLAGS_elems;
    Atom<uint64_t> atom_push_sum{0};
    Atom<uint64_t> atom_pop_sum{0};
    std::vector<std::thread> threads(thr);
    for (int tid = 0; tid < thr; ++tid) {
      threads[tid] = DSched::thread([&]() {
        tl_rng.seed(thr);
        uint64_t pop_sum = 0;
        uint64_t push_sum = 0;
        // balanced push/pop pairs: net queue size is unchanged
        for (int i = 0; i < FLAGS_ops; i++) {
          int val = folly::Random::rand32(tl_rng) % FLAGS_elems + 1;
          pq.push(val);
          push_sum += val;
          pq.pop(val);
          pop_sum += val;
        }
        atom_push_sum.fetch_add(push_sum, std::memory_order_acq_rel);
        atom_pop_sum.fetch_add(pop_sum, std::memory_order_acq_rel);
      });
    }

    for (auto& t : threads) {
      DSched::join(t);
    }

    // It checks the number of elements remain in Mound
    while (pre_size > 0) {
      pre_size--;
      int val = -1;
      pq.pop(val);
      atom_pop_sum += val;
    }
    // Check the accumulation of popped and pushed priorities
    EXPECT_EQ(atom_pop_sum, pre_sum + atom_push_sum);
  }
}
681 
// Deterministic-schedule coverage of strict configurations with
// DeterministicAtomic/DeterministicMutex.
// NOTE(review): each instantiation's opening line
// (`DSchedMixedTest<RelaxedConcurrentPriorityQueue<`) is missing from this
// capture; only the template-argument tails remain below.
TEST(CPQ, DSchedMixedStrictTest) {
  DSched sched(DSched::uniform(0));
  int,
  false,
  false,
  0,
  25,
  DeterministicAtomic>();
  int,
  true,
  false,
  0,
  25,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
  int,
  false,
  false,
  0,
  1,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
  int,
  true,
  false,
  0,
  1,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
  int,
  false,
  false,
  0,
  128,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
  int,
  true,
  false,
  0,
  128,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
}
745 
// Deterministic-schedule coverage of relaxed configurations with
// DeterministicAtomic/DeterministicMutex.
// NOTE(review): each instantiation's opening line
// (`DSchedMixedTest<RelaxedConcurrentPriorityQueue<`) is missing from this
// capture; only the template-argument tails remain below.
TEST(CPQ, DSchedMixedRelaxedTest) {
  DSched sched(DSched::uniform(0));
  int,
  false,
  false,
  16,
  25,
  DeterministicAtomic>();
  int,
  true,
  false,
  16,
  25,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
  int,
  false,
  false,
  1,
  16,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
  int,
  true,
  false,
  1,
  16,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
  int,
  false,
  false,
  16,
  1,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
  int,
  true,
  false,
  16,
  1,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
  int,
  false,
  false,
  16,
  16,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
  int,
  true,
  false,
  16,
  16,
  DeterministicMutex,
  DeterministicAtomic>,
  DeterministicAtomic>();
}
829 
// FIFO baseline used by the accuracy benchmark: pop() simply hands back the
// oldest element, ignoring priorities entirely.
template <typename T>
class Queue {
  std::queue<T> fifo_;

 public:
  // Enqueue a copy of `val` at the tail.
  void push(const T& val) {
    fifo_.push(val);
  }
  // Dequeue the head into `val`.  Precondition: the queue is non-empty.
  void pop(T& val) {
    val = fifo_.front();
    fifo_.pop();
  }
};
843 
// std::priority_queue guarded by a single mutex — the "GL" baseline in the
// benchmarks.  pop() repeatedly locks, checks, and unlocks until an element
// is available.
// NOTE(review): the class-head line (GlobalLockPQ) and the mutex member
// (`m_`) declaration are missing from this capture.
template <typename T>
  std::priority_queue<T> q_;

 public:
  void push(const T& val) {
    std::lock_guard<std::mutex> g(m_);
    q_.push(val);
  }
  void pop(T& val) {
    // spin (re-acquiring the lock each iteration) until non-empty
    while (true) {
      std::lock_guard<std::mutex> g(m_);
      if (q_.empty()) {
        continue;
      }
      val = q_.top();
      q_.pop();
      return;
    }
  }
};
866 
// Benchmark: PushThr producers and PopThr consumers run `ops` operations
// against a queue pre-filled to initial_size; prints max/avg/min
// per-operation latency over 15 reps and returns the minimum.
// NOTE(review): the function-head lines (producer_consumer_test, whose
// first parameter is `name`) and the `rng`/`rng_t` declarations are missing
// from this capture.
template <class PriorityQueue>
  uint32_t PushThr,
  uint32_t PopThr,
  uint64_t initial_size) {
  int ops = 1 << 18;
  int reps = 15;
  // queues whose name contains "RCPQ" run 8x the operations
  if (name.find("RCPQ") != std::string::npos) {
    ops <<= 3;
  }
  uint64_t min = UINTMAX_MAX;
  uint64_t max = 0;
  uint64_t sum = 0;
  uint32_t total_threads = PushThr + PopThr;

  for (int r = 0; r < reps; ++r) {
    uint64_t dur;
    PriorityQueue pq;

    rng.seed(initial_size);
    // initialize the queue according to initial_size
    for (uint64_t i = 0; i < initial_size; i++) {
      int val = folly::Random::rand32(rng) % ops;
      pq.push(val);
    }

    auto fn_popthr = [&](uint32_t tid) {
      for (int i = tid; i < ops; i += PopThr) {
        int val;
        pq.pop(val);
      }
    };

    auto fn_pushthr = [&](uint32_t tid) {
      rng_t.seed(tid);
      for (int i = tid; i < ops; i += PushThr) {
        int val = folly::Random::rand32(rng_t) % ops;
        pq.push(val);
      }
    };
    boost::barrier barrier_start{total_threads + 1};
    std::vector<std::thread> threads_push(PushThr);
    for (uint32_t tid = 0; tid < PushThr; ++tid) {
      threads_push[tid] = std::thread([&, tid] {
        barrier_start.wait();
        fn_pushthr(tid);
      });
    }
    std::vector<std::thread> threads_pop(PopThr);
    for (uint32_t tid = 0; tid < PopThr; ++tid) {
      threads_pop[tid] = std::thread([&, tid] {
        barrier_start.wait();
        fn_popthr(tid);
      });
    }
    barrier_start.wait(); // start the execution
    // begin time measurement
    auto tbegin = std::chrono::steady_clock::now();
    for (auto& t : threads_push) {
      t.join();
    }
    for (auto& t : threads_pop) {
      t.join();
    }
    // end time measurement
    auto tend = std::chrono::steady_clock::now();
    dur = std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
        .count();
    sum += dur;
    min = std::min(min, dur);
    max = std::max(max, dur);
  }
  uint64_t avg = sum / reps;
  std::cout << std::setw(12) << name;
  std::cout << " " << std::setw(8) << max / ops << " ns";
  std::cout << " " << std::setw(8) << avg / ops << " ns";
  std::cout << " " << std::setw(8) << min / ops << " ns";
  std::cout << std::endl;
  return min;
}
950 
// Benchmark: `nthreads` threads each alternate push/pop against a queue
// pre-filled to initial_size; prints max/avg/min per-operation latency over
// 15 reps and returns the minimum.
// NOTE(review): the function-head line (throughtput_test, taking `name` and
// `initial_size`) and the `rng`/`rng_tl` declarations are missing from this
// capture.
template <class PriorityQueue>
  int ops = 1 << 18;
  int reps = 15;
  uint64_t min = UINTMAX_MAX;
  uint64_t max = 0;
  uint64_t sum = 0;

  for (int r = 0; r < reps; ++r) {
    uint64_t dur;
    PriorityQueue pq;

    rng.seed(initial_size);
    // initialize the queue according to initial_size
    for (uint64_t i = 0; i < initial_size; i++) {
      int val = folly::Random::rand32(rng) % (ops + 1);
      pq.push(val);
    }

    auto fn = [&](uint32_t tid) {
      rng_tl.seed(tid);
      uint32_t counter = 0;
      // alternate push (odd counter) and pop (even counter)
      for (int i = tid; i < ops; i += nthreads) {
        int val;
        counter++;
        if (counter % 2) {
          val = folly::Random::rand32(rng_tl) % (ops + 1);
          pq.push(val);
        } else {
          pq.pop(val);
        }
      }
    };

    dur = run_once(fn);
    sum += dur;
    min = std::min(min, dur);
    max = std::max(max, dur);
  }

  uint64_t avg = sum / reps;
  std::cout << std::setw(12) << name;
  std::cout << " " << std::setw(8) << max / ops << " ns";
  std::cout << " " << std::setw(8) << avg / ops << " ns";
  std::cout << " " << std::setw(8) << min / ops << " ns";
  std::cout << std::endl;
  return min;
}
1001 
// Measures ordering quality: fill with initial_size distinct random
// priorities, then pop until the top 1/top_percent fraction (the `valid`
// priorities >= `target`) has all been retrieved; counts how many
// lower-priority elements were popped along the way and prints the average
// over 15 reps.
// NOTE(review): the `rng` declaration is missing from this capture.
template <class PriorityQueue>
static void
accuracy_test(std::string name, uint64_t initial_size, uint32_t top_percent) {
  int avg = 0;
  int reps = 15;
  int valid = initial_size / top_percent;
  if (valid < 1) {
    return; // too few elements to measure this percentile
  }
  int target = initial_size - valid;
  for (int r = 0; r < reps; ++r) {
    PriorityQueue pq;
    std::unordered_set<int> filter;
    rng.seed(initial_size + r);

    // initialize the queue according to initial_size
    // eliminate repeated priorities
    for (uint64_t i = 0; i < initial_size; i++) {
      int val;
      do {
        val = folly::Random::rand32(rng) % initial_size;
      } while (filter.find(val) != filter.end());
      filter.insert(val);
      pq.push(val);
    }

    int counter = 0;
    int stop = valid;
    for (uint64_t i = 0; i < initial_size; i++) {
      int val;
      pq.pop(val);
      if (val >= target) {
        stop--; // one of the top `valid` priorities retrieved
      }
      if (stop > 0 && val < target) {
        counter++; // a lower-priority element came out early
      }
      if (stop == 0) {
        break;
      }
    }
    avg += counter;
  }
  avg /= reps;
  std::cout << std::setw(16) << name << " ";
  std::cout << "Lower priority popped: " << avg;
  std::cout << std::endl;
}
1051 
// "FC" baseline: folly's flat-combining priority queue.
using FCPQ = folly::FlatCombiningPriorityQueue<int>;
1053 
1054 TEST(CPQ, ThroughtputBench) {
1055  if (!FLAGS_bench) {
1056  return;
1057  }
1058  std::vector<int> test_sizes = {64, 512, 65536};
1059  std::vector<int> nthrs = {1, 2, 4, 8, 12, 14, 16, 28, 32, 56};
1060 
1061  std::cout << "Threads have equal chance to push and pop. \n"
1062  << "The bench caculates the avg execution time for\n"
1063  << "one operation (push OR pop).\n"
1064  << "GL : std::priority_queue protected by global lock\n"
1065  << "FL : flatcombinning priority queue\n"
1066  << "RCPQ: the relaxed concurrent priority queue\n"
1067  << std::endl;
1068  std::cout << "\nTest_name, Max time, Avg time, Min time" << std::endl;
1069  for (auto s : test_sizes) {
1070  std::cout << "\n ------ Initial size: " << s << " ------" << std::endl;
1071  for (int i : nthrs) {
1072  nthreads = i;
1073  std::cout << "Thread number: " << i << std::endl;
1074  throughtput_test<GlobalLockPQ<int>>("GL", s);
1075  throughtput_test<FCPQ>("FC", s);
1076  throughtput_test<RelaxedConcurrentPriorityQueue<int>>("RCPQ", s);
1077  }
1078  }
1079 }
1080 
// Producer/consumer benchmark (only runs with --bench): balanced and
// deliberately unbalanced <m producers, n consumers> pairings over several
// initial sizes.
// NOTE(review): the opening lines of the RCPQ instantiations
// (`producer_consumer_test<RelaxedConcurrentPriorityQueue<...>>(` ) are
// missing from this capture; only the argument tails remain.
TEST(CPQ, ProducerConsumerBench) {
  if (!FLAGS_bench) {
    return;
  }
  std::vector<int> test_sizes = {0, 512, 65536};
  std::vector<int> nthrs = {1, 2, 4, 8, 12, 16, 24};

  std::cout << "<Producer, Consumer> pattern \n"
            << "The bench caculates the avg execution time for\n"
            << "push AND pop pair(two operations).\n"
            << "GL : std::priority_queue protected by global lock\n"
            << "FL : flatcombinning priority queue\n"
            << "RCPQ SPN: RCPQ spinning\n"
            << "RCPQ BLK: RCPQ blocking\n"
            << std::endl;
  for (int s : test_sizes) {
    // balanced: equal producer and consumer counts
    std::cout << "\n ------ Scalability ------" << std::endl;
    for (int m : nthrs) {
      for (int n : nthrs) {
        if (m != n) {
          continue;
        }
        std::cout << "<" << m << " , " << n << "> , size = " << s << ":"
                  << std::endl;
        producer_consumer_test<GlobalLockPQ<int>>("GL", m, n, s);
        producer_consumer_test<FCPQ>("FC", m, n, s);
            "RCPQ SPN", m, n, s);
            "RCPQ BLK", m, n, s);
      }
    }
    std::cout << "\n ------ Unbalanced(Producer<Consumer) ------" << std::endl;
    for (int m : nthrs) {
      for (int n : nthrs) {
        if (m > 4 || n - 4 <= m) {
          continue;
        }
        std::cout << "<" << m << " , " << n << "> , size = " << s << ":"
                  << std::endl;
        producer_consumer_test<GlobalLockPQ<int>>("GL", m, n, s);
        producer_consumer_test<FCPQ>("FC", m, n, s);
            "RCPQ SPN", m, n, s);
            "RCPQ BLK", m, n, s);
      }
    }

    std::cout << "\n ------ Unbalanced(Producer>Consumer) ------" << std::endl;
    for (int m : nthrs) {
      for (int n : nthrs) {
        if (m <= 8 || n > m - 4 || n % 4 != 0) {
          continue;
        }
        std::cout << "<" << m << " , " << n << "> , size = " << s << ":"
                  << std::endl;
        producer_consumer_test<GlobalLockPQ<int>>("GL", m, n, s);
        producer_consumer_test<FCPQ>("FC", m, n, s);
            "RCPQ SPN", m, n, s);
            "RCPQ BLK", m, n, s);
      }
    }
  }
}
1154 
// Accuracy benchmark (only runs with --bench): compares how many
// lower-priority elements escape before the top 100/p percent are popped,
// for a FIFO queue baseline and several RCPQ batch settings.
TEST(CPQ, Accuracy) {
  if (!FLAGS_bench) {
    return;
  }
  std::vector<int> test_sizes = {512, 65536, 1 << 20};
  std::vector<int> rates = {1000, 100, 10};
  for (auto s : test_sizes) {
    for (auto p : rates) {
      std::cout << "\n------ Size: " << s << " Get top: " << 100. / p << "%"
                << " (Num: " << s / p << ")"
                << " ------" << std::endl;
      accuracy_test<Queue<int>>("FIFO Q", s, p);
      accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 0>>(
          "RCPQ(strict)", s, p);
      accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 2>>(
          "RCPQ(batch=2)", s, p);
      accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 8>>(
          "RCPQ(batch=8)", s, p);
      accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 16>>(
          "RCPQ(batch=16)", s, p);
      accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 50>>(
          "RCPQ(batch=50)", s, p);
    }
  }
}
1180 
1181 /*
 * The folly::SpinningLock uses CAS directly for try_lock, which was not
 * efficient in this experiment. The lock used in the experiment is based on
 * a test-and-test-and-set lock (a plain read check is added before doing
 * the CAS).
1187  *
1188 Threads have equal chance to push and pop.
The bench calculates the avg execution time for
1190 one operation (push OR pop).
1191 GL : std::priority_queue protected by global lock
1192 FL : flatcombinning priority queue
1193 RCPQ: the relaxed concurrent priority queue
1194 
1195 Test_name, Max time, Avg time, Min time
1196 
1197  ------ Initial size: 64 ------
1198 Thread number: 1
1199  GL 30 ns 29 ns 27 ns
1200  FC 47 ns 42 ns 40 ns
1201  RCPQ 85 ns 81 ns 77 ns
1202 Thread number: 2
1203  GL 377 ns 274 ns 154 ns
1204  FC 227 ns 187 ns 139 ns
1205  RCPQ 108 ns 106 ns 102 ns
1206 Thread number: 4
1207  GL 244 ns 214 ns 191 ns
1208  FC 212 ns 191 ns 173 ns
1209  RCPQ 98 ns 95 ns 92 ns
1210 Thread number: 8
1211  GL 252 ns 221 ns 197 ns
1212  FC 127 ns 112 ns 102 ns
1213  RCPQ 78 ns 78 ns 76 ns
1214 Thread number: 12
1215  GL 251 ns 227 ns 217 ns
1216  FC 104 ns 96 ns 88 ns
1217  RCPQ 81 ns 79 ns 77 ns
1218 Thread number: 14
1219  GL 243 ns 232 ns 224 ns
1220  FC 103 ns 96 ns 90 ns
1221  RCPQ 84 ns 82 ns 81 ns
1222 Thread number: 16
1223  GL 254 ns 239 ns 229 ns
1224  FC 105 ns 98 ns 92 ns
1225  RCPQ 88 ns 85 ns 83 ns
1226 Thread number: 28
1227  GL 265 ns 261 ns 258 ns
1228  FC 106 ns 100 ns 96 ns
1229  RCPQ 93 ns 87 ns 68 ns
1230 Thread number: 32
1231  GL 274 ns 267 ns 261 ns
1232  FC 110 ns 98 ns 37 ns
1233  RCPQ 93 ns 80 ns 47 ns
1234 Thread number: 56
1235  GL 274 ns 263 ns 257 ns
1236  FC 78 ns 50 ns 24 ns
1237  RCPQ 85 ns 71 ns 45 ns
1238 
1239  ------ Initial size: 512 ------
1240 Thread number: 1
1241  GL 36 ns 35 ns 33 ns
1242  FC 54 ns 49 ns 47 ns
1243  RCPQ 79 ns 76 ns 72 ns
1244 Thread number: 2
1245  GL 248 ns 187 ns 151 ns
1246  FC 228 ns 179 ns 147 ns
1247  RCPQ 95 ns 92 ns 90 ns
1248 Thread number: 4
1249  GL 282 ns 260 ns 236 ns
1250  FC 218 ns 199 ns 174 ns
1251  RCPQ 85 ns 81 ns 79 ns
1252 Thread number: 8
1253  GL 306 ns 288 ns 270 ns
1254  FC 188 ns 114 ns 104 ns
1255  RCPQ 64 ns 62 ns 59 ns
1256 Thread number: 12
1257  GL 317 ns 296 ns 280 ns
1258  FC 105 ns 99 ns 91 ns
1259  RCPQ 59 ns 57 ns 52 ns
1260 Thread number: 14
1261  GL 331 ns 305 ns 293 ns
1262  FC 109 ns 99 ns 92 ns
1263  RCPQ 64 ns 57 ns 53 ns
1264 Thread number: 16
1265  GL 316 ns 308 ns 291 ns
1266  FC 110 ns 99 ns 92 ns
1267  RCPQ 58 ns 54 ns 52 ns
1268 Thread number: 28
1269  GL 348 ns 339 ns 333 ns
1270  FC 109 ns 105 ns 100 ns
1271  RCPQ 64 ns 62 ns 56 ns
1272 Thread number: 32
1273  GL 353 ns 347 ns 341 ns
1274  FC 116 ns 102 ns 39 ns
1275  RCPQ 62 ns 32 ns 3 ns
1276 Thread number: 56
1277  GL 360 ns 352 ns 342 ns
1278  FC 101 ns 58 ns 41 ns
1279  RCPQ 59 ns 43 ns 26 ns
1280 
1281  ------ Initial size: 65536 ------
1282 Thread number: 1
1283  GL 64 ns 60 ns 56 ns
1284  FC 93 ns 72 ns 67 ns
1285  RCPQ 293 ns 286 ns 281 ns
1286 Thread number: 2
1287  GL 262 ns 248 ns 231 ns
1288  FC 318 ns 301 ns 288 ns
1289  RCPQ 230 ns 216 ns 206 ns
1290 Thread number: 4
1291  GL 463 ns 452 ns 408 ns
1292  FC 273 ns 265 ns 257 ns
1293  RCPQ 141 ns 131 ns 126 ns
1294 Thread number: 8
1295  GL 582 ns 574 ns 569 ns
1296  FC 152 ns 139 ns 131 ns
1297  RCPQ 98 ns 81 ns 72 ns
1298 Thread number: 12
1299  GL 593 ns 586 ns 576 ns
1300  FC 126 ns 123 ns 119 ns
1301  RCPQ 85 ns 72 ns 62 ns
1302 Thread number: 14
1303  GL 599 ns 595 ns 588 ns
1304  FC 138 ns 123 ns 119 ns
1305  RCPQ 79 ns 70 ns 62 ns
1306 Thread number: 16
1307  GL 599 ns 592 ns 587 ns
1308  FC 138 ns 123 ns 117 ns
1309  RCPQ 75 ns 65 ns 56 ns
1310 Thread number: 28
1311  GL 611 ns 609 ns 608 ns
1312  FC 147 ns 144 ns 137 ns
1313  RCPQ 74 ns 70 ns 66 ns
1314 Thread number: 32
1315  GL 635 ns 630 ns 627 ns
1316  FC 151 ns 143 ns 76 ns
1317  RCPQ 199 ns 94 ns 59 ns
1318 Thread number: 56
1319  GL 637 ns 633 ns 627 ns
1320  FC 176 ns 103 ns 41 ns
1321  RCPQ 561 ns 132 ns 46 ns
1322 
1323 
1324 <Producer, Consumer> pattern
1325 The bench calculates the avg execution time for
1326 push AND pop pair(two operations).
1327 GL : std::priority_queue protected by global lock
1328 FC : flat-combining priority queue
1329 RCPQ SPN: RCPQ spinning
1330 RCPQ BLK: RCPQ blocking
1331 
1332 
1333  ------ Scalability ------
1334 <1 , 1> , size = 0:
1335  GL 781 ns 735 ns 652 ns
1336  FC 599 ns 535 ns 462 ns
1337  RCPQ SPN 178 ns 166 ns 148 ns
1338  RCPQ BLK 217 ns 201 ns 182 ns
1339 <2 , 2> , size = 0:
1340  GL 686 ns 665 ns 619 ns
1341  FC 487 ns 430 ns 398 ns
1342  RCPQ SPN 281 ns 239 ns 139 ns
1343  RCPQ BLK 405 ns 367 ns 181 ns
1344 <4 , 4> , size = 0:
1345  GL 1106 ns 1082 ns 1050 ns
1346  FC 278 ns 242 ns 208 ns
1347  RCPQ SPN 114 ns 107 ns 103 ns
1348  RCPQ BLK 169 ns 158 ns 148 ns
1349 <8 , 8> , size = 0:
1350  GL 1169 ns 1156 ns 1144 ns
1351  FC 236 ns 214 ns 197 ns
1352  RCPQ SPN 121 ns 114 ns 110 ns
1353  RCPQ BLK 154 ns 150 ns 141 ns
1354 <12 , 12> , size = 0:
1355  GL 1191 ns 1185 ns 1178 ns
1356  FC 232 ns 221 ns 201 ns
1357  RCPQ SPN 802 ns 205 ns 123 ns
1358  RCPQ BLK 218 ns 161 ns 147 ns
1359 <16 , 16> , size = 0:
1360  GL 1236 ns 1227 ns 1221 ns
1361  FC 269 ns 258 ns 243 ns
1362  RCPQ SPN 826 ns 733 ns 655 ns
1363  RCPQ BLK 172 ns 149 ns 137 ns
1364 <24 , 24> , size = 0:
1365  GL 1269 ns 1262 ns 1255 ns
1366  FC 280 ns 225 ns 171 ns
1367  RCPQ SPN 931 ns 891 ns 836 ns
1368  RCPQ BLK 611 ns 445 ns 362 ns
1369 
1370  ------ Unbalanced(Producer<Consumer) ------
1371 <1 , 8> , size = 0:
1372  GL 1454 ns 1225 ns 1144 ns
1373  FC 2141 ns 1974 ns 1811 ns
1374  RCPQ SPN 597 ns 586 ns 573 ns
1375  RCPQ BLK 663 ns 649 ns 636 ns
1376 <1 , 12> , size = 0:
1377  GL 1763 ns 1658 ns 1591 ns
1378  FC 3396 ns 3261 ns 3107 ns
1379  RCPQ SPN 735 ns 714 ns 651 ns
1380  RCPQ BLK 773 ns 761 ns 744 ns
1381 <1 , 16> , size = 0:
1382  GL 2231 ns 2070 ns 1963 ns
1383  FC 6305 ns 5771 ns 5603 ns
1384  RCPQ SPN 787 ns 756 ns 694 ns
1385  RCPQ BLK 828 ns 806 ns 775 ns
1386 <1 , 24> , size = 0:
1387  GL 3802 ns 3545 ns 3229 ns
1388  FC 10625 ns 10311 ns 10119 ns
1389  RCPQ SPN 781 ns 756 ns 739 ns
1390  RCPQ BLK 892 ns 882 ns 870 ns
1391 <2 , 8> , size = 0:
1392  GL 873 ns 750 ns 718 ns
1393  FC 815 ns 712 ns 659 ns
1394  RCPQ SPN 720 ns 691 ns 673 ns
1395  RCPQ BLK 738 ns 707 ns 694 ns
1396 <2 , 12> , size = 0:
1397  GL 1061 ns 968 ns 904 ns
1398  FC 1410 ns 1227 ns 1190 ns
1399  RCPQ SPN 862 ns 829 ns 767 ns
1400  RCPQ BLK 825 ns 804 ns 771 ns
1401 <2 , 16> , size = 0:
1402  GL 1438 ns 1283 ns 1162 ns
1403  FC 2095 ns 2012 ns 1909 ns
1404  RCPQ SPN 763 ns 706 ns 628 ns
1405  RCPQ BLK 833 ns 804 ns 777 ns
1406 <2 , 24> , size = 0:
1407  GL 2031 ns 1972 ns 1872 ns
1408  FC 4298 ns 4191 ns 4107 ns
1409  RCPQ SPN 762 ns 709 ns 680 ns
1410  RCPQ BLK 876 ns 859 ns 825 ns
1411 <4 , 12> , size = 0:
1412  GL 696 ns 649 ns 606 ns
1413  FC 561 ns 517 ns 480 ns
1414  RCPQ SPN 759 ns 698 ns 498 ns
1415  RCPQ BLK 823 ns 803 ns 786 ns
1416 <4 , 16> , size = 0:
1417  GL 862 ns 800 ns 749 ns
1418  FC 857 ns 824 ns 781 ns
1419  RCPQ SPN 730 ns 679 ns 589 ns
1420  RCPQ BLK 863 ns 824 ns 803 ns
1421 <4 , 24> , size = 0:
1422  GL 1138 ns 1125 ns 1105 ns
1423  FC 1635 ns 1576 ns 1540 ns
1424  RCPQ SPN 756 ns 717 ns 668 ns
1425  RCPQ BLK 865 ns 839 ns 812 ns
1426 
1427  ------ Unbalanced(Producer>Consumer) ------
1428 <12 , 4> , size = 0:
1429  GL 1115 ns 1087 ns 1053 ns
1430  FC 373 ns 355 ns 333 ns
1431  RCPQ SPN 155 ns 147 ns 142 ns
1432  RCPQ BLK 202 ns 190 ns 182 ns
1433 <12 , 8> , size = 0:
1434  GL 1167 ns 1157 ns 1148 ns
1435  FC 281 ns 256 ns 227 ns
1436  RCPQ SPN 132 ns 126 ns 120 ns
1437  RCPQ BLK 175 ns 164 ns 161 ns
1438 <16 , 4> , size = 0:
1439  GL 1103 ns 1088 ns 1074 ns
1440  FC 442 ns 380 ns 327 ns
1441  RCPQ SPN 178 ns 162 ns 150 ns
1442  RCPQ BLK 217 ns 200 ns 188 ns
1443 <16 , 8> , size = 0:
1444  GL 1164 ns 1153 ns 1143 ns
1445  FC 290 ns 268 ns 243 ns
1446  RCPQ SPN 146 ns 138 ns 134 ns
1447  RCPQ BLK 184 ns 175 ns 161 ns
1448 <16 , 12> , size = 0:
1449  GL 1196 ns 1189 ns 1185 ns
1450  FC 269 ns 260 ns 245 ns
1451  RCPQ SPN 405 ns 172 ns 129 ns
1452  RCPQ BLK 172 ns 165 ns 152 ns
1453 <24 , 4> , size = 0:
1454  GL 1097 ns 1081 ns 1030 ns
1455  FC 407 ns 369 ns 301 ns
1456  RCPQ SPN 184 ns 176 ns 164 ns
1457  RCPQ BLK 220 ns 211 ns 201 ns
1458 <24 , 8> , size = 0:
1459  GL 1177 ns 1158 ns 1148 ns
1460  FC 321 ns 297 ns 233 ns
1461  RCPQ SPN 155 ns 148 ns 139 ns
1462  RCPQ BLK 204 ns 188 ns 173 ns
1463 <24 , 12> , size = 0:
1464  GL 1224 ns 1215 ns 1205 ns
1465  FC 320 ns 287 ns 218 ns
1466  RCPQ SPN 145 ns 141 ns 135 ns
1467  RCPQ BLK 176 ns 167 ns 160 ns
1468 <24 , 16> , size = 0:
1469  GL 1250 ns 1244 ns 1238 ns
1470  FC 339 ns 257 ns 209 ns
1471  RCPQ SPN 615 ns 480 ns 359 ns
1472  RCPQ BLK 185 ns 151 ns 137 ns
1473 
1474 [ RUN ] CPQ.Accuracy
1475 The Accuracy test checks how many pops return lower
1476 priority when popping the top X% priorities.
1477 The default batch size is 16.
1478 
1479 ------ Size: 512 Get top: 1% (Num: 5) ------
1480  FIFO Q Lower priority popped: 439
1481  RCPQ(strict) Lower priority popped: 0
1482  RCPQ(batch=2) Lower priority popped: 1
1483  RCPQ(batch=8) Lower priority popped: 10
1484  RCPQ(batch=16) Lower priority popped: 13
1485  RCPQ(batch=50) Lower priority popped: 11
1486 
1487 ------ Size: 512 Get top: 10% (Num: 51) ------
1488  FIFO Q Lower priority popped: 451
1489  RCPQ(strict) Lower priority popped: 0
1490  RCPQ(batch=2) Lower priority popped: 15
1491  RCPQ(batch=8) Lower priority popped: 73
1492  RCPQ(batch=16) Lower priority popped: 147
1493  RCPQ(batch=50) Lower priority popped: 201
1494 
1495 ------ Size: 65536 Get top: 0.1% (Num: 65) ------
1496  FIFO Q Lower priority popped: 64917
1497  RCPQ(strict) Lower priority popped: 0
1498  RCPQ(batch=2) Lower priority popped: 35
1499  RCPQ(batch=8) Lower priority popped: 190
1500  RCPQ(batch=16) Lower priority popped: 387
1501  RCPQ(batch=50) Lower priority popped: 655
1502 
1503 ------ Size: 65536 Get top: 1% (Num: 655) ------
1504  FIFO Q Lower priority popped: 64793
1505  RCPQ(strict) Lower priority popped: 0
1506  RCPQ(batch=2) Lower priority popped: 122
1507  RCPQ(batch=8) Lower priority popped: 516
1508  RCPQ(batch=16) Lower priority popped: 1450
1509  RCPQ(batch=50) Lower priority popped: 3219
1510 
1511 ------ Size: 65536 Get top: 10% (Num: 6553) ------
1512  FIFO Q Lower priority popped: 58977
1513  RCPQ(strict) Lower priority popped: 0
1514  RCPQ(batch=2) Lower priority popped: 174
1515  RCPQ(batch=8) Lower priority popped: 753
1516  RCPQ(batch=16) Lower priority popped: 1436
1517  RCPQ(batch=50) Lower priority popped: 3297
1518 
1519 ------ Size: 1048576 Get top: 0.1% (Num: 1048) ------
1520  FIFO Q Lower priority popped: 1046345
1521  RCPQ(strict) Lower priority popped: 0
1522  RCPQ(batch=2) Lower priority popped: 124
1523  RCPQ(batch=8) Lower priority popped: 449
1524  RCPQ(batch=16) Lower priority popped: 1111
1525  RCPQ(batch=50) Lower priority popped: 3648
1526 
1527 ------ Size: 1048576 Get top: 1% (Num: 10485) ------
1528  FIFO Q Lower priority popped: 1038012
1529  RCPQ(strict) Lower priority popped: 0
1530  RCPQ(batch=2) Lower priority popped: 297
1531  RCPQ(batch=8) Lower priority popped: 1241
1532  RCPQ(batch=16) Lower priority popped: 2489
1533  RCPQ(batch=50) Lower priority popped: 7764
1534 
1535 ------ Size: 1048576 Get top: 10% (Num: 104857) ------
1536  FIFO Q Lower priority popped: 943706
1537  RCPQ(strict) Lower priority popped: 0
1538  RCPQ(batch=2) Lower priority popped: 1984
1539  RCPQ(batch=8) Lower priority popped: 8150
1540  RCPQ(batch=16) Lower priority popped: 15787
1541  RCPQ(batch=50) Lower priority popped: 42778
1542 
1543 The experiment was running on 1 NUMA node,
1544 which is 14 cores.
1545 
1546 Architecture: x86_64
1547 CPU op-mode(s): 32-bit, 64-bit
1548 Byte Order: Little Endian
1549 CPU(s): 56
1550 On-line CPU(s) list: 0-55
1551 Thread(s) per core: 2
1552 Core(s) per socket: 14
1553 Socket(s): 2
1554 NUMA node(s): 2
1555 Vendor ID: GenuineIntel
1556 CPU family: 6
1557 Model: 79
1558 Model name: Intel(R) Xeon(R) CPU E5-2680 v4 @ 2.40GHz
1559 Stepping: 1
1560 CPU MHz: 2401.000
1561 CPU max MHz: 2401.0000
1562 CPU min MHz: 1200.0000
1563 BogoMIPS: 4788.91
1564 Virtualization: VT-x
1565 L1d cache: 32K
1566 L1i cache: 32K
1567 L2 cache: 256K
1568 L3 cache: 35840K
1569 NUMA node0 CPU(s): 0-13,28-41
1570 NUMA node1 CPU(s): 14-27,42-55
1571 */
static uint64_t run_once(const Func &fn)
execute the function for nthreads
static uint64_t producer_consumer_test(std::string name, uint32_t PushThr, uint32_t PopThr, uint64_t initial_size)
DEFINE_bool(bench, false,"run benchmark")
std::atomic< int64_t > sum(0)
PUSHMI_INLINE_VAR constexpr detail::filter_fn filter
Definition: filter.h:75
DEFINE_int32(reps, 1,"number of reps")
char b
LogLevel max
Definition: LogLevel.cpp:31
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
std::chrono::steady_clock::time_point now()
DeterministicAtomicImpl< T, DeterministicSchedule > DeterministicAtomic
double val
Definition: String.cpp:273
folly::std T
static std::thread thread(Func &&func, Args &&...args)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
DEFINE_int64(threadtimeout_ms, 60000,"Idle time before ThreadPoolExecutor threads are joined")
void multiPusherPopper(int PushThr, int PopThr)
std::vector< std::thread::id > threads
auto rng
Definition: CollectTest.cpp:31
const int nthrs
const char * name
Definition: http_parser.c:437
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
static std::function< size_t(size_t)> uniform(uint64_t seed)
Function< void()> Func
Definition: Executor.h:27
static void stop()
LogLevel min
Definition: LogLevel.cpp:30
folly::FlatCombiningPriorityQueue< int > FCPQ
static void accuracy_test(std::string name, uint64_t initial_size, uint32_t top_percent)
void concurrentSizeTest(int ops)
std::priority_queue< T > q_
#define Atom
static map< string, int > m
const int ops
static uint32_t nthreads
std::mt19937 DefaultGenerator
Definition: Random.h:97
void concurrentPopforSharedBuffer()
int * count
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
std::mutex mutex
std::atomic< int > counter
const char * string
Definition: Conv.cpp:212
g_t g(f_t)
static set< string > s
void concurrentOps(int ops)
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
static void join(std::thread &child)
static uint32_t rand32()
Definition: Random.h:213
static uint64_t throughtput_test(std::string name, uint64_t initial_size)
char c
static void DSchedMixedTest()
TEST(SequencedExecutor, CPUThreadPoolExecutor)
static std::vector< int > sizes
uint64_t bench(const int nprod, const int ncons, const std::string &name)
static std::vector< int > nthr