MPMCQueueTest.cpp
/*
 * Copyright 2013-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/MPMCQueue.h>
#include <folly/Format.h>
#include <folly/Memory.h>
#include <folly/portability/GTest.h>
#include <folly/portability/SysResource.h>
#include <folly/portability/SysTime.h>
#include <folly/portability/Unistd.h>
#include <folly/stop_watch.h>
#include <folly/test/DeterministicSchedule.h>

#include <boost/intrusive_ptr.hpp>
#include <boost/thread/barrier.hpp>
#include <functional>
#include <memory>
#include <thread>
#include <utility>

FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr)

using namespace folly;
using namespace detail;
using namespace test;
using std::string;
using std::unique_ptr;
using std::vector;
using std::chrono::milliseconds;
using std::chrono::seconds;
using std::chrono::steady_clock;
using std::chrono::time_point;

typedef DeterministicSchedule DSched;

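// Exercises the TurnSequencer that MPMCQueue is built on: each of numThreads
// workers claims turns (init + op) in strict order, so `prev` must advance by
// exactly one op per completed turn.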
template <template <typename> class Atom>
void run_mt_sequencer_thread(
    int numThreads,
    int numOps,
    uint32_t init,
    TurnSequencer<Atom>& seq,
    Atom<uint32_t>& spinThreshold,
    int& prev,
    int i) {
  for (int op = i; op < numOps; op += numThreads) {
    seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
    EXPECT_EQ(prev, op - 1);
    prev = op;
    seq.completeTurn(init + op);
  }
}

template <template <typename> class Atom>
void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
  TurnSequencer<Atom> seq(init);
  Atom<uint32_t> spinThreshold(0);

  int prev = -1;
  vector<std::thread> threads(numThreads);
  for (int i = 0; i < numThreads; ++i) {
    threads[i] = DSched::thread(std::bind(
        run_mt_sequencer_thread<Atom>,
        numThreads,
        numOps,
        init,
        std::ref(seq),
        std::ref(spinThreshold),
        std::ref(prev),
        i));
  }

  for (auto& thr : threads) {
    DSched::join(thr);
  }

  EXPECT_EQ(prev, numOps - 1);
}

TEST(MPMCQueue, sequencer) {
  run_mt_sequencer_test<std::atomic>(1, 100, 0);
  run_mt_sequencer_test<std::atomic>(2, 100000, -100);
  run_mt_sequencer_test<std::atomic>(100, 10000, -100);
}

TEST(MPMCQueue, sequencer_emulated_futex) {
  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
}

TEST(MPMCQueue, sequencer_deterministic) {
  DSched sched(DSched::uniform(0));
  run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
  run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
  run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
}

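// Round-trips a single element of type T through the queue using blockingWrite,
// write, and tryWriteUntil against both system_clock and steady_clock deadlines.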
template <bool Dynamic = false, typename T>
void runElementTypeTest(T&& src) {
  MPMCQueue<T, std::atomic, Dynamic> cq(10);
  cq.blockingWrite(std::forward<T>(src));
  T dest;
  cq.blockingRead(dest);
  EXPECT_TRUE(cq.write(std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
  auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
  EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
  auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
  EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
}

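// Reference-counted element type for the boost::intrusive_ptr test case below;
// the thread-local active_instances count lets the tests check that every
// instance pushed through the queue is eventually destroyed.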
struct RefCounted {
  static FOLLY_TLS int active_instances;

  mutable std::atomic<int> rc;

  RefCounted() : rc(0) {
    ++active_instances;
  }

  ~RefCounted() {
    --active_instances;
  }
};
FOLLY_TLS int RefCounted::active_instances;

void intrusive_ptr_add_ref(RefCounted const* p) {
  p->rc++;
}

void intrusive_ptr_release(RefCounted const* p) {
  if (--(p->rc) == 0) {
    delete p;
  }
}

TEST(MPMCQueue, lots_of_element_types) {
  runElementTypeTest(10);
  runElementTypeTest(string("abc"));
  runElementTypeTest(std::make_pair(10, string("def")));
  runElementTypeTest(vector<string>{{"abc"}});
  runElementTypeTest(std::make_shared<char>('a'));
  runElementTypeTest(std::make_unique<char>('a'));
  runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
  EXPECT_EQ(RefCounted::active_instances, 0);
}

TEST(MPMCQueue, lots_of_element_types_dynamic) {
  runElementTypeTest<true>(10);
  runElementTypeTest<true>(string("abc"));
  runElementTypeTest<true>(std::make_pair(10, string("def")));
  runElementTypeTest<true>(vector<string>{{"abc"}});
  runElementTypeTest<true>(std::make_shared<char>('a'));
  runElementTypeTest<true>(std::make_unique<char>('a'));
  runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
  EXPECT_EQ(RefCounted::active_instances, 0);
}

TEST(MPMCQueue, single_thread_enqdeq) {
  // Non-dynamic version only.
  // False positive for dynamic version. Capacity can be temporarily
  // higher than specified.
  MPMCQueue<int> cq(10);

  for (int pass = 0; pass < 10; ++pass) {
    for (int i = 0; i < 10; ++i) {
      EXPECT_TRUE(cq.write(i));
    }
    EXPECT_FALSE(cq.write(-1));
    EXPECT_FALSE(cq.isEmpty());
    EXPECT_EQ(cq.size(), 10);

    for (int i = 0; i < 5; ++i) {
      int dest = -1;
      EXPECT_TRUE(cq.read(dest));
      EXPECT_EQ(dest, i);
    }
    for (int i = 5; i < 10; ++i) {
      int dest = -1;
      cq.blockingRead(dest);
      EXPECT_EQ(dest, i);
    }
    int dest = -1;
    EXPECT_FALSE(cq.read(dest));
    EXPECT_EQ(dest, -1);

    EXPECT_TRUE(cq.isEmpty());
    EXPECT_EQ(cq.size(), 0);
  }
}

TEST(MPMCQueue, tryenq_capacity_test) {
  // Non-dynamic version only.
  // False positive for dynamic version. Capacity can be temporarily
  // higher than specified.
  for (size_t cap = 1; cap < 100; ++cap) {
    MPMCQueue<int> cq(cap);
    for (size_t i = 0; i < cap; ++i) {
      EXPECT_TRUE(cq.write(i));
    }
    EXPECT_FALSE(cq.write(100));
  }
}

TEST(MPMCQueue, enq_capacity_test) {
  // Non-dynamic version only.
  // False positive for dynamic version. Capacity can be temporarily
  // higher than specified.
  for (auto cap : {1, 100, 10000}) {
    MPMCQueue<int> cq(cap);
    for (int i = 0; i < cap; ++i) {
      cq.blockingWrite(i);
    }
    int t = 0;
    int when;
    auto thr = std::thread([&] {
      cq.blockingWrite(100);
      when = t;
    });
    usleep(2000);
    t = 1;
    int dummy;
    cq.blockingRead(dummy);
    thr.join();
    EXPECT_EQ(when, 1);
  }
}

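// Worker for the mt_try_enq_deq tests: every thread both produces and consumes
// with the non-blocking write/read calls, striding by numThreads, and folds
// what it reads into a shared sum that the caller checks against 0+1+...+(n-1).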
template <template <typename> class Atom, bool Dynamic = false>
void runTryEnqDeqThread(
    int numThreads,
    int n, /*numOps*/
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  uint64_t threadSum = 0;
  int src = t;
  // received doesn't reflect any actual values, we just start with
  // t and increment by numThreads to get the rounding of termination
  // correct if numThreads doesn't evenly divide numOps
  int received = t;
  while (src < n || received < n) {
    if (src < n && cq.write(src)) {
      src += numThreads;
    }

    int dst;
    if (received < n && cq.read(dst)) {
      received += numThreads;
      threadSum += dst;
    }
  }
  sum += threadSum;
}

template <template <typename> class Atom, bool Dynamic = false>
void runTryEnqDeqTest(int numThreads, int numOps) {
  // write and read aren't linearizable, so we don't have
  // hard guarantees on their individual behavior.  We can still test
  // correctness in aggregate
  MPMCQueue<int, Atom, Dynamic> cq(numThreads);

  uint64_t n = numOps;
  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(
        runTryEnqDeqThread<Atom, Dynamic>,
        numThreads,
        n,
        std::ref(cq),
        std::ref(sum),
        t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
}

TEST(MPMCQueue, mt_try_enq_deq) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<std::atomic>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
  int nts[] = {3, 10};

  long seed = 0;
  LOG(INFO) << "using seed " << seed;

  int n = 1000;
  for (int nt : nts) {
    {
      DSched sched(DSched::uniform(seed));
      runTryEnqDeqTest<DeterministicAtomic>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runTryEnqDeqTest<DeterministicAtomic>(nt, n);
    }
    {
      DSched sched(DSched::uniform(seed));
      runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
    }
  }
}

uint64_t nowMicro() {
  timeval tv;
  gettimeofday(&tv, nullptr);
  return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}

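// Polymorphic wrappers around the different write methods (blockingWrite,
// writeIfNotFull, write, tryWriteUntil) so the benchmark below can exercise
// each of them through a common interface.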
template <typename Q>
struct WriteMethodCaller {
  WriteMethodCaller() {}
  virtual ~WriteMethodCaller() = default;
  virtual bool callWrite(Q& q, int i) = 0;
  virtual string methodName() = 0;
};

template <typename Q>
struct BlockingWriteCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& q, int i) override {
    q.blockingWrite(i);
    return true;
  }
  string methodName() override {
    return "blockingWrite";
  }
};

template <typename Q>
struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& q, int i) override {
    return q.writeIfNotFull(i);
  }
  string methodName() override {
    return "writeIfNotFull";
  }
};

template <typename Q>
struct WriteCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& q, int i) override {
    return q.write(i);
  }
  string methodName() override {
    return "write";
  }
};

template <
    typename Q,
    class Clock = steady_clock,
    class Duration = typename Clock::duration>
struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
  const Duration duration_;
  explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
  bool callWrite(Q& q, int i) override {
    auto then = Clock::now() + duration_;
    return q.tryWriteUntil(then, i);
  }
  string methodName() override {
    return folly::sformat(
        "tryWriteUntil({}ms)",
        std::chrono::duration_cast<milliseconds>(duration_).count());
  }
};

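// Benchmark harness: numProducers writer threads push 0..numOps-1 through the
// supplied WriteMethodCaller while numConsumers threads blockingRead, then the
// result string reports nanos per handoff, context switches, failed write
// attempts, and allocated capacity.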
template <typename Q>
string producerConsumerBench(
    Q&& queue,
    string qName,
    int numProducers,
    int numConsumers,
    int numOps,
    WriteMethodCaller<Q>& writer,
    bool ignoreContents = false) {
  Q& q = queue;

  struct rusage beginUsage;
  getrusage(RUSAGE_SELF, &beginUsage);

  auto beginMicro = nowMicro();

  uint64_t n = numOps;
  std::atomic<uint64_t> sum(0);
  std::atomic<uint64_t> failed(0);

  vector<std::thread> producers(numProducers);
  for (int t = 0; t < numProducers; ++t) {
    producers[t] = DSched::thread([&, t] {
      for (int i = t; i < numOps; i += numProducers) {
        while (!writer.callWrite(q, i)) {
          ++failed;
        }
      }
    });
  }

  vector<std::thread> consumers(numConsumers);
  for (int t = 0; t < numConsumers; ++t) {
    consumers[t] = DSched::thread([&, t] {
      uint64_t localSum = 0;
      for (int i = t; i < numOps; i += numConsumers) {
        int dest = -1;
        q.blockingRead(dest);
        EXPECT_FALSE(dest == -1);
        localSum += dest;
      }
      sum += localSum;
    });
  }

  for (auto& t : producers) {
    DSched::join(t);
  }
  for (auto& t : consumers) {
    DSched::join(t);
  }
  if (!ignoreContents) {
    EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
  }

  auto endMicro = nowMicro();

  struct rusage endUsage;
  getrusage(RUSAGE_SELF, &endUsage);

  uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
  long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
      (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
  uint64_t failures = failed;
  size_t allocated = q.allocatedCapacity();

  return folly::sformat(
      "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
      "handoff, {} failures, {} allocated",
      qName,
      numProducers,
      writer.methodName(),
      numConsumers,
      nanosPer,
      csw,
      n,
      failures,
      allocated);
}

template <bool Dynamic = false>
void runMtProdConsDeterministic(long seed) {
  // we use the Bench method, but perf results are meaningless under DSched
  DSched sched(DSched::uniform(seed));

  using QueueType = MPMCQueue<int, DeterministicAtomic, Dynamic>;

  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
  size_t cap;

  for (const auto& caller : callers) {
    cap = 10;
    LOG(INFO) << producerConsumerBench(
        QueueType(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
            folly::to<std::string>(cap) + ")",
        1,
        1,
        1000,
        *caller);
    cap = 100;
    LOG(INFO) << producerConsumerBench(
        QueueType(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
            folly::to<std::string>(cap) + ")",
        10,
        10,
        1000,
        *caller);
    cap = 10;
    LOG(INFO) << producerConsumerBench(
        QueueType(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
            folly::to<std::string>(cap) + ")",
        1,
        1,
        1000,
        *caller);
    cap = 100;
    LOG(INFO) << producerConsumerBench(
        QueueType(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
            folly::to<std::string>(cap) + ")",
        10,
        10,
        1000,
        *caller);
    cap = 1;
    LOG(INFO) << producerConsumerBench(
        QueueType(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
            folly::to<std::string>(cap) + ")",
        10,
        10,
        1000,
        *caller);
  }
}

void runMtProdConsDeterministicDynamic(
    long seed,
    uint32_t prods,
    uint32_t cons,
    uint32_t numOps,
    size_t cap,
    size_t minCap,
    size_t mult) {
  // we use the Bench method, but perf results are meaningless under DSched
  DSched sched(DSched::uniform(seed));

  using QueueType = MPMCQueue<int, DeterministicAtomic, true>;

  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));

  for (const auto& caller : callers) {
    LOG(INFO) << producerConsumerBench(
        QueueType(cap, minCap, mult),
        "MPMCQueue<int, DeterministicAtomic, true>(" +
            folly::to<std::string>(cap) + ", " +
            folly::to<std::string>(minCap) + ", " +
            folly::to<std::string>(mult) + ")",
        prods,
        cons,
        numOps,
        *caller);
  }
}

TEST(MPMCQueue, mt_prod_cons_deterministic) {
  runMtProdConsDeterministic(0);
}

TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
  runMtProdConsDeterministic<true>(0);
}

template <typename T>
void setFromEnv(T& var, const char* envvar) {
  char* str = std::getenv(envvar);
  if (str) {
    var = atoi(str);
  }
}

TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
  long seed = 0;
  uint32_t prods = 10;
  uint32_t cons = 10;
  uint32_t numOps = 1000;
  size_t cap = 10000;
  size_t minCap = 9;
  size_t mult = 3;
  setFromEnv(seed, "SEED");
  setFromEnv(prods, "PRODS");
  setFromEnv(cons, "CONS");
  setFromEnv(numOps, "NUM_OPS");
  setFromEnv(cap, "CAP");
  setFromEnv(minCap, "MIN_CAP");
  setFromEnv(mult, "MULT");
  runMtProdConsDeterministicDynamic(
      seed, prods, cons, numOps, cap, minCap, mult);
}

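// PC_BENCH stringizes the queue expression so each log line records exactly
// which queue configuration was benchmarked.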
#define PC_BENCH(q, np, nc, ...) \
  producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)

template <bool Dynamic = false>
void runMtProdCons() {
  using QueueType = MPMCQueue<int, std::atomic, Dynamic>;

  int n = 100000;
  setFromEnv(n, "NUM_OPS");
  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
  for (const auto& caller : callers) {
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
  }
}

TEST(MPMCQueue, mt_prod_cons) {
  runMtProdCons();
}

TEST(MPMCQueue, mt_prod_cons_dynamic) {
  runMtProdCons</* Dynamic = */ true>();
}

template <bool Dynamic = false>
void runMtProdConsEmulatedFutex() {
  using QueueType = MPMCQueue<int, EmulatedFutexAtomic, Dynamic>;

  int n = 100000;
  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
  for (const auto& caller : callers) {
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
  }
}

TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
  runMtProdConsEmulatedFutex();
}

TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
  runMtProdConsEmulatedFutex</* Dynamic = */ true>();
}

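// Worker for the never_fail tests: with queue capacity == numThreads and each
// thread doing a write immediately followed by a read, writeIfNotFull() and
// readIfNotEmpty() are expected to never fail on the non-dynamic queue.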
template <template <typename> class Atom, bool Dynamic = false>
void runNeverFailThread(
    int numThreads,
    int n, /*numOps*/
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  uint64_t threadSum = 0;
  for (int i = t; i < n; i += numThreads) {
    // enq + deq
    EXPECT_TRUE(cq.writeIfNotFull(i));

    int dest = -1;
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_TRUE(dest >= 0);
    threadSum += dest;
  }
  sum += threadSum;
}

template <template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
  // always #enq >= #deq
  MPMCQueue<int, Atom, Dynamic> cq(numThreads);

  uint64_t n = numOps;
  auto beginMicro = nowMicro();

  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(
        runNeverFailThread<Atom, Dynamic>,
        numThreads,
        n,
        std::ref(cq),
        std::ref(sum),
        t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);

  return nowMicro() - beginMicro;
}

template <template <typename> class Atom, bool Dynamic = false>
void runMtNeverFail(std::vector<int>& nts, int n) {
  for (int nt : nts) {
    uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

// All the never_fail tests are for the non-dynamic version only.
// False positive for dynamic version. Some writeIfNotFull() and
// tryWriteUntil() operations may fail in transient conditions related
// to expansion.

TEST(MPMCQueue, mt_never_fail) {
  std::vector<int> nts{1, 3, 100};
  int n = 100000;
  runMtNeverFail<std::atomic>(nts, n);
}

TEST(MPMCQueue, mt_never_fail_emulated_futex) {
  std::vector<int> nts{1, 3, 100};
  int n = 100000;
  runMtNeverFail<EmulatedFutexAtomic>(nts, n);
}

template <bool Dynamic = false>
void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
  LOG(INFO) << "using seed " << seed;
  for (int nt : nts) {
    {
      DSched sched(DSched::uniform(seed));
      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
    }
  }
}

TEST(MPMCQueue, mt_never_fail_deterministic) {
  std::vector<int> nts{3, 10};
  long seed = 0; // nowMicro() % 10000;
  int n = 1000;
  runMtNeverFailDeterministic(nts, n, seed);
}

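// Same as runNeverFailThread, but writes via tryWriteUntil() with a one-second
// deadline on the templated Clock (used with both system_clock and
// steady_clock below).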
template <class Clock, template <typename> class Atom, bool Dynamic>
void runNeverFailUntilThread(
    int numThreads,
    int n, /*numOps*/
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  uint64_t threadSum = 0;
  for (int i = t; i < n; i += numThreads) {
    // enq + deq
    auto soon = Clock::now() + std::chrono::seconds(1);
    EXPECT_TRUE(cq.tryWriteUntil(soon, i));

    int dest = -1;
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_TRUE(dest >= 0);
    threadSum += dest;
  }
  sum += threadSum;
}

template <class Clock, template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
  // always #enq >= #deq
  MPMCQueue<int, Atom, Dynamic> cq(numThreads);

  uint64_t n = numOps;
  auto beginMicro = nowMicro();

  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(
        runNeverFailUntilThread<Clock, Atom, Dynamic>,
        numThreads,
        n,
        std::ref(cq),
        std::ref(sum),
        t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);

  return nowMicro() - beginMicro;
}

template <bool Dynamic = false>
void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
  for (int nt : nts) {
    uint64_t elapsed =
        runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(
            nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

TEST(MPMCQueue, mt_never_fail_until_system) {
  std::vector<int> nts{1, 3, 100};
  int n = 100000;
  runMtNeverFailUntilSystem(nts, n);
}

template <bool Dynamic = false>
void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
  for (int nt : nts) {
    uint64_t elapsed =
        runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(
            nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

TEST(MPMCQueue, mt_never_fail_until_steady) {
  std::vector<int> nts{1, 3, 100};
  int n = 100000;
  runMtNeverFailUntilSteady(nts, n);
}

enum LifecycleEvent {
  NOTHING = -1,
  DEFAULT_CONSTRUCTOR,
  TWO_ARG_CONSTRUCTOR,
  COPY_CONSTRUCTOR,
  MOVE_CONSTRUCTOR,
  COPY_OPERATOR,
  MOVE_OPERATOR,
  DESTRUCTOR,
  MAX_LIFECYCLE_EVENT
};

static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];

static int lc_outstanding() {
  return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] +
      lc_counts[COPY_CONSTRUCTOR] + lc_counts[MOVE_CONSTRUCTOR] -
      lc_counts[DESTRUCTOR];
}

static void lc_snap() {
  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
    lc_prev[i] = lc_counts[i];
  }
}

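// LIFECYCLE_STEP(e1 [, e2]) asserts that exactly the named lifecycle events
// (and no others) happened since the previous snapshot, then re-snapshots.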
#define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)

static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
    int delta = i == what || i == what2 ? 1 : 0;
    EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
        << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
        << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
        << ", from line " << lineno;
  }
  lc_snap();
}

template <typename R>
struct Lifecycle {
  typedef R IsRelocatable;

  bool constructed;

  Lifecycle() noexcept : constructed(true) {
    ++lc_counts[DEFAULT_CONSTRUCTOR];
  }

  explicit Lifecycle(int /* n */, char const* /* s */) noexcept
      : constructed(true) {
    ++lc_counts[TWO_ARG_CONSTRUCTOR];
  }

  Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
    ++lc_counts[COPY_CONSTRUCTOR];
  }

  Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
    ++lc_counts[MOVE_CONSTRUCTOR];
  }

  Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
    ++lc_counts[COPY_OPERATOR];
    return *this;
  }

  Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
    ++lc_counts[MOVE_OPERATOR];
    return *this;
  }

  ~Lifecycle() noexcept {
    ++lc_counts[DESTRUCTOR];
    assert(lc_outstanding() >= 0);
    assert(constructed);
    constructed = false;
  }
};

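// Verifies that each enqueue overload forwards its arguments to exactly one
// element constructor in place, and that dequeue either memcpys (relocatable)
// or move-assigns elements out without leaking any Lifecycle instances.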
template <typename R>
void runPerfectForwardingTest() {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

  {
    // Non-dynamic only. False positive for dynamic.
    MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
    LIFECYCLE_STEP(NOTHING);

    for (int pass = 0; pass < 10; ++pass) {
      for (int i = 0; i < 10; ++i) {
        queue.blockingWrite();
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

        queue.blockingWrite(1, "one");
        LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);

        {
          Lifecycle<R> src;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
          queue.blockingWrite(std::move(src));
          LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
        }
        LIFECYCLE_STEP(DESTRUCTOR);

        {
          Lifecycle<R> src;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
          queue.blockingWrite(src);
          LIFECYCLE_STEP(COPY_CONSTRUCTOR);
        }
        LIFECYCLE_STEP(DESTRUCTOR);

        EXPECT_TRUE(queue.write());
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      }

      EXPECT_EQ(queue.size(), 50);
      EXPECT_FALSE(queue.write(2, "two"));
      LIFECYCLE_STEP(NOTHING);

      for (int i = 0; i < 50; ++i) {
        {
          Lifecycle<R> node;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

          queue.blockingRead(node);
          if (R::value) {
            // relocatable, moved via memcpy
            LIFECYCLE_STEP(DESTRUCTOR);
          } else {
            LIFECYCLE_STEP(MOVE_OPERATOR, DESTRUCTOR);
          }
        }
        LIFECYCLE_STEP(DESTRUCTOR);
      }

      EXPECT_EQ(queue.size(), 0);
    }

    // put one element back before destruction
    {
      Lifecycle<R> src(3, "three");
      LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
      queue.write(std::move(src));
      LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
    }
    LIFECYCLE_STEP(DESTRUCTOR); // destroy src
  }
  LIFECYCLE_STEP(DESTRUCTOR); // destroy queue

  EXPECT_EQ(lc_outstanding(), 0);
}

TEST(MPMCQueue, perfect_forwarding) {
  runPerfectForwardingTest<std::false_type>();
}

TEST(MPMCQueue, perfect_forwarding_relocatable) {
  runPerfectForwardingTest<std::true_type>();
}

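// Exercises the queue's own move constructor, move assignment, and std::swap,
// checking capacity/size bookkeeping and element lifecycles as handles move.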
template <bool Dynamic = false>
void run_queue_moving() {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

  {
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
    LIFECYCLE_STEP(NOTHING);

    a.blockingWrite();
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // move constructor
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b =
        std::move(a);
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(a.capacity(), 0);
    EXPECT_EQ(a.size(), 0);
    EXPECT_EQ(b.capacity(), 50);
    EXPECT_EQ(b.size(), 1);

    b.blockingWrite();
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // move operator
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
    LIFECYCLE_STEP(NOTHING);
    c = std::move(b);
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(c.capacity(), 50);
    EXPECT_EQ(c.size(), 2);

    {
      Lifecycle<std::false_type> dst;
      LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      c.blockingRead(dst);
      LIFECYCLE_STEP(MOVE_OPERATOR, DESTRUCTOR);

      {
        // swap
        MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
        LIFECYCLE_STEP(NOTHING);
        std::swap(c, d);
        LIFECYCLE_STEP(NOTHING);
        EXPECT_EQ(c.capacity(), 10);
        EXPECT_TRUE(c.isEmpty());
        EXPECT_EQ(d.capacity(), 50);
        EXPECT_EQ(d.size(), 1);

        d.blockingRead(dst);
        LIFECYCLE_STEP(MOVE_OPERATOR, DESTRUCTOR);

        c.blockingWrite(dst);
        LIFECYCLE_STEP(COPY_CONSTRUCTOR);

        d.blockingWrite(std::move(dst));
        LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
      } // d goes out of scope
      LIFECYCLE_STEP(DESTRUCTOR);
    } // dst goes out of scope
    LIFECYCLE_STEP(DESTRUCTOR);
  } // c goes out of scope
  LIFECYCLE_STEP(DESTRUCTOR);
}

TEST(MPMCQueue, queue_moving) {
  run_queue_moving();
}

TEST(MPMCQueue, queue_moving_dynamic) {
  run_queue_moving<true>();
}

TEST(MPMCQueue, explicit_zero_capacity_fail) {
  ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);

  using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
  ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
}

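// Two reader (or writer) threads race for a single element using tryReadUntil /
// tryWriteUntil: exactly one should succeed and the other should time out only
// after the requested wait has elapsed.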
template <bool Dynamic>
void testTryReadUntil() {
  MPMCQueue<int, std::atomic, Dynamic> q{1};

  const auto wait = std::chrono::milliseconds(100);
  stop_watch<> watch;
  bool rets[2];
  int vals[2];
  std::vector<std::thread> threads;
  boost::barrier b{3};
  for (int i = 0; i < 2; i++) {
    threads.emplace_back([&, i] {
      b.wait();
      rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
    });
  }

  b.wait();
  EXPECT_TRUE(q.write(42));

  for (int i = 0; i < 2; i++) {
    threads[i].join();
  }

  for (int i = 0; i < 2; i++) {
    int other = (i + 1) % 2;
    if (rets[i]) {
      EXPECT_EQ(42, vals[i]);
      EXPECT_FALSE(rets[other]);
    }
  }

  EXPECT_TRUE(watch.elapsed(wait));
}

template <bool Dynamic>
void testTryWriteUntil() {
  MPMCQueue<int, std::atomic, Dynamic> q{1};
  EXPECT_TRUE(q.write(42));

  const auto wait = std::chrono::milliseconds(100);
  stop_watch<> watch;
  bool rets[2];
  std::vector<std::thread> threads;
  boost::barrier b{3};
  for (int i = 0; i < 2; i++) {
    threads.emplace_back([&, i] {
      b.wait();
      rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
    });
  }

  b.wait();
  int x;
  EXPECT_TRUE(q.read(x));
  EXPECT_EQ(42, x);

  for (int i = 0; i < 2; i++) {
    threads[i].join();
  }
  EXPECT_TRUE(q.read(x));

  for (int i = 0; i < 2; i++) {
    int other = (i + 1) % 2;
    if (rets[i]) {
      EXPECT_EQ(i, x);
      EXPECT_FALSE(rets[other]);
    }
  }

  EXPECT_TRUE(watch.elapsed(wait));
}

TEST(MPMCQueue, try_read_until) {
  testTryReadUntil<false>();
}

TEST(MPMCQueue, try_read_until_dynamic) {
  testTryReadUntil<true>();
}

TEST(MPMCQueue, try_write_until) {
  testTryWriteUntil<false>();
}

TEST(MPMCQueue, try_write_until_dynamic) {
  testTryWriteUntil<true>();
}

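// Regression check: tryWriteUntil() on a full queue must return once the
// deadline passes rather than blocking forever.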
template <bool Dynamic>
void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
  CHECK(q.write(1));
  /* The following must not block forever */
  q.tryWriteUntil(
      std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
}

TEST(MPMCQueue, try_write_until_timeout) {
  MPMCQueue<int, std::atomic, false> queue(1);
  testTimeout<false>(queue);
}

TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
  MPMCQueue<int, std::atomic, true> queue(200, 10, 2);
  testTimeout<true>(queue);
}