proxygen
FlatCombiningPriorityQueueTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include <folly/Benchmark.h>
20 #include <glog/logging.h>
21 
22 #include <condition_variable>
23 #include <mutex>
24 #include <queue>
25 
// Command-line flags for this test binary.
// `bench` gates TEST(FCPriQueue, bench); `reps` is the repetition count used
// by test(); `ops` and `size` are read by run_once() (and `ops` by test());
// `work` is the argument test() passes to doWork() around each queue operation.
26 DEFINE_bool(bench, false, "run benchmark");
27 DEFINE_int32(reps, 10, "number of reps");
28 DEFINE_int32(ops, 100000, "number of operations per rep");
29 DEFINE_int32(size, 64, "initial size of the priority queue");
30 DEFINE_int32(work, 1000, "amount of unrelated work per operation");
31 
32 void doWork(int work) {
33  uint64_t a = 0;
34  for (int i = work; i > 0; --i) {
35  a += i;
36  }
38 }
39 
/// Conventional single-lock priority queue used as the benchmark baseline.
///
/// Every operation holds `m_` for its whole duration and never blocks waiting
/// for space or for an element: the `try_*` operations simply report failure.
///
/// @tparam T             element type.
/// @tparam PriorityQueue underlying container adaptor; must provide empty(),
///                       size(), push(), pop() and top().
/// @tparam Mutex         lock type usable with std::lock_guard.
template <
    typename T,
    typename PriorityQueue = std::priority_queue<T>,
    typename Mutex = std::mutex>
class BaselinePQ {
 public:
  /// @param maxSize capacity limit enforced by try_push(); 0 means no limit.
  /// @param args    forwarded to the PriorityQueue constructor. The overload
  ///                only participates when PriorityQueue is constructible
  ///                from them.
  template <
      typename... PQArgs,
      typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
  explicit BaselinePQ(size_t maxSize = 0, PQArgs... args)
      : maxSize_(maxSize), pq_(std::forward<PQArgs>(args)...) {}

  /// @return true if the queue currently holds no elements.
  bool empty() const {
    std::lock_guard<Mutex> g(m_);
    return pq_.empty();
  }

  /// @return the current number of elements.
  size_t size() const {
    std::lock_guard<Mutex> g(m_);
    return pq_.size();
  }

  /// Inserts a copy of `val` unless the queue is at capacity.
  /// @return false if the queue is full or the insertion threw
  ///         std::bad_alloc; true otherwise.
  bool try_push(const T& val) {
    std::lock_guard<Mutex> g(m_);
    if (maxSize_ > 0 && pq_.size() == maxSize_) {
      return false;
    }
    DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
    try {
      pq_.push(val);
      notempty_.notify_one();
      return true;
    } catch (const std::bad_alloc&) {
      return false;
    }
  }

  /// Removes the top element and stores it in `val`.
  /// @return false (leaving `val` untouched) if the queue is empty.
  bool try_pop(T& val) {
    std::lock_guard<Mutex> g(m_);
    if (!pq_.empty()) {
      val = pq_.top();
      pq_.pop();
      notfull_.notify_one();
      return true;
    }
    return false;
  }

  /// Copies the top element into `val` without removing it.
  /// @return false (leaving `val` untouched) if the queue is empty.
  bool try_peek(T& val) const {
    std::lock_guard<Mutex> g(m_);
    if (!pq_.empty()) {
      val = pq_.top();
      return true;
    }
    return false;
  }

 private:
  // mutable: the const observers above must still be able to take the lock.
  mutable Mutex m_;
  size_t maxSize_;
  PriorityQueue pq_;
  // Signalled by try_push()/try_pop(); nothing in this class waits on them.
  std::condition_variable notempty_;
  std::condition_variable notfull_;
};
106 
107 using FCPQ = folly::FlatCombiningPriorityQueue<int>;
109 
110 #if FOLLY_SANITIZE_THREAD
111 static std::vector<int> nthr = {1, 2, 3, 4, 6, 8, 12, 16};
112 #else
113 static std::vector<int> nthr = {1, 2, 3, 4, 6, 8, 12, 16, 24, 32, 48, 64};
114 #endif
116 
117 template <typename PriorityQueue, typename Func>
118 static uint64_t run_once(PriorityQueue& pq, const Func& fn) {
119  int ops = FLAGS_ops;
120  int size = FLAGS_size;
121  std::atomic<bool> start{false};
122  std::atomic<uint32_t> started{0};
123 
124  for (int i = 0; i < size; ++i) {
125  CHECK(pq.try_push(i * (ops / size)));
126  }
127 
128  std::vector<std::thread> threads(nthreads);
129  for (uint32_t tid = 0; tid < nthreads; ++tid) {
130  threads[tid] = std::thread([&, tid] {
131  started.fetch_add(1);
132  while (!start.load()) {
133  /* nothing */;
134  }
135  fn(tid);
136  });
137  }
138 
139  while (started.load() < nthreads) {
140  /* nothing */;
141  }
142  auto tbegin = std::chrono::steady_clock::now();
143 
144  // begin time measurement
145  start.store(true);
146 
147  for (auto& t : threads) {
148  t.join();
149  }
150 
151  // end time measurement
152  uint64_t duration = 0;
153  auto tend = std::chrono::steady_clock::now();
154  duration = std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
155  .count();
156  return duration;
157 }
158 
159 TEST(FCPriQueue, basic) {
160  FCPQ pq;
161  CHECK(pq.empty());
162  CHECK_EQ(pq.size(), 0);
163  int v;
164  CHECK(!pq.try_pop(v));
165  // try_pop() returns an Optional
166  EXPECT_FALSE(bool(pq.try_pop()));
167 
168  CHECK(pq.try_push(1));
169  CHECK(pq.try_push(2));
170  CHECK(!pq.empty());
171  CHECK_EQ(pq.size(), 2);
172 
173  pq.peek(v);
174  CHECK_EQ(v, 2); // higher value has higher priority
175  CHECK(pq.try_peek(v));
176  CHECK_EQ(v, 2);
177  CHECK(!pq.empty());
178  CHECK_EQ(pq.size(), 2);
179 
180  CHECK(pq.try_pop(v));
181  CHECK_EQ(v, 2);
182  CHECK(!pq.empty());
183  CHECK_EQ(pq.size(), 1);
184 
185  CHECK(pq.try_pop(v));
186  CHECK_EQ(v, 1);
187  CHECK(pq.empty());
188  CHECK_EQ(pq.size(), 0);
189 
190  CHECK(pq.try_push(1));
191  CHECK(pq.try_push(2));
192 
193  // check successful try_pop()
194  EXPECT_EQ(*pq.try_pop(), 2);
195  CHECK(!pq.empty());
196  CHECK_EQ(pq.size(), 1);
197 
198  EXPECT_EQ(*pq.try_pop(), 1);
199  CHECK(pq.empty());
200  CHECK_EQ(pq.size(), 0);
201 }
202 
203 TEST(FCPriQueue, bounded) {
204  FCPQ pq(1);
205  CHECK(pq.try_push(1));
206  CHECK(!pq.try_push(1));
207  CHECK_EQ(pq.size(), 1);
208  CHECK(!pq.empty());
209  int v;
210  CHECK(pq.try_pop(v));
211  CHECK_EQ(v, 1);
212  CHECK_EQ(pq.size(), 0);
213  CHECK(pq.empty());
214 }
215 
216 TEST(FCPriQueue, timeout) {
217  FCPQ pq(1);
218  int v;
219  CHECK(!pq.try_peek(v));
220  CHECK(!pq.try_pop(v));
221  pq.push(10);
222  CHECK(!pq.try_push(20));
223 
224  auto dur = std::chrono::microseconds(1000);
225  EXPECT_EQ(*pq.try_pop(), 10);
226  CHECK(pq.empty());
227  // check try_***_for
228  EXPECT_FALSE(bool(pq.try_pop_for(dur)));
229  EXPECT_FALSE(bool(pq.try_peek_for(dur)));
230  CHECK(pq.try_push_for(10, dur));
231  CHECK(!pq.try_push_for(20, dur));
232  EXPECT_EQ(*pq.try_peek_for(dur), 10);
233  EXPECT_EQ(*pq.try_pop_for(dur), 10);
234 
235  CHECK(pq.empty());
236  // check try_***_until
237  EXPECT_FALSE(bool(pq.try_pop_until(std::chrono::steady_clock::now() + dur)));
238 
239  EXPECT_FALSE(bool(pq.try_peek_until(std::chrono::steady_clock::now() + dur)));
240  CHECK(pq.try_push_until(10, std::chrono::steady_clock::now() + dur));
241  CHECK(!pq.try_push_until(20, std::chrono::steady_clock::now() + dur));
242  EXPECT_EQ(*pq.try_peek_until(std::chrono::steady_clock::now() + dur), 10);
243  EXPECT_EQ(*pq.try_pop_until(std::chrono::steady_clock::now() + dur), 10);
244  CHECK(pq.empty());
245 }
246 
247 TEST(FCPriQueue, push_pop) {
248  int ops = 1000;
249  int work = 0;
250  std::chrono::steady_clock::time_point when =
251  std::chrono::steady_clock::now() + std::chrono::hours(24);
252  for (auto n : nthr) {
253  nthreads = n;
254  FCPQ pq(10000);
255  auto fn = [&](uint32_t tid) {
256  for (int i = tid; i < ops; i += nthreads) {
257  CHECK(pq.try_push(i));
258  CHECK(pq.try_push_until(i, when));
259  pq.push(i);
260  doWork(work);
261  int v;
262  CHECK(pq.try_pop(v));
263  EXPECT_NE(pq.try_pop_until(when), folly::none);
264  pq.pop(v);
265  doWork(work);
266  }
267  };
268  run_once(pq, fn);
269  }
270 }
271 
/// Experiment variants selectable in test().
enum Exp {
  NoFC, // single-lock Baseline queue, try_push / try_pop
  FCNonBlock, // flat-combining queue, try_push / try_pop
  FCBlock, // flat-combining queue, push / pop
  FCTimed, // flat-combining queue, try_push_until / try_pop_until
};
278 
279 static uint64_t test(std::string name, Exp exp, uint64_t base) {
280  int ops = FLAGS_ops;
281  int work = FLAGS_work;
282 
283  uint64_t min = UINTMAX_MAX;
284  uint64_t max = 0;
285  uint64_t sum = 0;
286 
287  for (int r = 0; r < FLAGS_reps; ++r) {
288  uint64_t dur;
289  switch (exp) {
290  case NoFC: {
291  Baseline pq;
292  auto fn = [&](uint32_t tid) {
293  for (int i = tid; i < ops; i += nthreads) {
294  CHECK(pq.try_push(i));
295  doWork(work);
296  int v;
297  CHECK(pq.try_pop(v));
298  doWork(work);
299  }
300  };
301  dur = run_once(pq, fn);
302  break;
303  }
304  case FCNonBlock: {
305  FCPQ pq;
306  auto fn = [&](uint32_t tid) {
307  for (int i = tid; i < ops; i += nthreads) {
308  CHECK(pq.try_push(i));
309  doWork(work);
310  int v;
311  CHECK(pq.try_pop(v));
312  doWork(work);
313  }
314  };
315  dur = run_once(pq, fn);
316  break;
317  }
318  case FCBlock: {
319  FCPQ pq;
320  auto fn = [&](uint32_t tid) {
321  for (int i = tid; i < ops; i += nthreads) {
322  pq.push(i);
323  doWork(work);
324  int v;
325  pq.pop(v);
326  doWork(work);
327  }
328  };
329  dur = run_once(pq, fn);
330  break;
331  }
332  case FCTimed: {
333  FCPQ pq;
334  auto fn = [&](uint32_t tid) {
335  std::chrono::steady_clock::time_point when =
336  std::chrono::steady_clock::now() + std::chrono::hours(24);
337  for (int i = tid; i < ops; i += nthreads) {
338  CHECK(pq.try_push_until(i, when));
339  doWork(work);
340  EXPECT_NE(pq.try_pop_until(when), folly::none);
341  doWork(work);
342  }
343  };
344  dur = run_once(pq, fn);
345  break;
346  }
347  default:
348  CHECK(false);
349  }
350 
351  sum += dur;
352  min = std::min(min, dur);
353  max = std::max(max, dur);
354  }
355 
356  uint64_t avg = sum / FLAGS_reps;
357  uint64_t res = min;
358  std::cout << name;
359  std::cout << " " << std::setw(4) << max / FLAGS_ops << " ns";
360  std::cout << " " << std::setw(4) << avg / FLAGS_ops << " ns";
361  std::cout << " " << std::setw(4) << res / FLAGS_ops << " ns";
362  if (base) {
363  std::cout << " " << std::setw(3) << 100 * base / res << "%";
364  }
365  std::cout << std::endl;
366  return res;
367 }
368 
369 TEST(FCPriQueue, bench) {
370  if (!FLAGS_bench) {
371  return;
372  }
373 
374  std::cout << "Test_name, Max time, Avg time, Min time, % base min / min"
375  << std::endl;
376  for (int i : nthr) {
377  nthreads = i;
378  std::cout << "\n------------------------------------ Number of threads = "
379  << i << std::endl;
380  uint64_t base = test("baseline ", NoFC, 0);
381  test("baseline - dup ", NoFC, base);
382  std::cout << "---- fc -------------------------------" << std::endl;
383  test("fc non-blocking ", FCNonBlock, base);
384  test("fc non-blocking - dup ", FCNonBlock, base);
385  test("fc timed ", FCTimed, base);
386  test("fc timed - dup ", FCTimed, base);
387  test("fc blocking ", FCBlock, base);
388  test("fc blocking - dup ", FCBlock, base);
389  }
390 }
391 
392 /*
393 $ numactl -N 1 folly/experimental/test/fc_pri_queue_test --bench
394 
395 [ RUN ] FCPriQueue.bench
396 Test_name, Max time, Avg time, Min time, % base min / min
397 
398 ------------------------------------ Number of threads = 1
399 baseline 815 ns 793 ns 789 ns
400 baseline - dup 886 ns 827 ns 789 ns 99%
401 ---- fc -------------------------------
402 fc non-blocking 881 ns 819 ns 789 ns 99%
403 fc non-blocking - dup 833 ns 801 ns 786 ns 100%
404 fc timed 863 ns 801 ns 781 ns 100%
405 fc timed - dup 830 ns 793 ns 782 ns 100%
406 fc blocking 1043 ns 820 ns 789 ns 99%
407 fc blocking - dup 801 ns 793 ns 789 ns 100%
408 
409 ------------------------------------ Number of threads = 2
410 baseline 579 ns 557 ns 540 ns
411 baseline - dup 905 ns 621 ns 538 ns 100%
412 ---- fc -------------------------------
413 fc non-blocking 824 ns 642 ns 568 ns 95%
414 fc non-blocking - dup 737 ns 645 ns 591 ns 91%
415 fc timed 654 ns 590 ns 542 ns 99%
416 fc timed - dup 666 ns 586 ns 534 ns 101%
417 fc blocking 622 ns 599 ns 575 ns 93%
418 fc blocking - dup 677 ns 618 ns 570 ns 94%
419 
420 ------------------------------------ Number of threads = 3
421 baseline 740 ns 717 ns 699 ns
422 baseline - dup 742 ns 716 ns 697 ns 100%
423 ---- fc -------------------------------
424 fc non-blocking 730 ns 689 ns 645 ns 108%
425 fc non-blocking - dup 719 ns 695 ns 639 ns 109%
426 fc timed 695 ns 650 ns 597 ns 117%
427 fc timed - dup 694 ns 654 ns 624 ns 112%
428 fc blocking 711 ns 687 ns 669 ns 104%
429 fc blocking - dup 716 ns 695 ns 624 ns 112%
430 
431 ------------------------------------ Number of threads = 4
432 baseline 777 ns 766 ns 750 ns
433 baseline - dup 778 ns 752 ns 731 ns 102%
434 ---- fc -------------------------------
435 fc non-blocking 653 ns 615 ns 589 ns 127%
436 fc non-blocking - dup 611 ns 593 ns 563 ns 133%
437 fc timed 597 ns 577 ns 569 ns 131%
438 fc timed - dup 618 ns 575 ns 546 ns 137%
439 fc blocking 603 ns 590 ns 552 ns 135%
440 fc blocking - dup 614 ns 590 ns 556 ns 134%
441 
442 ------------------------------------ Number of threads = 6
443 baseline 925 ns 900 ns 869 ns
444 baseline - dup 930 ns 895 ns 866 ns 100%
445 ---- fc -------------------------------
446 fc non-blocking 568 ns 530 ns 481 ns 180%
447 fc non-blocking - dup 557 ns 521 ns 488 ns 177%
448 fc timed 516 ns 496 ns 463 ns 187%
449 fc timed - dup 517 ns 500 ns 474 ns 183%
450 fc blocking 559 ns 513 ns 450 ns 193%
451 fc blocking - dup 564 ns 528 ns 466 ns 186%
452 
453 ------------------------------------ Number of threads = 8
454 baseline 999 ns 981 ns 962 ns
455 baseline - dup 998 ns 984 ns 965 ns 99%
456 ---- fc -------------------------------
457 fc non-blocking 491 ns 386 ns 317 ns 303%
458 fc non-blocking - dup 433 ns 344 ns 298 ns 322%
459 fc timed 445 ns 348 ns 294 ns 327%
460 fc timed - dup 446 ns 357 ns 292 ns 328%
461 fc blocking 505 ns 389 ns 318 ns 302%
462 fc blocking - dup 416 ns 333 ns 293 ns 328%
463 
464 ------------------------------------ Number of threads = 12
465 baseline 1092 ns 1080 ns 1072 ns
466 baseline - dup 1085 ns 1074 ns 1065 ns 100%
467 ---- fc -------------------------------
468 fc non-blocking 360 ns 283 ns 258 ns 415%
469 fc non-blocking - dup 340 ns 278 ns 250 ns 427%
470 fc timed 271 ns 260 ns 249 ns 429%
471 fc timed - dup 397 ns 283 ns 253 ns 423%
472 fc blocking 331 ns 279 ns 258 ns 415%
473 fc blocking - dup 358 ns 280 ns 259 ns 412%
474 
475 ------------------------------------ Number of threads = 16
476 baseline 1120 ns 1115 ns 1103 ns
477 baseline - dup 1122 ns 1118 ns 1114 ns 99%
478 ---- fc -------------------------------
479 fc non-blocking 339 ns 297 ns 246 ns 448%
480 fc non-blocking - dup 353 ns 301 ns 264 ns 417%
481 fc timed 326 ns 287 ns 247 ns 445%
482 fc timed - dup 338 ns 294 ns 259 ns 425%
483 fc blocking 329 ns 288 ns 247 ns 445%
484 fc blocking - dup 375 ns 308 ns 265 ns 415%
485 
486 ------------------------------------ Number of threads = 24
487 baseline 1073 ns 1068 ns 1064 ns
488 baseline - dup 1075 ns 1071 ns 1069 ns 99%
489 ---- fc -------------------------------
490 fc non-blocking 439 ns 342 ns 278 ns 382%
491 fc non-blocking - dup 389 ns 318 ns 291 ns 364%
492 fc timed 368 ns 324 ns 266 ns 398%
493 fc timed - dup 412 ns 328 ns 302 ns 352%
494 fc blocking 425 ns 345 ns 275 ns 386%
495 fc blocking - dup 429 ns 340 ns 269 ns 395%
496 
497 ------------------------------------ Number of threads = 32
498 baseline 1001 ns 990 ns 981 ns
499 baseline - dup 1002 ns 992 ns 983 ns 99%
500 ---- fc -------------------------------
501 fc non-blocking 404 ns 342 ns 273 ns 359%
502 fc non-blocking - dup 395 ns 316 ns 259 ns 378%
503 fc timed 379 ns 330 ns 258 ns 380%
504 fc timed - dup 392 ns 335 ns 274 ns 357%
505 fc blocking 423 ns 340 ns 277 ns 353%
506 fc blocking - dup 445 ns 359 ns 275 ns 356%
507 
508 ------------------------------------ Number of threads = 48
509 baseline 978 ns 975 ns 971 ns
510 baseline - dup 977 ns 974 ns 972 ns 99%
511 ---- fc -------------------------------
512 fc non-blocking 424 ns 327 ns 258 ns 375%
513 fc non-blocking - dup 378 ns 317 ns 256 ns 379%
514 fc timed 368 ns 311 ns 277 ns 350%
515 fc timed - dup 385 ns 310 ns 251 ns 385%
516 fc blocking 422 ns 313 ns 255 ns 380%
517 fc blocking - dup 406 ns 314 ns 258 ns 376%
518 
519 ------------------------------------ Number of threads = 64
520 baseline 993 ns 981 ns 974 ns
521 baseline - dup 984 ns 979 ns 975 ns 99%
522 ---- fc -------------------------------
523 fc non-blocking 353 ns 301 ns 266 ns 365%
524 fc non-blocking - dup 339 ns 301 ns 271 ns 358%
525 fc timed 399 ns 321 ns 259 ns 375%
526 fc timed - dup 381 ns 300 ns 263 ns 369%
527 fc blocking 390 ns 301 ns 251 ns 387%
528 fc blocking - dup 345 ns 289 ns 259 ns 374%
529 [ OK ] FCPriQueue.bench (112424 ms)
530 
531 $ lscpu
532 Architecture: x86_64
533 CPU op-mode(s): 32-bit, 64-bit
534 Byte Order: Little Endian
535 CPU(s): 32
536 On-line CPU(s) list: 0-31
537 Thread(s) per core: 2
538 Core(s) per socket: 8
539 Socket(s): 2
540 NUMA node(s): 2
541 Vendor ID: GenuineIntel
542 CPU family: 6
543 Model: 45
544 Model name: Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz
545 Stepping: 6
546 CPU MHz: 2200.000
547 CPU max MHz: 2200.0000
548 CPU min MHz: 1200.0000
549 BogoMIPS: 4399.87
550 Virtualization: VT-x
551 L1d cache: 32K
552 L1i cache: 32K
553 L2 cache: 256K
554 L3 cache: 20480K
555 NUMA node0 CPU(s): 0-7,16-23
556 NUMA node1 CPU(s): 8-15,24-31
557 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca
558  cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht
559  tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc
560  arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc
561  aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl
562  vmx smx est tm2 ssse3 cx16 xtpr pdcm pcid dca sse4_1
563  sse4_2 x2apic popcnt tsc_deadline_timer aes xsave avx
564  lahf_lm epb tpr_shadow vnmi flexpriority ept vpid
565  xsaveopt dtherm arat pln pts
566 
567  */
#define T(v)
Definition: http_parser.c:233
DEFINE_int32(reps, 10,"number of reps")
std::atomic< int64_t > sum(0)
auto v
static uint64_t run_once(PriorityQueue &pq, const Func &fn)
LogLevel max
Definition: LogLevel.cpp:31
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
std::chrono::steady_clock::time_point now()
#define Mutex
STL namespace.
double val
Definition: String.cpp:273
TEST(FCPriQueue, basic)
static void basic()
std::condition_variable notempty_
static std::vector< int > nthr
static uint32_t nthreads
std::condition_variable notfull_
void doWork(int work)
std::vector< std::thread::id > threads
const char * name
Definition: http_parser.c:437
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
Function< void()> Func
Definition: Executor.h:27
LogLevel min
Definition: LogLevel.cpp:30
folly::FlatCombiningPriorityQueue< int > FCPQ
BaselinePQ(size_t maxSize=0, PQArgs...args)
char a
const int ops
auto start
int * count
Future< Unit > when(bool p, F &&thunk)
Definition: Future-inl.h:2330
std::mutex mutex
const char * string
Definition: Conv.cpp:212
g_t g(f_t)
#define EXPECT_NE(val1, val2)
Definition: gtest.h:1926
static uint64_t test(std::string name, Exp exp, uint64_t base)
DEFINE_bool(bench, false,"run benchmark")
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
uint64_t bench(const int nprod, const int ncons, const std::string &name)
constexpr None none
Definition: Optional.h:87
auto doNotOptimizeAway(const T &datum) -> typename std::enable_if< !detail::DoNotOptimizeAwayNeedsIndirect< T >::value >::type
Definition: Benchmark.h:258
bool try_push(const T &val)