proxygen
MPMCQueueTest.cpp File Reference
#include <folly/MPMCQueue.h>
#include <folly/Format.h>
#include <folly/Memory.h>
#include <folly/portability/GTest.h>
#include <folly/portability/SysResource.h>
#include <folly/portability/SysTime.h>
#include <folly/portability/Unistd.h>
#include <folly/stop_watch.h>
#include <folly/test/DeterministicSchedule.h>
#include <boost/intrusive_ptr.hpp>
#include <boost/thread/barrier.hpp>
#include <functional>
#include <memory>
#include <thread>
#include <utility>

Go to the source code of this file.

Classes

struct  RefCounted
 
struct  WriteMethodCaller< Q >
 
struct  BlockingWriteCaller< Q >
 
struct  WriteIfNotFullCaller< Q >
 
struct  WriteCaller< Q >
 
struct  TryWriteUntilCaller< Q, Clock, Duration >
 
struct  Lifecycle< R >
 

Macros

#define PC_BENCH(q, np, nc, ...)   producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
 
#define LIFECYCLE_STEP(...)   lc_step(__LINE__, __VA_ARGS__)
 

Typedefs

typedef DeterministicSchedule DSched
 

Enumerations

enum  LifecycleEvent {
  NOTHING = -1, DEFAULT_CONSTRUCTOR, COPY_CONSTRUCTOR, MOVE_CONSTRUCTOR,
  TWO_ARG_CONSTRUCTOR, COPY_OPERATOR, MOVE_OPERATOR, DESTRUCTOR,
  MAX_LIFECYCLE_EVENT
}
 

Functions

template<template< typename > class Atom>
void run_mt_sequencer_thread (int numThreads, int numOps, uint32_t init, TurnSequencer< Atom > &seq, Atom< uint32_t > &spinThreshold, int &prev, int i)
 
template<template< typename > class Atom>
void run_mt_sequencer_test (int numThreads, int numOps, uint32_t init)
 
 TEST (MPMCQueue, sequencer)
 
 TEST (MPMCQueue, sequencer_emulated_futex)
 
 TEST (MPMCQueue, sequencer_deterministic)
 
template<bool Dynamic = false, typename T >
void runElementTypeTest (T &&src)
 
void intrusive_ptr_add_ref (RefCounted const *p)
 
void intrusive_ptr_release (RefCounted const *p)
 
 TEST (MPMCQueue, lots_of_element_types)
 
 TEST (MPMCQueue, lots_of_element_types_dynamic)
 
 TEST (MPMCQueue, single_thread_enqdeq)
 
 TEST (MPMCQueue, tryenq_capacity_test)
 
 TEST (MPMCQueue, enq_capacity_test)
 
template<template< typename > class Atom, bool Dynamic = false>
void runTryEnqDeqThread (int numThreads, int n, MPMCQueue< int, Atom, Dynamic > &cq, std::atomic< uint64_t > &sum, int t)
 
template<template< typename > class Atom, bool Dynamic = false>
void runTryEnqDeqTest (int numThreads, int numOps)
 
 TEST (MPMCQueue, mt_try_enq_deq)
 
 TEST (MPMCQueue, mt_try_enq_deq_dynamic)
 
 TEST (MPMCQueue, mt_try_enq_deq_emulated_futex)
 
 TEST (MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic)
 
 TEST (MPMCQueue, mt_try_enq_deq_deterministic)
 
uint64_t nowMicro ()
 
template<typename Q >
string producerConsumerBench (Q &&queue, string qName, int numProducers, int numConsumers, int numOps, WriteMethodCaller< Q > &writer, bool ignoreContents=false)
 
template<bool Dynamic = false>
void runMtProdConsDeterministic (long seed)
 
void runMtProdConsDeterministicDynamic (long seed, uint32_t prods, uint32_t cons, uint32_t numOps, size_t cap, size_t minCap, size_t mult)
 
 TEST (MPMCQueue, mt_prod_cons_deterministic)
 
 TEST (MPMCQueue, mt_prod_cons_deterministic_dynamic)
 
template<typename T >
void setFromEnv (T &var, const char *envvar)
 
 TEST (MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments)
 
template<bool Dynamic = false>
void runMtProdCons ()
 
 TEST (MPMCQueue, mt_prod_cons)
 
 TEST (MPMCQueue, mt_prod_cons_dynamic)
 
template<bool Dynamic = false>
void runMtProdConsEmulatedFutex ()
 
 TEST (MPMCQueue, mt_prod_cons_emulated_futex)
 
 TEST (MPMCQueue, mt_prod_cons_emulated_futex_dynamic)
 
template<template< typename > class Atom, bool Dynamic = false>
void runNeverFailThread (int numThreads, int n, MPMCQueue< int, Atom, Dynamic > &cq, std::atomic< uint64_t > &sum, int t)
 
template<template< typename > class Atom, bool Dynamic = false>
uint64_t runNeverFailTest (int numThreads, int numOps)
 
template<template< typename > class Atom, bool Dynamic = false>
void runMtNeverFail (std::vector< int > &nts, int n)
 
 TEST (MPMCQueue, mt_never_fail)
 
 TEST (MPMCQueue, mt_never_fail_emulated_futex)
 
template<bool Dynamic = false>
void runMtNeverFailDeterministic (std::vector< int > &nts, int n, long seed)
 
 TEST (MPMCQueue, mt_never_fail_deterministic)
 
template<class Clock , template< typename > class Atom, bool Dynamic>
void runNeverFailUntilThread (int numThreads, int n, MPMCQueue< int, Atom, Dynamic > &cq, std::atomic< uint64_t > &sum, int t)
 
template<class Clock , template< typename > class Atom, bool Dynamic = false>
uint64_t runNeverFailTest (int numThreads, int numOps)
 
template<bool Dynamic = false>
void runMtNeverFailUntilSystem (std::vector< int > &nts, int n)
 
 TEST (MPMCQueue, mt_never_fail_until_system)
 
template<bool Dynamic = false>
void runMtNeverFailUntilSteady (std::vector< int > &nts, int n)
 
 TEST (MPMCQueue, mt_never_fail_until_steady)
 
static int lc_outstanding ()
 
static void lc_snap ()
 
static void lc_step (int lineno, int what=NOTHING, int what2=NOTHING)
 
template<typename R >
void runPerfectForwardingTest ()
 
 TEST (MPMCQueue, perfect_forwarding)
 
 TEST (MPMCQueue, perfect_forwarding_relocatable)
 
template<bool Dynamic = false>
void run_queue_moving ()
 
 TEST (MPMCQueue, queue_moving)
 
 TEST (MPMCQueue, queue_moving_dynamic)
 
 TEST (MPMCQueue, explicit_zero_capacity_fail)
 
template<bool Dynamic>
void testTryReadUntil ()
 
template<bool Dynamic>
void testTryWriteUntil ()
 
 TEST (MPMCQueue, try_read_until)
 
 TEST (MPMCQueue, try_read_until_dynamic)
 
 TEST (MPMCQueue, try_write_until)
 
 TEST (MPMCQueue, try_write_until_dynamic)
 
template<bool Dynamic>
void testTimeout (MPMCQueue< int, std::atomic, Dynamic > &q)
 
 TEST (MPMCQueue, try_write_until_timeout)
 
 TEST (MPMCQueue, must_fail_try_write_until_dynamic)
 

Variables

static FOLLY_TLS int lc_counts [MAX_LIFECYCLE_EVENT]
 
static FOLLY_TLS int lc_prev [MAX_LIFECYCLE_EVENT]
 

Macro Definition Documentation

#define LIFECYCLE_STEP (   ...)    lc_step(__LINE__, __VA_ARGS__)

Definition at line 918 of file MPMCQueueTest.cpp.

Referenced by run_queue_moving(), and runPerfectForwardingTest().

#define PC_BENCH (   q,
  np,
  nc,
  ... 
)    producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)

Definition at line 640 of file MPMCQueueTest.cpp.

Referenced by runMtProdCons(), and runMtProdConsEmulatedFutex().

Typedef Documentation

typedef DeterministicSchedule DSched

Definition at line 47 of file MPMCQueueTest.cpp.

Enumeration Type Documentation

Enumerator
NOTHING 
DEFAULT_CONSTRUCTOR 
COPY_CONSTRUCTOR 
MOVE_CONSTRUCTOR 
TWO_ARG_CONSTRUCTOR 
COPY_OPERATOR 
MOVE_OPERATOR 
DESTRUCTOR 
MAX_LIFECYCLE_EVENT 

Definition at line 891 of file MPMCQueueTest.cpp.

Function Documentation

void intrusive_ptr_add_ref ( RefCounted const *  p)

Definition at line 142 of file MPMCQueueTest.cpp.

References RefCounted::rc.

142  {
143  p->rc++;
144 }
void intrusive_ptr_release ( RefCounted const *  p)

Definition at line 146 of file MPMCQueueTest.cpp.

References RefCounted::rc.

146  {
147  if (--(p->rc) == 0) {
148  delete p;
149  }
150 }
static void lc_snap ( )
static

Definition at line 912 of file MPMCQueueTest.cpp.

References i, lc_counts, lc_prev, and MAX_LIFECYCLE_EVENT.

Referenced by lc_step(), run_queue_moving(), and runPerfectForwardingTest().

912  {
913  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
914  lc_prev[i] = lc_counts[i];
915  }
916 }
static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT]
static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT]
static void lc_step ( int  lineno,
int  what = NOTHING,
int  what2 = NOTHING 
)
static

Definition at line 920 of file MPMCQueueTest.cpp.

References EXPECT_EQ, i, lc_counts, lc_prev, lc_snap(), and MAX_LIFECYCLE_EVENT.

920  {
921  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
922  int delta = i == what || i == what2 ? 1 : 0;
923  EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
924  << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
925  << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
926  << ", from line " << lineno;
927  }
928  lc_snap();
929 }
static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT]
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT]
static void lc_snap()
uint64_t nowMicro ( )

Definition at line 360 of file MPMCQueueTest.cpp.

References uint64_t.

Referenced by producerConsumerBench(), and runNeverFailTest().

360  {
361  timeval tv;
362  gettimeofday(&tv, nullptr);
363  return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
364 }
template<typename Q >
string producerConsumerBench ( Q &&  queue,
string  qName,
int  numProducers,
int  numConsumers,
int  numOps,
WriteMethodCaller< Q > &  writer,
bool  ignoreContents = false 
)

Definition at line 424 of file MPMCQueueTest.cpp.

References allocated, WriteMethodCaller< Q >::callWrite(), upload::dest, EXPECT_EQ, EXPECT_FALSE, failed, i, folly::test::DeterministicSchedule::join(), WriteMethodCaller< Q >::methodName(), nowMicro(), folly::sformat(), sum(), folly::pushmi::detail::t, folly::test::DeterministicSchedule::thread(), and uint64_t.

Referenced by runMtProdConsDeterministic(), and runMtProdConsDeterministicDynamic().

431  {
432  Q& q = queue;
433 
434  struct rusage beginUsage;
435  getrusage(RUSAGE_SELF, &beginUsage);
436 
437  auto beginMicro = nowMicro();
438 
439  uint64_t n = numOps;
440  std::atomic<uint64_t> sum(0);
441  std::atomic<uint64_t> failed(0);
442 
443  vector<std::thread> producers(numProducers);
444  for (int t = 0; t < numProducers; ++t) {
445  producers[t] = DSched::thread([&, t] {
446  for (int i = t; i < numOps; i += numProducers) {
447  while (!writer.callWrite(q, i)) {
448  ++failed;
449  }
450  }
451  });
452  }
453 
454  vector<std::thread> consumers(numConsumers);
455  for (int t = 0; t < numConsumers; ++t) {
456  consumers[t] = DSched::thread([&, t] {
457  uint64_t localSum = 0;
458  for (int i = t; i < numOps; i += numConsumers) {
459  int dest = -1;
460  q.blockingRead(dest);
461  EXPECT_FALSE(dest == -1);
462  localSum += dest;
463  }
464  sum += localSum;
465  });
466  }
467 
468  for (auto& t : producers) {
469  DSched::join(t);
470  }
471  for (auto& t : consumers) {
472  DSched::join(t);
473  }
474  if (!ignoreContents) {
475  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
476  }
477 
478  auto endMicro = nowMicro();
479 
480  struct rusage endUsage;
481  getrusage(RUSAGE_SELF, &endUsage);
482 
483  uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
484  long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
485  (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
486  uint64_t failures = failed;
487  size_t allocated = q.allocatedCapacity();
488 
489  return folly::sformat(
490  "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
491  "handoff, {} failures, {} allocated",
492  qName,
493  numProducers,
494  writer.methodName(),
495  numConsumers,
496  nanosPer,
497  csw,
498  n,
499  failures,
500  allocated);
501 }
std::atomic< int64_t > sum(0)
std::string sformat(StringPiece fmt, Args &&...args)
Definition: Format.h:280
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static bool failed
dest
Definition: upload.py:394
static std::thread thread(Func &&func, Args &&...args)
virtual bool callWrite(Q &q, int i)=0
uint64_t nowMicro()
virtual string methodName()=0
static unsigned long long allocated
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
static void join(std::thread &child)
template<template< typename > class Atom>
void run_mt_sequencer_test ( int  numThreads,
int  numOps,
uint32_t  init 
)

Definition at line 67 of file MPMCQueueTest.cpp.

References folly::netops::bind(), EXPECT_EQ, i, folly::test::DeterministicSchedule::join(), folly::gen::seq(), folly::test::DeterministicSchedule::thread(), and threads.

67  {
69  Atom<uint32_t> spinThreshold(0);
70 
71  int prev = -1;
72  vector<std::thread> threads(numThreads);
73  for (int i = 0; i < numThreads; ++i) {
75  run_mt_sequencer_thread<Atom>,
76  numThreads,
77  numOps,
78  init,
79  std::ref(seq),
80  std::ref(spinThreshold),
81  std::ref(prev),
82  i));
83  }
84 
85  for (auto& thr : threads) {
86  DSched::join(thr);
87  }
88 
89  EXPECT_EQ(prev, numOps - 1);
90 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
Gen seq(Value first, Value last)
Definition: Base.h:484
static std::thread thread(Func &&func, Args &&...args)
void init()
std::vector< std::thread::id > threads
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static void join(std::thread &child)
template<template< typename > class Atom>
void run_mt_sequencer_thread ( int  numThreads,
int  numOps,
uint32_t  init,
TurnSequencer< Atom > &  seq,
Atom< uint32_t > &  spinThreshold,
int &  prev,
int  i 
)

Definition at line 50 of file MPMCQueueTest.cpp.

References Atom, and EXPECT_EQ.

57  {
58  for (int op = i; op < numOps; op += numThreads) {
59  seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
60  EXPECT_EQ(prev, op - 1);
61  prev = op;
62  seq.completeTurn(init + op);
63  }
64 }
void completeTurn(const uint32_t turn) noexcept
Unblocks a thread running waitForTurn(turn + 1)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
void waitForTurn(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff) noexcept
Definition: TurnSequencer.h:86
void init()
template<bool Dynamic = false>
void run_queue_moving ( )

Definition at line 1056 of file MPMCQueueTest.cpp.

References a, b, c, COPY_CONSTRUCTOR, DEFAULT_CONSTRUCTOR, DESTRUCTOR, EXPECT_EQ, EXPECT_TRUE, lc_outstanding(), lc_snap(), LIFECYCLE_STEP, folly::gen::move, MOVE_CONSTRUCTOR, MOVE_OPERATOR, NOTHING, and folly::f14::swap().

Referenced by TEST().

1056  {
1057  lc_snap();
1058  EXPECT_EQ(lc_outstanding(), 0);
1059 
1060  {
1061  MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1063 
1064  a.blockingWrite();
1066 
1067  // move constructor
1068  MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b =
1069  std::move(a);
1071  EXPECT_EQ(a.capacity(), 0);
1072  EXPECT_EQ(a.size(), 0);
1073  EXPECT_EQ(b.capacity(), 50);
1074  EXPECT_EQ(b.size(), 1);
1075 
1076  b.blockingWrite();
1078 
1079  // move operator
1080  MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1082  c = std::move(b);
1084  EXPECT_EQ(c.capacity(), 50);
1085  EXPECT_EQ(c.size(), 2);
1086 
1087  {
1090  c.blockingRead(dst);
1092 
1093  {
1094  // swap
1095  MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1097  std::swap(c, d);
1099  EXPECT_EQ(c.capacity(), 10);
1100  EXPECT_TRUE(c.isEmpty());
1101  EXPECT_EQ(d.capacity(), 50);
1102  EXPECT_EQ(d.size(), 1);
1103 
1104  d.blockingRead(dst);
1106 
1107  c.blockingWrite(dst);
1109 
1110  d.blockingWrite(std::move(dst));
1112  } // d goes out of scope
1114  } // dst goes out of scope
1116  } // c goes out of scope
1118 }
char b
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
#define LIFECYCLE_STEP(...)
static int lc_outstanding()
char a
static void lc_snap()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
Definition: F14TestUtil.h:414
char c
template<bool Dynamic = false, typename T >
void runElementTypeTest ( T &&  src)

Definition at line 112 of file MPMCQueueTest.cpp.

References upload::dest, EXPECT_TRUE, folly::gen::move, now(), and T.

Referenced by TEST().

112  {
114  cq.blockingWrite(std::forward<T>(src));
115  T dest;
116  cq.blockingRead(dest);
117  EXPECT_TRUE(cq.write(std::move(dest)));
118  EXPECT_TRUE(cq.read(dest));
119  auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
120  EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
121  EXPECT_TRUE(cq.read(dest));
122  auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
123  EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
124  EXPECT_TRUE(cq.read(dest));
125 }
#define T(v)
Definition: http_parser.c:233
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
dest
Definition: upload.py:394
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
template<template< typename > class Atom, bool Dynamic = false>
void runMtNeverFail ( std::vector< int > &  nts,
int  n 
)

Definition at line 761 of file MPMCQueueTest.cpp.

References folly::INFO, and uint64_t.

761  {
762  for (int nt : nts) {
763  uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
764  LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
765  << " threads";
766  }
767 }
template<bool Dynamic = false>
void runMtNeverFailDeterministic ( std::vector< int > &  nts,
int  n,
long  seed 
)

Definition at line 787 of file MPMCQueueTest.cpp.

References folly::INFO, seed, folly::test::DeterministicSchedule::uniform(), and folly::test::DeterministicSchedule::uniformSubset().

Referenced by TEST().

787  {
788  LOG(INFO) << "using seed " << seed;
789  for (int nt : nts) {
790  {
791  DSched sched(DSched::uniform(seed));
792  runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
793  }
794  {
795  DSched sched(DSched::uniformSubset(seed, 2));
796  runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
797  }
798  }
799 }
static const int seed
static std::function< size_t(size_t)> uniform(uint64_t seed)
static std::function< size_t(size_t)> uniformSubset(uint64_t seed, size_t n=2, size_t m=64)
template<bool Dynamic = false>
void runMtNeverFailUntilSteady ( std::vector< int > &  nts,
int  n 
)

Definition at line 875 of file MPMCQueueTest.cpp.

References folly::INFO, and uint64_t.

Referenced by TEST().

875  {
876  for (int nt : nts) {
877  uint64_t elapsed =
878  runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(
879  nt, n);
880  LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
881  << " threads";
882  }
883 }
template<bool Dynamic = false>
void runMtNeverFailUntilSystem ( std::vector< int > &  nts,
int  n 
)

Definition at line 858 of file MPMCQueueTest.cpp.

References folly::INFO, and uint64_t.

Referenced by TEST().

858  {
859  for (int nt : nts) {
860  uint64_t elapsed =
861  runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(
862  nt, n);
863  LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
864  << " threads";
865  }
866 }
template<bool Dynamic = false>
void runMtProdCons ( )

Definition at line 644 of file MPMCQueueTest.cpp.

References folly::INFO, folly::make_unique(), PC_BENCH, and setFromEnv().

Referenced by TEST().

644  {
645  using QueueType = MPMCQueue<int, std::atomic, Dynamic>;
646 
647  int n = 100000;
648  setFromEnv(n, "NUM_OPS");
649  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
650  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
651  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
652  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
653  callers.emplace_back(
655  callers.emplace_back(
657  for (const auto& caller : callers) {
658  LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
659  LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
660  LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
661  LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
662  LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
663  LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
664  LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
665  LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
666  LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
667  }
668 }
#define PC_BENCH(q, np, nc,...)
void setFromEnv(T &var, const char *envvar)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
template<bool Dynamic = false>
void runMtProdConsDeterministic ( long  seed)

Definition at line 504 of file MPMCQueueTest.cpp.

References folly::INFO, folly::make_unique(), producerConsumerBench(), and folly::test::DeterministicSchedule::uniform().

Referenced by TEST().

504  {
505  // we use the Bench method, but perf results are meaningless under DSched
506  DSched sched(DSched::uniform(seed));
507 
509 
510  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
511  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
512  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
513  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
514  callers.emplace_back(
516  callers.emplace_back(
518  size_t cap;
519 
520  for (const auto& caller : callers) {
521  cap = 10;
522  LOG(INFO) << producerConsumerBench(
524  "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
525  folly::to<std::string>(cap) + ")",
526  1,
527  1,
528  1000,
529  *caller);
530  cap = 100;
531  LOG(INFO) << producerConsumerBench(
533  "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
534  folly::to<std::string>(cap) + ")",
535  10,
536  10,
537  1000,
538  *caller);
539  cap = 10;
540  LOG(INFO) << producerConsumerBench(
542  "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
543  folly::to<std::string>(cap) + ")",
544  1,
545  1,
546  1000,
547  *caller);
548  cap = 100;
549  LOG(INFO) << producerConsumerBench(
551  "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
552  folly::to<std::string>(cap) + ")",
553  10,
554  10,
555  1000,
556  *caller);
557  cap = 1;
558  LOG(INFO) << producerConsumerBench(
560  "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
561  folly::to<std::string>(cap) + ")",
562  10,
563  10,
564  1000,
565  *caller);
566  }
567 }
static const int seed
static std::function< size_t(size_t)> uniform(uint64_t seed)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
string producerConsumerBench(Q &&queue, string qName, int numProducers, int numConsumers, int numOps, WriteMethodCaller< Q > &writer, bool ignoreContents=false)
void runMtProdConsDeterministicDynamic ( long  seed,
uint32_t  prods,
uint32_t  cons,
uint32_t  numOps,
size_t  cap,
size_t  minCap,
size_t  mult 
)

Definition at line 569 of file MPMCQueueTest.cpp.

References folly::INFO, folly::make_unique(), producerConsumerBench(), and folly::test::DeterministicSchedule::uniform().

Referenced by TEST().

576  {
577  // we use the Bench method, but perf results are meaningless under DSched
578  DSched sched(DSched::uniform(seed));
579 
581 
582  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
583  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
584  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
585  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
586  callers.emplace_back(
588  callers.emplace_back(
590 
591  for (const auto& caller : callers) {
592  LOG(INFO) << producerConsumerBench(
594  "MPMCQueue<int, DeterministicAtomic, true>(" +
595  folly::to<std::string>(cap) + ", " +
596  folly::to<std::string>(minCap) + ", " +
597  folly::to<std::string>(mult) + ")",
598  prods,
599  cons,
600  numOps,
601  *caller);
602  }
603 }
static const int seed
static std::function< size_t(size_t)> uniform(uint64_t seed)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
string producerConsumerBench(Q &&queue, string qName, int numProducers, int numConsumers, int numOps, WriteMethodCaller< Q > &writer, bool ignoreContents=false)
template<bool Dynamic = false>
void runMtProdConsEmulatedFutex ( )

Definition at line 679 of file MPMCQueueTest.cpp.

References folly::INFO, folly::make_unique(), and PC_BENCH.

Referenced by TEST().

679  {
681 
682  int n = 100000;
683  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
684  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
685  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
686  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
687  callers.emplace_back(
689  callers.emplace_back(
691  for (const auto& caller : callers) {
692  LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
693  LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
694  LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
695  LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
696  LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
697  LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
698  LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
699  LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
700  LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
701  }
702 }
#define PC_BENCH(q, np, nc,...)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
template<template< typename > class Atom, bool Dynamic = false>
uint64_t runNeverFailTest ( int  numThreads,
int  numOps 
)

Definition at line 733 of file MPMCQueueTest.cpp.

References Atom, folly::netops::bind(), EXPECT_EQ, EXPECT_TRUE, folly::test::DeterministicSchedule::join(), nowMicro(), sum(), folly::pushmi::detail::t, folly::test::DeterministicSchedule::thread(), threads, and uint64_t.

733  {
734  // always #enq >= #deq
735  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
736 
737  uint64_t n = numOps;
738  auto beginMicro = nowMicro();
739 
740  vector<std::thread> threads(numThreads);
741  std::atomic<uint64_t> sum(0);
742  for (int t = 0; t < numThreads; ++t) {
744  runNeverFailThread<Atom, Dynamic>,
745  numThreads,
746  n,
747  std::ref(cq),
748  std::ref(sum),
749  t));
750  }
751  for (auto& t : threads) {
752  DSched::join(t);
753  }
754  EXPECT_TRUE(cq.isEmpty());
755  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
756 
757  return nowMicro() - beginMicro;
758 }
std::atomic< int64_t > sum(0)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static std::thread thread(Func &&func, Args &&...args)
std::vector< std::thread::id > threads
uint64_t nowMicro()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static void join(std::thread &child)
template<class Clock , template< typename > class Atom, bool Dynamic = false>
uint64_t runNeverFailTest ( int  numThreads,
int  numOps 
)

Definition at line 830 of file MPMCQueueTest.cpp.

References folly::netops::bind(), EXPECT_EQ, EXPECT_TRUE, folly::test::DeterministicSchedule::join(), nowMicro(), sum(), folly::pushmi::detail::t, folly::test::DeterministicSchedule::thread(), threads, and uint64_t.

830  {
831  // always #enq >= #deq
832  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
833 
834  uint64_t n = numOps;
835  auto beginMicro = nowMicro();
836 
837  vector<std::thread> threads(numThreads);
838  std::atomic<uint64_t> sum(0);
839  for (int t = 0; t < numThreads; ++t) {
841  runNeverFailUntilThread<Clock, Atom, Dynamic>,
842  numThreads,
843  n,
844  std::ref(cq),
845  std::ref(sum),
846  t));
847  }
848  for (auto& t : threads) {
849  DSched::join(t);
850  }
851  EXPECT_TRUE(cq.isEmpty());
852  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
853 
854  return nowMicro() - beginMicro;
855 }
std::atomic< int64_t > sum(0)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static std::thread thread(Func &&func, Args &&...args)
std::vector< std::thread::id > threads
uint64_t nowMicro()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static void join(std::thread &child)
template<template< typename > class Atom, bool Dynamic = false>
void runNeverFailThread ( int  numThreads,
int  n,
MPMCQueue< int, Atom, Dynamic > &  cq,
std::atomic< uint64_t > &  sum,
int  t 
)

Definition at line 713 of file MPMCQueueTest.cpp.

References Atom, upload::dest, EXPECT_TRUE, i, and uint64_t.

718  {
719  uint64_t threadSum = 0;
720  for (int i = t; i < n; i += numThreads) {
721  // enq + deq
722  EXPECT_TRUE(cq.writeIfNotFull(i));
723 
724  int dest = -1;
725  EXPECT_TRUE(cq.readIfNotEmpty(dest));
726  EXPECT_TRUE(dest >= 0);
727  threadSum += dest;
728  }
729  sum += threadSum;
730 }
std::atomic< int64_t > sum(0)
dest
Definition: upload.py:394
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
template<class Clock , template< typename > class Atom, bool Dynamic>
void runNeverFailUntilThread ( int  numThreads,
int  n,
MPMCQueue< int, Atom, Dynamic > &  cq,
std::atomic< uint64_t > &  sum,
int  t 
)

Definition at line 809 of file MPMCQueueTest.cpp.

References Atom, upload::dest, EXPECT_TRUE, i, now(), and uint64_t.

814  {
815  uint64_t threadSum = 0;
816  for (int i = t; i < n; i += numThreads) {
817  // enq + deq
818  auto soon = Clock::now() + std::chrono::seconds(1);
819  EXPECT_TRUE(cq.tryWriteUntil(soon, i));
820 
821  int dest = -1;
822  EXPECT_TRUE(cq.readIfNotEmpty(dest));
823  EXPECT_TRUE(dest >= 0);
824  threadSum += dest;
825  }
826  sum += threadSum;
827 }
std::atomic< int64_t > sum(0)
std::chrono::steady_clock::time_point now()
dest
Definition: upload.py:394
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
template<typename R >
void runPerfectForwardingTest ( )

Definition at line 973 of file MPMCQueueTest.cpp.

References COPY_CONSTRUCTOR, DEFAULT_CONSTRUCTOR, DESTRUCTOR, EXPECT_EQ, EXPECT_FALSE, EXPECT_TRUE, i, lc_outstanding(), lc_snap(), LIFECYCLE_STEP, folly::gen::move, MOVE_CONSTRUCTOR, MOVE_OPERATOR, NOTHING, TWO_ARG_CONSTRUCTOR, and value.

973  {
974  lc_snap();
976 
977  {
978  // Non-dynamic only. False positive for dynamic.
979  MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
981 
982  for (int pass = 0; pass < 10; ++pass) {
983  for (int i = 0; i < 10; ++i) {
984  queue.blockingWrite();
986 
987  queue.blockingWrite(1, "one");
989 
990  {
991  Lifecycle<R> src;
993  queue.blockingWrite(std::move(src));
995  }
997 
998  {
999  Lifecycle<R> src;
1001  queue.blockingWrite(src);
1003  }
1005 
1006  EXPECT_TRUE(queue.write());
1008  }
1009 
1010  EXPECT_EQ(queue.size(), 50);
1011  EXPECT_FALSE(queue.write(2, "two"));
1013 
1014  for (int i = 0; i < 50; ++i) {
1015  {
1016  Lifecycle<R> node;
1018 
1019  queue.blockingRead(node);
1020  if (R::value) {
1021  // relocatable, moved via memcpy
1023  } else {
1025  }
1026  }
1028  }
1029 
1030  EXPECT_EQ(queue.size(), 0);
1031  }
1032 
1033  // put one element back before destruction
1034  {
1035  Lifecycle<R> src(3, "three");
1037  queue.write(std::move(src));
1039  }
1040  LIFECYCLE_STEP(DESTRUCTOR); // destroy src
1041  }
1042  LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1043 
1044  EXPECT_EQ(lc_outstanding(), 0);
1045 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
#define LIFECYCLE_STEP(...)
static int lc_outstanding()
static const char *const value
Definition: Conv.cpp:50
static void lc_snap()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
template<template< typename > class Atom, bool Dynamic = false>
void runTryEnqDeqTest ( int  numThreads,
int  numOps 
)

Definition at line 272 of file MPMCQueueTest.cpp.

References folly::netops::bind(), EXPECT_EQ, EXPECT_TRUE, folly::test::DeterministicSchedule::join(), sum(), folly::pushmi::detail::t, folly::test::DeterministicSchedule::thread(), threads, and uint64_t.

Referenced by TEST().

272  {
273  // write and read aren't linearizable, so we don't have
274  // hard guarantees on their individual behavior. We can still test
275  // correctness in aggregate
276  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
277 
278  uint64_t n = numOps;
279  vector<std::thread> threads(numThreads);
280  std::atomic<uint64_t> sum(0);
281  for (int t = 0; t < numThreads; ++t) {
283  runTryEnqDeqThread<Atom, Dynamic>,
284  numThreads,
285  n,
286  std::ref(cq),
287  std::ref(sum),
288  t));
289  }
290  for (auto& t : threads) {
291  DSched::join(t);
292  }
293  EXPECT_TRUE(cq.isEmpty());
294  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
295 }
std::atomic< int64_t > sum(0)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static std::thread thread(Func &&func, Args &&...args)
std::vector< std::thread::id > threads
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
static void join(std::thread &child)
template<template< typename > class Atom, bool Dynamic = false>
void runTryEnqDeqThread ( int  numThreads,
int  n,
MPMCQueue< int, Atom, Dynamic > &  cq,
std::atomic< uint64_t > &  sum,
int  t 
)

Definition at line 245 of file MPMCQueueTest.cpp.

References Atom, folly::pushmi::detail::t, and uint64_t.

250  {
251  uint64_t threadSum = 0;
252  int src = t;
253  // received doesn't reflect any actual values, we just start with
254  // t and increment by numThreads to get the rounding of termination
255  // correct if numThreads doesn't evenly divide numOps
256  int received = t;
257  while (src < n || received < n) {
258  if (src < n && cq.write(src)) {
259  src += numThreads;
260  }
261 
262  int dst;
263  if (received < n && cq.read(dst)) {
264  received += numThreads;
265  threadSum += dst;
266  }
267  }
268  sum += threadSum;
269 }
std::atomic< int64_t > sum(0)
template<typename T >
void setFromEnv ( T var,
const char *  envvar 
)

Definition at line 614 of file MPMCQueueTest.cpp.

Referenced by runMtProdCons(), and TEST().

614  {
615  char* str = std::getenv(envvar);
616  if (str) {
617  var = atoi(str);
618  }
619 }
TEST ( MPMCQueue  ,
sequencer   
)

Definition at line 92 of file MPMCQueueTest.cpp.

92  {
93  run_mt_sequencer_test<std::atomic>(1, 100, 0);
94  run_mt_sequencer_test<std::atomic>(2, 100000, -100);
95  run_mt_sequencer_test<std::atomic>(100, 10000, -100);
96 }
TEST ( MPMCQueue  ,
sequencer_emulated_futex   
)

Definition at line 98 of file MPMCQueueTest.cpp.

98  {
99  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
100  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
101  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
102 }
TEST ( MPMCQueue  ,
sequencer_deterministic   
)

Definition at line 104 of file MPMCQueueTest.cpp.

References folly::test::DeterministicSchedule::uniform().

104  {
105  DSched sched(DSched::uniform(0));
106  run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
107  run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
108  run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
109 }
static std::function< size_t(size_t)> uniform(uint64_t seed)
TEST ( MPMCQueue  ,
lots_of_element_types   
)

Definition at line 152 of file MPMCQueueTest.cpp.

References RefCounted::active_instances, EXPECT_EQ, and runElementTypeTest().

152  {
153  runElementTypeTest(10);
154  runElementTypeTest(string("abc"));
155  runElementTypeTest(std::make_pair(10, string("def")));
156  runElementTypeTest(vector<string>{{"abc"}});
157  runElementTypeTest(std::make_shared<char>('a'));
158  runElementTypeTest(std::make_unique<char>('a'));
159  runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
161 }
void runElementTypeTest(T &&src)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static FOLLY_TLS int active_instances
TEST ( MPMCQueue  ,
lots_of_element_types_dynamic   
)

Definition at line 163 of file MPMCQueueTest.cpp.

References RefCounted::active_instances, EXPECT_EQ, RefCounted::RefCounted(), and string.

163  {
164  runElementTypeTest<true>(10);
165  runElementTypeTest<true>(string("abc"));
166  runElementTypeTest<true>(std::make_pair(10, string("def")));
167  runElementTypeTest<true>(vector<string>{{"abc"}});
168  runElementTypeTest<true>(std::make_shared<char>('a'));
169  runElementTypeTest<true>(std::make_unique<char>('a'));
170  runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
172 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static FOLLY_TLS int active_instances
const char * string
Definition: Conv.cpp:212
TEST ( MPMCQueue  ,
single_thread_enqdeq   
)

Definition at line 174 of file MPMCQueueTest.cpp.

References upload::dest, EXPECT_EQ, EXPECT_FALSE, EXPECT_TRUE, and i.

174  {
175  // Non-dynamic version only.
176  // False positive for dynamic version. Capacity can be temporarily
177  // higher than specified.
178  MPMCQueue<int> cq(10);
179 
180  for (int pass = 0; pass < 10; ++pass) {
181  for (int i = 0; i < 10; ++i) {
182  EXPECT_TRUE(cq.write(i));
183  }
184  EXPECT_FALSE(cq.write(-1));
185  EXPECT_FALSE(cq.isEmpty());
186  EXPECT_EQ(cq.size(), 10);
187 
188  for (int i = 0; i < 5; ++i) {
189  int dest = -1;
190  EXPECT_TRUE(cq.read(dest));
191  EXPECT_EQ(dest, i);
192  }
193  for (int i = 5; i < 10; ++i) {
194  int dest = -1;
195  cq.blockingRead(dest);
196  EXPECT_EQ(dest, i);
197  }
198  int dest = -1;
199  EXPECT_FALSE(cq.read(dest));
200  EXPECT_EQ(dest, -1);
201 
202  EXPECT_TRUE(cq.isEmpty());
203  EXPECT_EQ(cq.size(), 0);
204  }
205 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
dest
Definition: upload.py:394
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
TEST ( MPMCQueue  ,
tryenq_capacity_test   
)

Definition at line 207 of file MPMCQueueTest.cpp.

References EXPECT_FALSE, EXPECT_TRUE, and i.

207  {
208  // Non-dynamic version only.
209  // False positive for dynamic version. Capacity can be temporarily
210  // higher than specified.
211  for (size_t cap = 1; cap < 100; ++cap) {
212  MPMCQueue<int> cq(cap);
213  for (size_t i = 0; i < cap; ++i) {
214  EXPECT_TRUE(cq.write(i));
215  }
216  EXPECT_FALSE(cq.write(100));
217  }
218 }
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
TEST ( MPMCQueue  ,
enq_capacity_test   
)

Definition at line 220 of file MPMCQueueTest.cpp.

References Atom, dummy(), EXPECT_EQ, i, folly::pushmi::detail::t, and folly::when().

220  {
221  // Non-dynamic version only.
222  // False positive for dynamic version. Capacity can be temporarily
223  // higher than specified.
224  for (auto cap : {1, 100, 10000}) {
225  MPMCQueue<int> cq(cap);
226  for (int i = 0; i < cap; ++i) {
227  cq.blockingWrite(i);
228  }
229  int t = 0;
230  int when;
231  auto thr = std::thread([&] {
232  cq.blockingWrite(100);
233  when = t;
234  });
235  usleep(2000);
236  t = 1;
237  int dummy;
238  cq.blockingRead(dummy);
239  thr.join();
240  EXPECT_EQ(when, 1);
241  }
242 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
void dummy()
Future< Unit > when(bool p, F &&thunk)
Definition: Future-inl.h:2330
TEST ( MPMCQueue  ,
mt_try_enq_deq   
)

Definition at line 297 of file MPMCQueueTest.cpp.

297  {
298  int nts[] = {1, 3, 100};
299 
300  int n = 100000;
301  for (int nt : nts) {
302  runTryEnqDeqTest<std::atomic>(nt, n);
303  }
304 }
TEST ( MPMCQueue  ,
mt_try_enq_deq_dynamic   
)

Definition at line 306 of file MPMCQueueTest.cpp.

References runTryEnqDeqTest().

306  {
307  int nts[] = {1, 3, 100};
308 
309  int n = 100000;
310  for (int nt : nts) {
311  runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
312  }
313 }
void runTryEnqDeqTest(int numThreads, int numOps)
TEST ( MPMCQueue  ,
mt_try_enq_deq_emulated_futex   
)

Definition at line 315 of file MPMCQueueTest.cpp.

315  {
316  int nts[] = {1, 3, 100};
317 
318  int n = 100000;
319  for (int nt : nts) {
320  runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
321  }
322 }
TEST ( MPMCQueue  ,
mt_try_enq_deq_emulated_futex_dynamic   
)

Definition at line 324 of file MPMCQueueTest.cpp.

References runTryEnqDeqTest().

324  {
325  int nts[] = {1, 3, 100};
326 
327  int n = 100000;
328  for (int nt : nts) {
329  runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
330  }
331 }
void runTryEnqDeqTest(int numThreads, int numOps)
TEST ( MPMCQueue  ,
mt_try_enq_deq_deterministic   
)

Definition at line 333 of file MPMCQueueTest.cpp.

References folly::INFO, runTryEnqDeqTest(), seed, folly::test::DeterministicSchedule::uniform(), and folly::test::DeterministicSchedule::uniformSubset().

333  {
334  int nts[] = {3, 10};
335 
336  long seed = 0;
337  LOG(INFO) << "using seed " << seed;
338 
339  int n = 1000;
340  for (int nt : nts) {
341  {
342  DSched sched(DSched::uniform(seed));
343  runTryEnqDeqTest<DeterministicAtomic>(nt, n);
344  }
345  {
346  DSched sched(DSched::uniformSubset(seed, 2));
347  runTryEnqDeqTest<DeterministicAtomic>(nt, n);
348  }
349  {
350  DSched sched(DSched::uniform(seed));
351  runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
352  }
353  {
354  DSched sched(DSched::uniformSubset(seed, 2));
355  runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
356  }
357  }
358 }
static const int seed
DeterministicAtomicImpl< T, DeterministicSchedule > DeterministicAtomic
static std::function< size_t(size_t)> uniform(uint64_t seed)
static std::function< size_t(size_t)> uniformSubset(uint64_t seed, size_t n=2, size_t m=64)
void runTryEnqDeqTest(int numThreads, int numOps)
TEST ( MPMCQueue  ,
mt_prod_cons_deterministic   
)

Definition at line 605 of file MPMCQueueTest.cpp.

References runMtProdConsDeterministic().

605  {
607 }
void runMtProdConsDeterministic(long seed)
TEST ( MPMCQueue  ,
mt_prod_cons_deterministic_dynamic   
)

Definition at line 609 of file MPMCQueueTest.cpp.

609  {
610  runMtProdConsDeterministic<true>(0);
611 }
TEST ( MPMCQueue  ,
mt_prod_cons_deterministic_dynamic_with_arguments   
)

Definition at line 621 of file MPMCQueueTest.cpp.

References runMtProdConsDeterministicDynamic(), seed, setFromEnv(), and uint32_t.

621  {
622  long seed = 0;
623  uint32_t prods = 10;
624  uint32_t cons = 10;
625  uint32_t numOps = 1000;
626  size_t cap = 10000;
627  size_t minCap = 9;
628  size_t mult = 3;
629  setFromEnv(seed, "SEED");
630  setFromEnv(prods, "PRODS");
631  setFromEnv(cons, "CONS");
632  setFromEnv(numOps, "NUM_OPS");
633  setFromEnv(cap, "CAP");
634  setFromEnv(minCap, "MIN_CAP");
635  setFromEnv(mult, "MULT");
637  seed, prods, cons, numOps, cap, minCap, mult);
638 }
void runMtProdConsDeterministicDynamic(long seed, uint32_t prods, uint32_t cons, uint32_t numOps, size_t cap, size_t minCap, size_t mult)
static const int seed
void setFromEnv(T &var, const char *envvar)
TEST ( MPMCQueue  ,
mt_prod_cons   
)

Definition at line 670 of file MPMCQueueTest.cpp.

References runMtProdCons().

670  {
671  runMtProdCons();
672 }
void runMtProdCons()
TEST ( MPMCQueue  ,
mt_prod_cons_dynamic   
)

Definition at line 674 of file MPMCQueueTest.cpp.

References runMtProdCons().

674  {
675  runMtProdCons</* Dynamic = */ true>();
676 }
void runMtProdCons()
TEST ( MPMCQueue  ,
mt_prod_cons_emulated_futex   
)

Definition at line 704 of file MPMCQueueTest.cpp.

References runMtProdConsEmulatedFutex().

704  {
706 }
void runMtProdConsEmulatedFutex()
TEST ( MPMCQueue  ,
mt_prod_cons_emulated_futex_dynamic   
)

Definition at line 708 of file MPMCQueueTest.cpp.

References Atom, and runMtProdConsEmulatedFutex().

708  {
709  runMtProdConsEmulatedFutex</* Dynamic = */ true>();
710 }
void runMtProdConsEmulatedFutex()
TEST ( MPMCQueue  ,
mt_never_fail   
)

Definition at line 774 of file MPMCQueueTest.cpp.

774  {
775  std::vector<int> nts{1, 3, 100};
776  int n = 100000;
777  runMtNeverFail<std::atomic>(nts, n);
778 }
TEST ( MPMCQueue  ,
mt_never_fail_emulated_futex   
)

Definition at line 780 of file MPMCQueueTest.cpp.

780  {
781  std::vector<int> nts{1, 3, 100};
782  int n = 100000;
783  runMtNeverFail<EmulatedFutexAtomic>(nts, n);
784 }
TEST ( MPMCQueue  ,
mt_never_fail_deterministic   
)

Definition at line 801 of file MPMCQueueTest.cpp.

References Atom, runMtNeverFailDeterministic(), and seed.

801  {
802  std::vector<int> nts{3, 10};
803  long seed = 0; // nowMicro() % 10000;
804  int n = 1000;
805  runMtNeverFailDeterministic(nts, n, seed);
806 }
static const int seed
void runMtNeverFailDeterministic(std::vector< int > &nts, int n, long seed)
TEST ( MPMCQueue  ,
mt_never_fail_until_system   
)

Definition at line 868 of file MPMCQueueTest.cpp.

References runMtNeverFailUntilSystem().

868  {
869  std::vector<int> nts{1, 3, 100};
870  int n = 100000;
872 }
void runMtNeverFailUntilSystem(std::vector< int > &nts, int n)
TEST ( MPMCQueue  ,
mt_never_fail_until_steady   
)

Definition at line 885 of file MPMCQueueTest.cpp.

References runMtNeverFailUntilSteady().

885  {
886  std::vector<int> nts{1, 3, 100};
887  int n = 100000;
889 }
void runMtNeverFailUntilSteady(std::vector< int > &nts, int n)
TEST ( MPMCQueue  ,
perfect_forwarding   
)

Definition at line 1047 of file MPMCQueueTest.cpp.

1047  {
1048  runPerfectForwardingTest<std::false_type>();
1049 }
TEST ( MPMCQueue  ,
perfect_forwarding_relocatable   
)

Definition at line 1051 of file MPMCQueueTest.cpp.

1051  {
1052  runPerfectForwardingTest<std::true_type>();
1053 }
TEST ( MPMCQueue  ,
queue_moving   
)

Definition at line 1120 of file MPMCQueueTest.cpp.

References run_queue_moving().

1120  {
1121  run_queue_moving();
1122 }
void run_queue_moving()
TEST ( MPMCQueue  ,
queue_moving_dynamic   
)

Definition at line 1124 of file MPMCQueueTest.cpp.

1124  {
1125  run_queue_moving<true>();
1126 }
TEST ( MPMCQueue  ,
explicit_zero_capacity_fail   
)

Definition at line 1128 of file MPMCQueueTest.cpp.

References ASSERT_THROW.

1128  {
1129  ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1130 
1131  using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1132  ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
1133 }
#define ASSERT_THROW(statement, expected_exception)
Definition: gtest.h:1849
TEST ( MPMCQueue  ,
try_read_until   
)

Definition at line 1208 of file MPMCQueueTest.cpp.

1208  {
1209  testTryReadUntil<false>();
1210 }
TEST ( MPMCQueue  ,
try_read_until_dynamic   
)

Definition at line 1212 of file MPMCQueueTest.cpp.

1212  {
1213  testTryReadUntil<true>();
1214 }
TEST ( MPMCQueue  ,
try_write_until   
)

Definition at line 1216 of file MPMCQueueTest.cpp.

1216  {
1217  testTryWriteUntil<false>();
1218 }
TEST ( MPMCQueue  ,
try_write_until_dynamic   
)

Definition at line 1220 of file MPMCQueueTest.cpp.

1220  {
1221  testTryWriteUntil<true>();
1222 }
TEST ( MPMCQueue  ,
try_write_until_timeout   
)

Definition at line 1232 of file MPMCQueueTest.cpp.

1232  {
1234  testTimeout<false>(queue);
1235 }
TEST ( MPMCQueue  ,
must_fail_try_write_until_dynamic   
)

Definition at line 1237 of file MPMCQueueTest.cpp.

1237  {
1239  testTimeout<true>(queue);
1240 }
template<bool Dynamic>
void testTimeout ( MPMCQueue< int, std::atomic, Dynamic > &  q)

Definition at line 1225 of file MPMCQueueTest.cpp.

References now().

1225  {
1226  CHECK(q.write(1));
1227  /* The following must not block forever */
1228  q.tryWriteUntil(
1229  std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
1230 }
std::chrono::steady_clock::time_point now()
template<bool Dynamic>
void testTryReadUntil ( )

Definition at line 1136 of file MPMCQueueTest.cpp.

References b, folly::custom_stop_watch< Clock, Duration >::elapsed(), EXPECT_EQ, EXPECT_FALSE, EXPECT_TRUE, folly::custom_stop_watch< Clock, Duration >::getCheckpoint(), i, threads, and folly::detail::distributed_mutex::wait().

1136  {
1138 
1139  const auto wait = std::chrono::milliseconds(100);
1140  stop_watch<> watch;
1141  bool rets[2];
1142  int vals[2];
1143  std::vector<std::thread> threads;
1144  boost::barrier b{3};
1145  for (int i = 0; i < 2; i++) {
1146  threads.emplace_back([&, i] {
1147  b.wait();
1148  rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
1149  });
1150  }
1151 
1152  b.wait();
1153  EXPECT_TRUE(q.write(42));
1154 
1155  for (int i = 0; i < 2; i++) {
1156  threads[i].join();
1157  }
1158 
1159  for (int i = 0; i < 2; i++) {
1160  int other = (i + 1) % 2;
1161  if (rets[i]) {
1162  EXPECT_EQ(42, vals[i]);
1163  EXPECT_FALSE(rets[other]);
1164  }
1165  }
1166 
1167  EXPECT_TRUE(watch.elapsed(wait));
1168 }
char b
duration elapsed() const
Definition: stop_watch.h:168
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
std::vector< std::thread::id > threads
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
clock_type::time_point getCheckpoint() const
Definition: stop_watch.h:257
template<bool Dynamic>
void testTryWriteUntil ( )

Definition at line 1171 of file MPMCQueueTest.cpp.

References b, folly::custom_stop_watch< Clock, Duration >::elapsed(), EXPECT_EQ, EXPECT_FALSE, EXPECT_TRUE, folly::custom_stop_watch< Clock, Duration >::getCheckpoint(), i, threads, folly::detail::distributed_mutex::wait(), and x.

1171  {
1173  EXPECT_TRUE(q.write(42));
1174 
1175  const auto wait = std::chrono::milliseconds(100);
1176  stop_watch<> watch;
1177  bool rets[2];
1178  std::vector<std::thread> threads;
1179  boost::barrier b{3};
1180  for (int i = 0; i < 2; i++) {
1181  threads.emplace_back([&, i] {
1182  b.wait();
1183  rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
1184  });
1185  }
1186 
1187  b.wait();
1188  int x;
1189  EXPECT_TRUE(q.read(x));
1190  EXPECT_EQ(42, x);
1191 
1192  for (int i = 0; i < 2; i++) {
1193  threads[i].join();
1194  }
1195  EXPECT_TRUE(q.read(x));
1196 
1197  for (int i = 0; i < 2; i++) {
1198  int other = (i + 1) % 2;
1199  if (rets[i]) {
1200  EXPECT_EQ(i, x);
1201  EXPECT_FALSE(rets[other]);
1202  }
1203  }
1204 
1205  EXPECT_TRUE(watch.elapsed(wait));
1206 }
char b
duration elapsed() const
Definition: stop_watch.h:168
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
const int x
std::vector< std::thread::id > threads
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
clock_type::time_point getCheckpoint() const
Definition: stop_watch.h:257

Variable Documentation

FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT]
static

Definition at line 904 of file MPMCQueueTest.cpp.

Referenced by lc_snap(), and lc_step().