MPMCQueueTest.cpp File Reference
#include <folly/MPMCQueue.h>
#include <folly/Format.h>
#include <folly/Memory.h>
#include <folly/portability/GTest.h>
#include <folly/portability/SysResource.h>
#include <folly/portability/SysTime.h>
#include <folly/portability/Unistd.h>
#include <folly/stop_watch.h>
#include <folly/test/DeterministicSchedule.h>
#include <boost/intrusive_ptr.hpp>
#include <boost/thread/barrier.hpp>
#include <functional>
#include <memory>
#include <thread>
#include <utility>

Go to the source code of this file.


struct  RefCounted
struct  WriteMethodCaller< Q >
struct  BlockingWriteCaller< Q >
struct  WriteIfNotFullCaller< Q >
struct  WriteCaller< Q >
struct  TryWriteUntilCaller< Q, Clock, Duration >
struct  Lifecycle< R >


#define PC_BENCH(q, np, nc, ...)   producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
#define LIFECYCLE_STEP(...)   lc_step(__LINE__, __VA_ARGS__)


typedef DeterministicSchedule DSched


enum  LifecycleEvent {


template<template< typename > class Atom>
void run_mt_sequencer_thread (int numThreads, int numOps, uint32_t init, TurnSequencer< Atom > &seq, Atom< uint32_t > &spinThreshold, int &prev, int i)
template<template< typename > class Atom>
void run_mt_sequencer_test (int numThreads, int numOps, uint32_t init)
 TEST (MPMCQueue, sequencer)
 TEST (MPMCQueue, sequencer_emulated_futex)
 TEST (MPMCQueue, sequencer_deterministic)
template<bool Dynamic = false, typename T >
void runElementTypeTest (T &&src)
void intrusive_ptr_add_ref (RefCounted const *p)
void intrusive_ptr_release (RefCounted const *p)
 TEST (MPMCQueue, lots_of_element_types)
 TEST (MPMCQueue, lots_of_element_types_dynamic)
 TEST (MPMCQueue, single_thread_enqdeq)
 TEST (MPMCQueue, tryenq_capacity_test)
 TEST (MPMCQueue, enq_capacity_test)
template<template< typename > class Atom, bool Dynamic = false>
void runTryEnqDeqThread (int numThreads, int n, MPMCQueue< int, Atom, Dynamic > &cq, std::atomic< uint64_t > &sum, int t)
template<template< typename > class Atom, bool Dynamic = false>
void runTryEnqDeqTest (int numThreads, int numOps)
 TEST (MPMCQueue, mt_try_enq_deq)
 TEST (MPMCQueue, mt_try_enq_deq_dynamic)
 TEST (MPMCQueue, mt_try_enq_deq_emulated_futex)
 TEST (MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic)
 TEST (MPMCQueue, mt_try_enq_deq_deterministic)
uint64_t nowMicro ()
template<typename Q >
string producerConsumerBench (Q &&queue, string qName, int numProducers, int numConsumers, int numOps, WriteMethodCaller< Q > &writer, bool ignoreContents=false)
template<bool Dynamic = false>
void runMtProdConsDeterministic (long seed)
void runMtProdConsDeterministicDynamic (long seed, uint32_t prods, uint32_t cons, uint32_t numOps, size_t cap, size_t minCap, size_t mult)
 TEST (MPMCQueue, mt_prod_cons_deterministic)
 TEST (MPMCQueue, mt_prod_cons_deterministic_dynamic)
template<typename T >
void setFromEnv (T &var, const char *envvar)
 TEST (MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments)
template<bool Dynamic = false>
void runMtProdCons ()
 TEST (MPMCQueue, mt_prod_cons)
 TEST (MPMCQueue, mt_prod_cons_dynamic)
template<bool Dynamic = false>
void runMtProdConsEmulatedFutex ()
 TEST (MPMCQueue, mt_prod_cons_emulated_futex)
 TEST (MPMCQueue, mt_prod_cons_emulated_futex_dynamic)
template<template< typename > class Atom, bool Dynamic = false>
void runNeverFailThread (int numThreads, int n, MPMCQueue< int, Atom, Dynamic > &cq, std::atomic< uint64_t > &sum, int t)
template<template< typename > class Atom, bool Dynamic = false>
uint64_t runNeverFailTest (int numThreads, int numOps)
template<template< typename > class Atom, bool Dynamic = false>
void runMtNeverFail (std::vector< int > &nts, int n)
 TEST (MPMCQueue, mt_never_fail)
 TEST (MPMCQueue, mt_never_fail_emulated_futex)
template<bool Dynamic = false>
void runMtNeverFailDeterministic (std::vector< int > &nts, int n, long seed)
 TEST (MPMCQueue, mt_never_fail_deterministic)
template<class Clock , template< typename > class Atom, bool Dynamic>
void runNeverFailUntilThread (int numThreads, int n, MPMCQueue< int, Atom, Dynamic > &cq, std::atomic< uint64_t > &sum, int t)
template<class Clock , template< typename > class Atom, bool Dynamic = false>
uint64_t runNeverFailTest (int numThreads, int numOps)
template<bool Dynamic = false>
void runMtNeverFailUntilSystem (std::vector< int > &nts, int n)
 TEST (MPMCQueue, mt_never_fail_until_system)
template<bool Dynamic = false>
void runMtNeverFailUntilSteady (std::vector< int > &nts, int n)
 TEST (MPMCQueue, mt_never_fail_until_steady)
static int lc_outstanding ()
static void lc_snap ()
static void lc_step (int lineno, int what=NOTHING, int what2=NOTHING)
template<typename R >
void runPerfectForwardingTest ()
 TEST (MPMCQueue, perfect_forwarding)
 TEST (MPMCQueue, perfect_forwarding_relocatable)
template<bool Dynamic = false>
void run_queue_moving ()
 TEST (MPMCQueue, queue_moving)
 TEST (MPMCQueue, queue_moving_dynamic)
 TEST (MPMCQueue, explicit_zero_capacity_fail)
template<bool Dynamic>
void testTryReadUntil ()
template<bool Dynamic>
void testTryWriteUntil ()
 TEST (MPMCQueue, try_read_until)
 TEST (MPMCQueue, try_read_until_dynamic)
 TEST (MPMCQueue, try_write_until)
 TEST (MPMCQueue, try_write_until_dynamic)
template<bool Dynamic>
void testTimeout (MPMCQueue< int, std::atomic, Dynamic > &q)
 TEST (MPMCQueue, try_write_until_timeout)
 TEST (MPMCQueue, must_fail_try_write_until_dynamic)


static FOLLY_TLS int lc_counts [MAX_LIFECYCLE_EVENT]
static FOLLY_TLS int lc_prev [MAX_LIFECYCLE_EVENT]

Macro Definition Documentation

#define LIFECYCLE_STEP (   ...)    lc_step(__LINE__, __VA_ARGS__)

Definition at line 918 of file MPMCQueueTest.cpp.

Referenced by run_queue_moving(), and runPerfectForwardingTest().

#define PC_BENCH (   q,
np,
nc,
... )    producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)

Definition at line 640 of file MPMCQueueTest.cpp.

Referenced by runMtProdCons(), and runMtProdConsEmulatedFutex().

Typedef Documentation

typedef DeterministicSchedule DSched

Definition at line 47 of file MPMCQueueTest.cpp.

Enumeration Type Documentation


enum LifecycleEvent

Definition at line 891 of file MPMCQueueTest.cpp.

Function Documentation

void intrusive_ptr_add_ref ( RefCounted const *  p)

Definition at line 142 of file MPMCQueueTest.cpp.

References RefCounted::rc.

142  {
143  p->rc++;
144 }
void intrusive_ptr_release ( RefCounted const *  p)

Definition at line 146 of file MPMCQueueTest.cpp.

References RefCounted::rc.

146  {
147  if (--(p->rc) == 0) {
148  delete p;
149  }
150 }
static void lc_snap ( )

Definition at line 912 of file MPMCQueueTest.cpp.

References i, lc_counts, lc_prev, and MAX_LIFECYCLE_EVENT.

Referenced by lc_step(), run_queue_moving(), and runPerfectForwardingTest().

912  {
913  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
914  lc_prev[i] = lc_counts[i];
915  }
916 }
static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT]
static void lc_step ( int  lineno,
int  what = NOTHING,
int  what2 = NOTHING )

Definition at line 920 of file MPMCQueueTest.cpp.

References EXPECT_EQ, i, lc_counts, lc_prev, lc_snap(), and MAX_LIFECYCLE_EVENT.

920  {
921  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
922  int delta = i == what || i == what2 ? 1 : 0;
923  EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
924  << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
925  << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
926  << ", from line " << lineno;
927  }
928  lc_snap();
929 }
static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT]
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static void lc_snap()
uint64_t nowMicro ( )

Definition at line 360 of file MPMCQueueTest.cpp.

References uint64_t.

Referenced by producerConsumerBench(), and runNeverFailTest().

360  {
361  timeval tv;
362  gettimeofday(&tv, nullptr);
363  return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
364 }
template<typename Q >
string producerConsumerBench ( Q &&  queue,
string  qName,
int  numProducers,
int  numConsumers,
int  numOps,
WriteMethodCaller< Q > &  writer,
bool  ignoreContents = false )

Definition at line 424 of file MPMCQueueTest.cpp.

References allocated, WriteMethodCaller< Q >::callWrite(), upload::dest, EXPECT_EQ, EXPECT_FALSE, failed, i, folly::test::DeterministicSchedule::join(), WriteMethodCaller< Q >::methodName(), nowMicro(), folly::sformat(), sum(), folly::pushmi::detail::t, folly::test::DeterministicSchedule::thread(), and uint64_t.

Referenced by runMtProdConsDeterministic(), and runMtProdConsDeterministicDynamic().

431  {
432  Q& q = queue;
434  struct rusage beginUsage;
435  getrusage(RUSAGE_SELF, &beginUsage);
437  auto beginMicro = nowMicro();
439  uint64_t n = numOps;
440  std::atomic<uint64_t> sum(0);
441  std::atomic<uint64_t> failed(0);
443  vector<std::thread> producers(numProducers);
444  for (int t = 0; t < numProducers; ++t) {
445  producers[t] = DSched::thread([&, t] {
446  for (int i = t; i < numOps; i += numProducers) {
447  while (!writer.callWrite(q, i)) {
448  ++failed;
449  }
450  }
451  });
452  }
454  vector<std::thread> consumers(numConsumers);
455  for (int t = 0; t < numConsumers; ++t) {
456  consumers[t] = DSched::thread([&, t] {
457  uint64_t localSum = 0;
458  for (int i = t; i < numOps; i += numConsumers) {
459  int dest = -1;
460  q.blockingRead(dest);
461  EXPECT_FALSE(dest == -1);
462  localSum += dest;
463  }
464  sum += localSum;
465  });
466  }
468  for (auto& t : producers) {
469  DSched::join(t);
470  }
471  for (auto& t : consumers) {
472  DSched::join(t);
473  }
474  if (!ignoreContents) {
475  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
476  }
478  auto endMicro = nowMicro();
480  struct rusage endUsage;
481  getrusage(RUSAGE_SELF, &endUsage);
483  uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
484  long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
485  (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
486  uint64_t failures = failed;
487  size_t allocated = q.allocatedCapacity();
489  return folly::sformat(
490  "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
491  "handoff, {} failures, {} allocated",
492  qName,
493  numProducers,
494  writer.methodName(),
495  numConsumers,
496  nanosPer,
497  csw,
498  n,
499  failures,
500  allocated);
501 }
std::atomic< int64_t > sum(0)
std::string sformat(StringPiece fmt, Args &&...args)
Definition: Format.h:280
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static bool failed
static std::thread thread(Func &&func, Args &&...args)
virtual bool callWrite(Q &q, int i)=0
uint64_t nowMicro()
virtual string methodName()=0
static unsigned long long allocated
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
static void join(std::thread &child)
template<template< typename > class Atom>
void run_mt_sequencer_test ( int  numThreads,
int  numOps,
uint32_t  init )

Definition at line 67 of file MPMCQueueTest.cpp.

References folly::netops::bind(), EXPECT_EQ, i, folly::test::DeterministicSchedule::join(), folly::gen::seq(), folly::test::DeterministicSchedule::thread(), and threads.

67  {
69  Atom<uint32_t> spinThreshold(0);
71  int prev = -1;
72  vector<std::thread> threads(numThreads);
73  for (int i = 0; i < numThreads; ++i) {
75  run_mt_sequencer_thread<Atom>,
76  numThreads,
77  numOps,
78  init,
79  std::ref(seq),
80  std::ref(spinThreshold),
81  std::ref(prev),
82  i));
83  }
85  for (auto& thr : threads) {
86  DSched::join(thr);
87  }
89  EXPECT_EQ(prev, numOps - 1);
90 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
Gen seq(Value first, Value last)
Definition: Base.h:484
static std::thread thread(Func &&func, Args &&...args)
void init()
std::vector< std::thread::id > threads
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static void join(std::thread &child)
template<template< typename > class Atom>
void run_mt_sequencer_thread ( int  numThreads,
int  numOps,
uint32_t  init,
TurnSequencer< Atom > &  seq,
Atom< uint32_t > &  spinThreshold,
int &  prev,
int  i )

Definition at line 50 of file MPMCQueueTest.cpp.

References Atom, and EXPECT_EQ.

57  {
58  for (int op = i; op < numOps; op += numThreads) {
59  seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
60  EXPECT_EQ(prev, op - 1);
61  prev = op;
62  seq.completeTurn(init + op);
63  }
64 }
void completeTurn(const uint32_t turn) noexcept
Unblocks a thread running waitForTurn(turn + 1)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
void waitForTurn(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff) noexcept
Definition: TurnSequencer.h:86
void init()
template<bool Dynamic = false>
void run_queue_moving ( )

Definition at line 1056 of file MPMCQueueTest.cpp.

References a, b, c, COPY_CONSTRUCTOR, DEFAULT_CONSTRUCTOR, DESTRUCTOR, EXPECT_EQ, EXPECT_TRUE, lc_outstanding(), lc_snap(), LIFECYCLE_STEP, folly::gen::move, MOVE_CONSTRUCTOR, MOVE_OPERATOR, NOTHING, and folly::f14::swap().

Referenced by TEST().

1056  {
1057  lc_snap();
1058  EXPECT_EQ(lc_outstanding(), 0);
1060  {
1061  MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1064  a.blockingWrite();
1067  // move constructor
1068  MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b =
1069  std::move(a);
1071  EXPECT_EQ(a.capacity(), 0);
1072  EXPECT_EQ(a.size(), 0);
1073  EXPECT_EQ(b.capacity(), 50);
1074  EXPECT_EQ(b.size(), 1);
1076  b.blockingWrite();
1079  // move operator
1080  MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1082  c = std::move(b);
1084  EXPECT_EQ(c.capacity(), 50);
1085  EXPECT_EQ(c.size(), 2);
1087  {
1090  c.blockingRead(dst);
1093  {
1094  // swap
1095  MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1097  std::swap(c, d);
1099  EXPECT_EQ(c.capacity(), 10);
1100  EXPECT_TRUE(c.isEmpty());
1101  EXPECT_EQ(d.capacity(), 50);
1102  EXPECT_EQ(d.size(), 1);
1104  d.blockingRead(dst);
1107  c.blockingWrite(dst);
1110  d.blockingWrite(std::move(dst));
1112  } // d goes out of scope
1114  } // dst goes out of scope
1116  } // c goes out of scope
1118 }
char b
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
#define LIFECYCLE_STEP(...)
static int lc_outstanding()
char a
static void lc_snap()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
Definition: F14TestUtil.h:414
char c
template<bool Dynamic = false, typename T >
void runElementTypeTest ( T &&  src)

Definition at line 112 of file MPMCQueueTest.cpp.

References upload::dest, EXPECT_TRUE, folly::gen::move, now(), and T.

Referenced by TEST().

112  {
114  cq.blockingWrite(std::forward<T>(src));
115  T dest;
116  cq.blockingRead(dest);
117  EXPECT_TRUE(cq.write(std::move(dest)));
119  auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
120  EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
122  auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
123  EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
125 }
#define T(v)
Definition: http_parser.c:233
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
template<template< typename > class Atom, bool Dynamic = false>
void runMtNeverFail ( std::vector< int > &  nts,
int  n )

Definition at line 761 of file MPMCQueueTest.cpp.

References folly::INFO, and uint64_t.

761  {
762  for (int nt : nts) {
763  uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
764  LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
765  << " threads";
766  }
767 }
template<bool Dynamic = false>
void runMtNeverFailDeterministic ( std::vector< int > &  nts,
int  n,
long  seed )

Definition at line 787 of file MPMCQueueTest.cpp.

References folly::INFO, seed, folly::test::DeterministicSchedule::uniform(), and folly::test::DeterministicSchedule::uniformSubset().

Referenced by TEST().

787  {
788  LOG(INFO) << "using seed " << seed;
789  for (int nt : nts) {
790  {
791  DSched sched(DSched::uniform(seed));
792  runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
793  }
794  {
795  DSched sched(DSched::uniformSubset(seed, 2));
796  runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
797  }
798  }
799 }
static const int seed
static std::function< size_t(size_t)> uniform(uint64_t seed)
static std::function< size_t(size_t)> uniformSubset(uint64_t seed, size_t n=2, size_t m=64)
template<bool Dynamic = false>
void runMtNeverFailUntilSteady ( std::vector< int > &  nts,
int  n )

Definition at line 875 of file MPMCQueueTest.cpp.

References folly::INFO, and uint64_t.

Referenced by TEST().

875  {
876  for (int nt : nts) {
877  uint64_t elapsed =
878  runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(
879  nt, n);
880  LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
881  << " threads";
882  }
883 }
template<bool Dynamic = false>
void runMtNeverFailUntilSystem ( std::vector< int > &  nts,
int  n )

Definition at line 858 of file MPMCQueueTest.cpp.

References folly::INFO, and uint64_t.

Referenced by TEST().

858  {
859  for (int nt : nts) {
860  uint64_t elapsed =
861  runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(
862  nt, n);
863  LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
864  << " threads";
865  }
866 }
template<bool Dynamic = false>
void runMtProdCons ( )

Definition at line 644 of file MPMCQueueTest.cpp.

References folly::INFO, folly::make_unique(), PC_BENCH, and setFromEnv().

Referenced by TEST().

644  {
645  using QueueType = MPMCQueue<int, std::atomic, Dynamic>;
647  int n = 100000;
648  setFromEnv(n, "NUM_OPS");
649  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
650  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
651  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
652  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
653  callers.emplace_back(
655  callers.emplace_back(
657  for (const auto& caller : callers) {
658  LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
659  LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
660  LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
661  LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
662  LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
663  LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
664  LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
665  LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
666  LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
667  }
668 }
#define PC_BENCH(q, np, nc,...)
void setFromEnv(T &var, const char *envvar)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
template<bool Dynamic = false>
void runMtProdConsDeterministic ( long  seed)

Definition at line 504 of file MPMCQueueTest.cpp.

References folly::INFO, folly::make_unique(), producerConsumerBench(), and folly::test::DeterministicSchedule::uniform().

Referenced by TEST().

504  {
505  // we use the Bench method, but perf results are meaningless under DSched
506  DSched sched(DSched::uniform(seed));
510  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
511  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
512  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
513  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
514  callers.emplace_back(
516  callers.emplace_back(
518  size_t cap;
520  for (const auto& caller : callers) {
521  cap = 10;
522  LOG(INFO) << producerConsumerBench(
524  "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
525  folly::to<std::string>(cap) + ")",
526  1,
527  1,
528  1000,
529  *caller);
530  cap = 100;
531  LOG(INFO) << producerConsumerBench(
533  "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
534  folly::to<std::string>(cap) + ")",
535  10,
536  10,
537  1000,
538  *caller);
539  cap = 10;
540  LOG(INFO) << producerConsumerBench(
542  "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
543  folly::to<std::string>(cap) + ")",
544  1,
545  1,
546  1000,
547  *caller);
548  cap = 100;
549  LOG(INFO) << producerConsumerBench(
551  "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
552  folly::to<std::string>(cap) + ")",
553  10,
554  10,
555  1000,
556  *caller);
557  cap = 1;
558  LOG(INFO) << producerConsumerBench(
560  "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
561  folly::to<std::string>(cap) + ")",
562  10,
563  10,
564  1000,
565  *caller);
566  }
567 }
static const int seed
static std::function< size_t(size_t)> uniform(uint64_t seed)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
string producerConsumerBench(Q &&queue, string qName, int numProducers, int numConsumers, int numOps, WriteMethodCaller< Q > &writer, bool ignoreContents=false)
void runMtProdConsDeterministicDynamic ( long  seed,
uint32_t  prods,
uint32_t  cons,
uint32_t  numOps,
size_t  cap,
size_t  minCap,
size_t  mult )

Definition at line 569 of file MPMCQueueTest.cpp.

References folly::INFO, folly::make_unique(), producerConsumerBench(), and folly::test::DeterministicSchedule::uniform().

Referenced by TEST().

576  {
577  // we use the Bench method, but perf results are meaningless under DSched
578  DSched sched(DSched::uniform(seed));
582  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
583  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
584  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
585  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
586  callers.emplace_back(
588  callers.emplace_back(
591  for (const auto& caller : callers) {
592  LOG(INFO) << producerConsumerBench(
594  "MPMCQueue<int, DeterministicAtomic, true>(" +
595  folly::to<std::string>(cap) + ", " +
596  folly::to<std::string>(minCap) + ", " +
597  folly::to<std::string>(mult) + ")",
598  prods,
599  cons,
600  numOps,
601  *caller);
602  }
603 }
static const int seed
static std::function< size_t(size_t)> uniform(uint64_t seed)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
string producerConsumerBench(Q &&queue, string qName, int numProducers, int numConsumers, int numOps, WriteMethodCaller< Q > &writer, bool ignoreContents=false)
template<bool Dynamic = false>
void runMtProdConsEmulatedFutex ( )

Definition at line 679 of file MPMCQueueTest.cpp.

References folly::INFO, folly::make_unique(), and PC_BENCH.

Referenced by TEST().

679  {
682  int n = 100000;
683  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
684  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
685  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
686  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
687  callers.emplace_back(
689  callers.emplace_back(
691  for (const auto& caller : callers) {
692  LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
693  LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
694  LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
695  LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
696  LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
697  LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
698  LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
699  LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
700  LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
701  }
702 }
#define PC_BENCH(q, np, nc,...)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
template<template< typename > class Atom, bool Dynamic = false>
uint64_t runNeverFailTest ( int  numThreads,
int  numOps )

Definition at line 733 of file MPMCQueueTest.cpp.

References Atom, folly::netops::bind(), EXPECT_EQ, EXPECT_TRUE, folly::test::DeterministicSchedule::join(), nowMicro(), sum(), folly::pushmi::detail::t, folly::test::DeterministicSchedule::thread(), threads, and uint64_t.

733  {
734  // always #enq >= #deq
735  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
737  uint64_t n = numOps;
738  auto beginMicro = nowMicro();
740  vector<std::thread> threads(numThreads);
741  std::atomic<uint64_t> sum(0);
742  for (int t = 0; t < numThreads; ++t) {
744  runNeverFailThread<Atom, Dynamic>,
745  numThreads,
746  n,
747  std::ref(cq),
748  std::ref(sum),
749  t));
750  }
751  for (auto& t : threads) {
752  DSched::join(t);
753  }
754  EXPECT_TRUE(cq.isEmpty());
755  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
757  return nowMicro() - beginMicro;
758 }
std::atomic< int64_t > sum(0)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static std::thread thread(Func &&func, Args &&...args)
std::vector< std::thread::id > threads
uint64_t nowMicro()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static void join(std::thread &child)
template<class Clock , template< typename > class Atom, bool Dynamic = false>
uint64_t runNeverFailTest ( int  numThreads,
int  numOps )

Definition at line 830 of file MPMCQueueTest.cpp.

References folly::netops::bind(), EXPECT_EQ, EXPECT_TRUE, folly::test::DeterministicSchedule::join(), nowMicro(), sum(), folly::pushmi::detail::t, folly::test::DeterministicSchedule::thread(), threads, and uint64_t.

830  {
831  // always #enq >= #deq
832  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
834  uint64_t n = numOps;
835  auto beginMicro = nowMicro();
837  vector<std::thread> threads(numThreads);
838  std::atomic<uint64_t> sum(0);
839  for (int t = 0; t < numThreads; ++t) {
841  runNeverFailUntilThread<Clock, Atom, Dynamic>,
842  numThreads,
843  n,
844  std::ref(cq),
845  std::ref(sum),
846  t));
847  }
848  for (auto& t : threads) {
849  DSched::join(t);
850  }
851  EXPECT_TRUE(cq.isEmpty());
852  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
854  return nowMicro() - beginMicro;
855 }
std::atomic< int64_t > sum(0)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static std::thread thread(Func &&func, Args &&...args)
std::vector< std::thread::id > threads
uint64_t nowMicro()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static void join(std::thread &child)
template<template< typename > class Atom, bool Dynamic = false>
void runNeverFailThread ( int  numThreads,
int  n,
MPMCQueue< int, Atom, Dynamic > &  cq,
std::atomic< uint64_t > &  sum,
int  t )

Definition at line 713 of file MPMCQueueTest.cpp.

References Atom, upload::dest, EXPECT_TRUE, i, and uint64_t.

718  {
719  uint64_t threadSum = 0;
720  for (int i = t; i < n; i += numThreads) {
721  // enq + deq
722  EXPECT_TRUE(cq.writeIfNotFull(i));
724  int dest = -1;
725  EXPECT_TRUE(cq.readIfNotEmpty(dest));
726  EXPECT_TRUE(dest >= 0);
727  threadSum += dest;
728  }
729  sum += threadSum;
730 }
std::atomic< int64_t > sum(0)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
template<class Clock , template< typename > class Atom, bool Dynamic>
void runNeverFailUntilThread ( int  numThreads,
int  n,
MPMCQueue< int, Atom, Dynamic > &  cq,
std::atomic< uint64_t > &  sum,
int  t )

Definition at line 809 of file MPMCQueueTest.cpp.

References Atom, upload::dest, EXPECT_TRUE, i, now(), and uint64_t.

814  {
815  uint64_t threadSum = 0;
816  for (int i = t; i < n; i += numThreads) {
817  // enq + deq
818  auto soon = Clock::now() + std::chrono::seconds(1);
819  EXPECT_TRUE(cq.tryWriteUntil(soon, i));
821  int dest = -1;
822  EXPECT_TRUE(cq.readIfNotEmpty(dest));
823  EXPECT_TRUE(dest >= 0);
824  threadSum += dest;
825  }
826  sum += threadSum;
827 }
std::atomic< int64_t > sum(0)
std::chrono::steady_clock::time_point now()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
template<typename R >
void runPerfectForwardingTest ( )

Definition at line 973 of file MPMCQueueTest.cpp.


973  {
974  lc_snap();
977  {
978  // Non-dynamic only. False positive for dynamic.
979  MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
982  for (int pass = 0; pass < 10; ++pass) {
983  for (int i = 0; i < 10; ++i) {
984  queue.blockingWrite();
987  queue.blockingWrite(1, "one");
990  {
991  Lifecycle<R> src;
993  queue.blockingWrite(std::move(src));
995  }
998  {
999  Lifecycle<R> src;
1001  queue.blockingWrite(src);
1003  }
1006  EXPECT_TRUE(queue.write());
1008  }
1010  EXPECT_EQ(queue.size(), 50);
1011  EXPECT_FALSE(queue.write(2, "two"));
1014  for (int i = 0; i < 50; ++i) {
1015  {
1016  Lifecycle<R> node;
1019  queue.blockingRead(node);
1020  if (R::value) {
1021  // relocatable, moved via memcpy
1023  } else {
1025  }
1026  }
1028  }
1030  EXPECT_EQ(queue.size(), 0);
1031  }
1033  // put one element back before destruction
1034  {
1035  Lifecycle<R> src(3, "three");
1037  queue.write(std::move(src));
1039  }
1040  LIFECYCLE_STEP(DESTRUCTOR); // destroy src
1041  }
1042  LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1044  EXPECT_EQ(lc_outstanding(), 0);
1045 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
#define LIFECYCLE_STEP(...)
static int lc_outstanding()
static const char *const value
Definition: Conv.cpp:50
static void lc_snap()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
template<template< typename > class Atom, bool Dynamic = false>
void runTryEnqDeqTest ( int  numThreads,
int  numOps )

Definition at line 272 of file MPMCQueueTest.cpp.

References folly::netops::bind(), EXPECT_EQ, EXPECT_TRUE, folly::test::DeterministicSchedule::join(), sum(), folly::pushmi::detail::t, folly::test::DeterministicSchedule::thread(), threads, and uint64_t.

Referenced by TEST().

272  {
273  // write and read aren't linearizable, so we don't have
274  // hard guarantees on their individual behavior. We can still test
275  // correctness in aggregate
276  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
278  uint64_t n = numOps;
279  vector<std::thread> threads(numThreads);
280  std::atomic<uint64_t> sum(0);
281  for (int t = 0; t < numThreads; ++t) {
283  runTryEnqDeqThread<Atom, Dynamic>,
284  numThreads,
285  n,
286  std::ref(cq),
287  std::ref(sum),
288  t));
289  }
290  for (auto& t : threads) {
291  DSched::join(t);
292  }
293  EXPECT_TRUE(cq.isEmpty());
294  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
295 }
std::atomic< int64_t > sum(0)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static std::thread thread(Func &&func, Args &&...args)
std::vector< std::thread::id > threads
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static void join(std::thread &child)
template<template< typename > class Atom, bool Dynamic = false>
void runTryEnqDeqThread ( int  numThreads,
int  n,
MPMCQueue< int, Atom, Dynamic > &  cq,
std::atomic< uint64_t > &  sum,
int  t )

Definition at line 245 of file MPMCQueueTest.cpp.

References Atom, folly::pushmi::detail::t, and uint64_t.

250  {
251  uint64_t threadSum = 0;
252  int src = t;
253  // received doesn't reflect any actual values, we just start with
254  // t and increment by numThreads to get the rounding of termination
255  // correct if numThreads doesn't evenly divide numOps
256  int received = t;
257  while (src < n || received < n) {
258  if (src < n && cq.write(src)) {
259  src += numThreads;
260  }
262  int dst;
263  if (received < n && cq.read(dst)) {
264  received += numThreads;
265  threadSum += dst;
266  }
267  }
268  sum += threadSum;
269 }
std::atomic< int64_t > sum(0)
template<typename T >
void setFromEnv ( T &  var,
const char *  envvar 

Definition at line 614 of file MPMCQueueTest.cpp.

Referenced by runMtProdCons(), and TEST().

614  {
615  char* str = std::getenv(envvar);
616  if (str) {
617  var = atoi(str);
618  }
619 }
TEST ( MPMCQueue  , sequencer )

Definition at line 92 of file MPMCQueueTest.cpp.

92  {
93  run_mt_sequencer_test<std::atomic>(1, 100, 0);
94  run_mt_sequencer_test<std::atomic>(2, 100000, -100);
95  run_mt_sequencer_test<std::atomic>(100, 10000, -100);
96 }
TEST ( MPMCQueue  , sequencer_emulated_futex )

Definition at line 98 of file MPMCQueueTest.cpp.

98  {
99  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
100  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
101  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
102 }
TEST ( MPMCQueue  , sequencer_deterministic )

Definition at line 104 of file MPMCQueueTest.cpp.

References folly::test::DeterministicSchedule::uniform().

104  {
105  DSched sched(DSched::uniform(0));
106  run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
107  run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
108  run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
109 }
static std::function< size_t(size_t)> uniform(uint64_t seed)
TEST ( MPMCQueue  , lots_of_element_types )

Definition at line 152 of file MPMCQueueTest.cpp.

References RefCounted::active_instances, EXPECT_EQ, and runElementTypeTest().

152  {
153  runElementTypeTest(10);
154  runElementTypeTest(string("abc"));
155  runElementTypeTest(std::make_pair(10, string("def")));
156  runElementTypeTest(vector<string>{{"abc"}});
157  runElementTypeTest(std::make_shared<char>('a'));
158  runElementTypeTest(std::make_unique<char>('a'));
159  runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
160  EXPECT_EQ(RefCounted::active_instances, 0);
161 }
void runElementTypeTest(T &&src)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static FOLLY_TLS int active_instances
TEST ( MPMCQueue  , lots_of_element_types_dynamic )

Definition at line 163 of file MPMCQueueTest.cpp.

References RefCounted::active_instances, EXPECT_EQ, RefCounted::RefCounted(), and string.

163  {
164  runElementTypeTest<true>(10);
165  runElementTypeTest<true>(string("abc"));
166  runElementTypeTest<true>(std::make_pair(10, string("def")));
167  runElementTypeTest<true>(vector<string>{{"abc"}});
168  runElementTypeTest<true>(std::make_shared<char>('a'));
169  runElementTypeTest<true>(std::make_unique<char>('a'));
170  runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
171  EXPECT_EQ(RefCounted::active_instances, 0);
172 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static FOLLY_TLS int active_instances
const char * string
Definition: Conv.cpp:212
TEST ( MPMCQueue  , single_thread_enqdeq )

Definition at line 174 of file MPMCQueueTest.cpp.

References upload::dest, EXPECT_EQ, EXPECT_FALSE, EXPECT_TRUE, and i.

174  {
175  // Non-dynamic version only.
176  // False positive for dynamic version. Capacity can be temporarily
177  // higher than specified.
178  MPMCQueue<int> cq(10);
180  for (int pass = 0; pass < 10; ++pass) {
181  for (int i = 0; i < 10; ++i) {
182  EXPECT_TRUE(cq.write(i));
183  }
184  EXPECT_FALSE(cq.write(-1));
185  EXPECT_FALSE(cq.isEmpty());
186  EXPECT_EQ(cq.size(), 10);
188  for (int i = 0; i < 5; ++i) {
189  int dest = -1;
190  EXPECT_TRUE(cq.read(dest));
191  EXPECT_EQ(dest, i);
192  }
193  for (int i = 5; i < 10; ++i) {
194  int dest = -1;
195  cq.blockingRead(dest);
196  EXPECT_EQ(dest, i);
197  }
198  int dest = -1;
199  EXPECT_FALSE(cq.read(dest));
200  EXPECT_EQ(dest, -1);
202  EXPECT_TRUE(cq.isEmpty());
203  EXPECT_EQ(cq.size(), 0);
204  }
205 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
TEST ( MPMCQueue  , tryenq_capacity_test )

Definition at line 207 of file MPMCQueueTest.cpp.

References EXPECT_FALSE, EXPECT_TRUE, and i.

207  {
208  // Non-dynamic version only.
209  // False positive for dynamic version. Capacity can be temporarily
210  // higher than specified.
211  for (size_t cap = 1; cap < 100; ++cap) {
212  MPMCQueue<int> cq(cap);
213  for (size_t i = 0; i < cap; ++i) {
214  EXPECT_TRUE(cq.write(i));
215  }
216  EXPECT_FALSE(cq.write(100));
217  }
218 }
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
TEST ( MPMCQueue  , enq_capacity_test )

Definition at line 220 of file MPMCQueueTest.cpp.

References Atom, dummy(), EXPECT_EQ, i, folly::pushmi::detail::t, and folly::when().

220  {
221  // Non-dynamic version only.
222  // False positive for dynamic version. Capacity can be temporarily
223  // higher than specified.
224  for (auto cap : {1, 100, 10000}) {
225  MPMCQueue<int> cq(cap);
226  for (int i = 0; i < cap; ++i) {
227  cq.blockingWrite(i);
228  }
229  int t = 0;
230  int when;
231  auto thr = std::thread([&] {
232  cq.blockingWrite(100);
233  when = t;
234  });
235  usleep(2000);
236  t = 1;
237  int dummy;
238  cq.blockingRead(dummy);
239  thr.join();
240  EXPECT_EQ(when, 1);
241  }
242 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
void dummy()
Future< Unit > when(bool p, F &&thunk)
Definition: Future-inl.h:2330
TEST ( MPMCQueue  ,

Definition at line 297 of file MPMCQueueTest.cpp.

297  {
298  int nts[] = {1, 3, 100};
300  int n = 100000;
301  for (int nt : nts) {
302  runTryEnqDeqTest<std::atomic>(nt, n);
303  }
304 }
TEST ( MPMCQueue  ,

Definition at line 306 of file MPMCQueueTest.cpp.

References runTryEnqDeqTest().

306  {
307  int nts[] = {1, 3, 100};
309  int n = 100000;
310  for (int nt : nts) {
311  runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
312  }
313 }
void runTryEnqDeqTest(int numThreads, int numOps)
TEST ( MPMCQueue  ,

Definition at line 315 of file MPMCQueueTest.cpp.

315  {
316  int nts[] = {1, 3, 100};
318  int n = 100000;
319  for (int nt : nts) {
320  runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
321  }
322 }
TEST ( MPMCQueue  ,

Definition at line 324 of file MPMCQueueTest.cpp.

References runTryEnqDeqTest().

324  {
325  int nts[] = {1, 3, 100};
327  int n = 100000;
328  for (int nt : nts) {
329  runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
330  }
331 }
void runTryEnqDeqTest(int numThreads, int numOps)
TEST ( MPMCQueue  ,

Definition at line 333 of file MPMCQueueTest.cpp.

References folly::INFO, runTryEnqDeqTest(), seed, folly::test::DeterministicSchedule::uniform(), and folly::test::DeterministicSchedule::uniformSubset().

333  {
334  int nts[] = {3, 10};
336  long seed = 0;
337  LOG(INFO) << "using seed " << seed;
339  int n = 1000;
340  for (int nt : nts) {
341  {
342  DSched sched(DSched::uniform(seed));
343  runTryEnqDeqTest<DeterministicAtomic>(nt, n);
344  }
345  {
346  DSched sched(DSched::uniformSubset(seed, 2));
347  runTryEnqDeqTest<DeterministicAtomic>(nt, n);
348  }
349  {
350  DSched sched(DSched::uniform(seed));
351  runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
352  }
353  {
354  DSched sched(DSched::uniformSubset(seed, 2));
355  runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
356  }
357  }
358 }
static const int seed
DeterministicAtomicImpl< T, DeterministicSchedule > DeterministicAtomic
static std::function< size_t(size_t)> uniform(uint64_t seed)
static std::function< size_t(size_t)> uniformSubset(uint64_t seed, size_t n=2, size_t m=64)
void runTryEnqDeqTest(int numThreads, int numOps)
TEST ( MPMCQueue  ,

Definition at line 605 of file MPMCQueueTest.cpp.

References runMtProdConsDeterministic().

605  {
606  runMtProdConsDeterministic(0);
607 }
void runMtProdConsDeterministic(long seed)
TEST ( MPMCQueue  ,

Definition at line 609 of file MPMCQueueTest.cpp.

609  {
610  runMtProdConsDeterministic<true>(0);
611 }
TEST ( MPMCQueue  ,

Definition at line 621 of file MPMCQueueTest.cpp.

References runMtProdConsDeterministicDynamic(), seed, setFromEnv(), and uint32_t.

621  {
622  long seed = 0;
623  uint32_t prods = 10;
624  uint32_t cons = 10;
625  uint32_t numOps = 1000;
626  size_t cap = 10000;
627  size_t minCap = 9;
628  size_t mult = 3;
629  setFromEnv(seed, "SEED");
630  setFromEnv(prods, "PRODS");
631  setFromEnv(cons, "CONS");
632  setFromEnv(numOps, "NUM_OPS");
633  setFromEnv(cap, "CAP");
634  setFromEnv(minCap, "MIN_CAP");
635  setFromEnv(mult, "MULT");
636  runMtProdConsDeterministicDynamic(
637  seed, prods, cons, numOps, cap, minCap, mult);
638 }
void runMtProdConsDeterministicDynamic(long seed, uint32_t prods, uint32_t cons, uint32_t numOps, size_t cap, size_t minCap, size_t mult)
static const int seed
void setFromEnv(T &var, const char *envvar)
TEST ( MPMCQueue  ,

Definition at line 670 of file MPMCQueueTest.cpp.

References runMtProdCons().

670  {
671  runMtProdCons();
672 }
void runMtProdCons()
TEST ( MPMCQueue  ,

Definition at line 674 of file MPMCQueueTest.cpp.

References runMtProdCons().

674  {
675  runMtProdCons</* Dynamic = */ true>();
676 }
void runMtProdCons()
TEST ( MPMCQueue  ,

Definition at line 704 of file MPMCQueueTest.cpp.

References runMtProdConsEmulatedFutex().

704  {
705  runMtProdConsEmulatedFutex();
706 }
void runMtProdConsEmulatedFutex()
TEST ( MPMCQueue  ,

Definition at line 708 of file MPMCQueueTest.cpp.

References Atom, and runMtProdConsEmulatedFutex().

708  {
709  runMtProdConsEmulatedFutex</* Dynamic = */ true>();
710 }
void runMtProdConsEmulatedFutex()
TEST ( MPMCQueue  ,

Definition at line 774 of file MPMCQueueTest.cpp.

774  {
775  std::vector<int> nts{1, 3, 100};
776  int n = 100000;
777  runMtNeverFail<std::atomic>(nts, n);
778 }
TEST ( MPMCQueue  ,

Definition at line 780 of file MPMCQueueTest.cpp.

780  {
781  std::vector<int> nts{1, 3, 100};
782  int n = 100000;
783  runMtNeverFail<EmulatedFutexAtomic>(nts, n);
784 }
TEST ( MPMCQueue  ,

Definition at line 801 of file MPMCQueueTest.cpp.

References Atom, runMtNeverFailDeterministic(), and seed.

801  {
802  std::vector<int> nts{3, 10};
803  long seed = 0; // nowMicro() % 10000;
804  int n = 1000;
805  runMtNeverFailDeterministic(nts, n, seed);
806 }
static const int seed
void runMtNeverFailDeterministic(std::vector< int > &nts, int n, long seed)
TEST ( MPMCQueue  ,

Definition at line 868 of file MPMCQueueTest.cpp.

References runMtNeverFailUntilSystem().

868  {
869  std::vector<int> nts{1, 3, 100};
870  int n = 100000;
871  runMtNeverFailUntilSystem(nts, n);
872 }
void runMtNeverFailUntilSystem(std::vector< int > &nts, int n)
TEST ( MPMCQueue  ,

Definition at line 885 of file MPMCQueueTest.cpp.

References runMtNeverFailUntilSteady().

885  {
886  std::vector<int> nts{1, 3, 100};
887  int n = 100000;
888  runMtNeverFailUntilSteady(nts, n);
889 }
void runMtNeverFailUntilSteady(std::vector< int > &nts, int n)
TEST ( MPMCQueue  ,

Definition at line 1047 of file MPMCQueueTest.cpp.

1047  {
1048  runPerfectForwardingTest<std::false_type>();
1049 }
TEST ( MPMCQueue  ,

Definition at line 1051 of file MPMCQueueTest.cpp.

1051  {
1052  runPerfectForwardingTest<std::true_type>();
1053 }
TEST ( MPMCQueue  ,

Definition at line 1120 of file MPMCQueueTest.cpp.

References run_queue_moving().

1120  {
1121  run_queue_moving();
1122 }
void run_queue_moving()
TEST ( MPMCQueue  ,

Definition at line 1124 of file MPMCQueueTest.cpp.

1124  {
1125  run_queue_moving<true>();
1126 }
TEST ( MPMCQueue  ,

Definition at line 1128 of file MPMCQueueTest.cpp.

References ASSERT_THROW.

1128  {
1129  ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1131  using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1132  ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
1133 }
#define ASSERT_THROW(statement, expected_exception)
Definition: gtest.h:1849
TEST ( MPMCQueue  ,

Definition at line 1208 of file MPMCQueueTest.cpp.

1208  {
1209  testTryReadUntil<false>();
1210 }
TEST ( MPMCQueue  ,

Definition at line 1212 of file MPMCQueueTest.cpp.

1212  {
1213  testTryReadUntil<true>();
1214 }
TEST ( MPMCQueue  ,

Definition at line 1216 of file MPMCQueueTest.cpp.

1216  {
1217  testTryWriteUntil<false>();
1218 }
TEST ( MPMCQueue  ,

Definition at line 1220 of file MPMCQueueTest.cpp.

1220  {
1221  testTryWriteUntil<true>();
1222 }
TEST ( MPMCQueue  ,

Definition at line 1232 of file MPMCQueueTest.cpp.

1232  {
1234  testTimeout<false>(queue);
1235 }
TEST ( MPMCQueue  ,

Definition at line 1237 of file MPMCQueueTest.cpp.

1237  {
1239  testTimeout<true>(queue);
1240 }
template<bool Dynamic>
void testTimeout ( MPMCQueue< int, std::atomic, Dynamic > &  q)

Definition at line 1225 of file MPMCQueueTest.cpp.

References now().

1225  {
1226  CHECK(q.write(1));
1227  /* The following must not block forever */
1228  q.tryWriteUntil(
1229  std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
1230 }
std::chrono::steady_clock::time_point now()
template<bool Dynamic>
void testTryReadUntil ( )

Definition at line 1136 of file MPMCQueueTest.cpp.

References b, folly::custom_stop_watch< Clock, Duration >::elapsed(), EXPECT_EQ, EXPECT_FALSE, EXPECT_TRUE, folly::custom_stop_watch< Clock, Duration >::getCheckpoint(), i, threads, and folly::detail::distributed_mutex::wait().

1136  {
1139  const auto wait = std::chrono::milliseconds(100);
1140  stop_watch<> watch;
1141  bool rets[2];
1142  int vals[2];
1143  std::vector<std::thread> threads;
1144  boost::barrier b{3};
1145  for (int i = 0; i < 2; i++) {
1146  threads.emplace_back([&, i] {
1147  b.wait();
1148  rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
1149  });
1150  }
1152  b.wait();
1153  EXPECT_TRUE(q.write(42));
1155  for (int i = 0; i < 2; i++) {
1156  threads[i].join();
1157  }
1159  for (int i = 0; i < 2; i++) {
1160  int other = (i + 1) % 2;
1161  if (rets[i]) {
1162  EXPECT_EQ(42, vals[i]);
1163  EXPECT_FALSE(rets[other]);
1164  }
1165  }
1167  EXPECT_TRUE(watch.elapsed(wait));
1168 }
char b
duration elapsed() const
Definition: stop_watch.h:168
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
std::vector< std::thread::id > threads
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
clock_type::time_point getCheckpoint() const
Definition: stop_watch.h:257
template<bool Dynamic>
void testTryWriteUntil ( )

Definition at line 1171 of file MPMCQueueTest.cpp.

References b, folly::custom_stop_watch< Clock, Duration >::elapsed(), EXPECT_EQ, EXPECT_FALSE, EXPECT_TRUE, folly::custom_stop_watch< Clock, Duration >::getCheckpoint(), i, threads, folly::detail::distributed_mutex::wait(), and x.

1171  {
1173  EXPECT_TRUE(q.write(42));
1175  const auto wait = std::chrono::milliseconds(100);
1176  stop_watch<> watch;
1177  bool rets[2];
1178  std::vector<std::thread> threads;
1179  boost::barrier b{3};
1180  for (int i = 0; i < 2; i++) {
1181  threads.emplace_back([&, i] {
1182  b.wait();
1183  rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
1184  });
1185  }
1187  b.wait();
1188  int x;
1190  EXPECT_EQ(42, x);
1192  for (int i = 0; i < 2; i++) {
1193  threads[i].join();
1194  }
1197  for (int i = 0; i < 2; i++) {
1198  int other = (i + 1) % 2;
1199  if (rets[i]) {
1200  EXPECT_EQ(i, x);
1201  EXPECT_FALSE(rets[other]);
1202  }
1203  }
1205  EXPECT_TRUE(watch.elapsed(wait));
1206 }
char b
duration elapsed() const
Definition: stop_watch.h:168
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
const int x
std::vector< std::thread::id > threads
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
clock_type::time_point getCheckpoint() const
Definition: stop_watch.h:257

Variable Documentation


Definition at line 904 of file MPMCQueueTest.cpp.

Referenced by lc_snap(), and lc_step().