27 #include <boost/intrusive_ptr.hpp> 28 #include <boost/thread/barrier.hpp> 36 using namespace
folly;
40 using
std::unique_ptr;
47 typedef DeterministicSchedule
DSched;
49 template <template <typename> class
Atom>
58 for (
int op = i;
op < numOps;
op += numThreads) {
59 seq.waitForTurn(init +
op, spinThreshold, (
op % 32) == 0);
62 seq.completeTurn(init +
op);
66 template <
template <
typename>
class Atom>
69 Atom<uint32_t> spinThreshold(0);
72 vector<std::thread>
threads(numThreads);
73 for (
int i = 0;
i < numThreads; ++
i) {
75 run_mt_sequencer_thread<Atom>,
80 std::ref(spinThreshold),
85 for (
auto& thr : threads) {
93 run_mt_sequencer_test<std::atomic>(1, 100, 0);
94 run_mt_sequencer_test<std::atomic>(2, 100000, -100);
95 run_mt_sequencer_test<std::atomic>(100, 10000, -100);
99 run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
100 run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
101 run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
106 run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
107 run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
108 run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
111 template <
bool Dynamic = false,
typename T>
114 cq.blockingWrite(std::forward<T>(src));
116 cq.blockingRead(dest);
130 mutable std::atomic<int>
rc;
147 if (--(p->
rc) == 0) {
164 runElementTypeTest<true>(10);
165 runElementTypeTest<true>(
string(
"abc"));
166 runElementTypeTest<true>(std::make_pair(10,
string(
"def")));
167 runElementTypeTest<true>(vector<string>{{
"abc"}});
168 runElementTypeTest<true>(std::make_shared<char>(
'a'));
169 runElementTypeTest<true>(std::make_unique<char>(
'a'));
170 runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(
new RefCounted));
180 for (
int pass = 0; pass < 10; ++pass) {
181 for (
int i = 0;
i < 10; ++
i) {
188 for (
int i = 0;
i < 5; ++
i) {
193 for (
int i = 5;
i < 10; ++
i) {
195 cq.blockingRead(dest);
211 for (
size_t cap = 1; cap < 100; ++cap) {
213 for (
size_t i = 0;
i < cap; ++
i) {
224 for (
auto cap : {1, 100, 10000}) {
226 for (
int i = 0;
i < cap; ++
i) {
231 auto thr = std::thread([&] {
232 cq.blockingWrite(100);
238 cq.blockingRead(dummy);
244 template <
template <
typename>
class Atom,
bool Dynamic =
false>
249 std::atomic<uint64_t>&
sum,
257 while (src < n || received < n) {
258 if (src < n && cq.write(src)) {
263 if (received < n && cq.read(dst)) {
264 received += numThreads;
271 template <
template <
typename>
class Atom,
bool Dynamic =
false>
279 vector<std::thread>
threads(numThreads);
280 std::atomic<uint64_t>
sum(0);
281 for (
int t = 0;
t < numThreads; ++
t) {
283 runTryEnqDeqThread<Atom, Dynamic>,
290 for (
auto&
t : threads) {
298 int nts[] = {1, 3, 100};
302 runTryEnqDeqTest<std::atomic>(nt, n);
307 int nts[] = {1, 3, 100};
316 int nts[] = {1, 3, 100};
320 runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
325 int nts[] = {1, 3, 100};
343 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
347 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
362 gettimeofday(&tv,
nullptr);
363 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
366 template <
typename Q>
370 virtual bool callWrite(Q& q,
int i) = 0;
371 virtual string methodName() = 0;
374 template <
typename Q>
381 return "blockingWrite";
385 template <
typename Q>
388 return q.writeIfNotFull(i);
391 return "writeIfNotFull";
395 template <
typename Q>
407 class Clock = steady_clock,
408 class Duration =
typename Clock::duration>
414 return q.tryWriteUntil(then, i);
418 "tryWriteUntil({}ms)",
419 std::chrono::duration_cast<milliseconds>(duration_).
count());
423 template <
typename Q>
431 bool ignoreContents =
false) {
434 struct rusage beginUsage;
435 getrusage(RUSAGE_SELF, &beginUsage);
440 std::atomic<uint64_t>
sum(0);
441 std::atomic<uint64_t>
failed(0);
443 vector<std::thread> producers(numProducers);
444 for (
int t = 0;
t < numProducers; ++
t) {
446 for (
int i =
t;
i < numOps;
i += numProducers) {
454 vector<std::thread> consumers(numConsumers);
455 for (
int t = 0;
t < numConsumers; ++
t) {
458 for (
int i =
t;
i < numOps;
i += numConsumers) {
460 q.blockingRead(dest);
468 for (
auto&
t : producers) {
471 for (
auto&
t : consumers) {
474 if (!ignoreContents) {
480 struct rusage endUsage;
481 getrusage(RUSAGE_SELF, &endUsage);
483 uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
484 long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
485 (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
487 size_t allocated = q.allocatedCapacity();
490 "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} " 491 "handoff, {} failures, {} allocated",
503 template <
bool Dynamic = false>
510 vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
514 callers.emplace_back(
516 callers.emplace_back(
520 for (
const auto& caller : callers) {
524 "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
525 folly::to<std::string>(cap) +
")",
533 "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
534 folly::to<std::string>(cap) +
")",
542 "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
543 folly::to<std::string>(cap) +
")",
551 "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
552 folly::to<std::string>(cap) +
")",
560 "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
561 folly::to<std::string>(cap) +
")",
582 vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
586 callers.emplace_back(
588 callers.emplace_back(
591 for (
const auto& caller : callers) {
594 "MPMCQueue<int, DeterministicAtomic, true>(" +
595 folly::to<std::string>(cap) +
", " +
596 folly::to<std::string>(minCap) +
", " +
597 folly::to<std::string>(mult) +
")",
610 runMtProdConsDeterministic<true>(0);
613 template <
typename T>
615 char* str = std::getenv(envvar);
637 seed, prods, cons, numOps, cap, minCap, mult);
640 #define PC_BENCH(q, np, nc, ...) \ 641 producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__) 643 template <
bool Dynamic = false>
649 vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
653 callers.emplace_back(
655 callers.emplace_back(
657 for (
const auto& caller : callers) {
658 LOG(
INFO) <<
PC_BENCH((QueueType(10)), 1, 1, n, *caller);
659 LOG(
INFO) <<
PC_BENCH((QueueType(10)), 10, 1, n, *caller);
660 LOG(
INFO) <<
PC_BENCH((QueueType(10)), 1, 10, n, *caller);
661 LOG(
INFO) <<
PC_BENCH((QueueType(10)), 10, 10, n, *caller);
662 LOG(
INFO) <<
PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
663 LOG(
INFO) <<
PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
664 LOG(
INFO) <<
PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
665 LOG(
INFO) <<
PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
666 LOG(
INFO) <<
PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
678 template <
bool Dynamic = false>
683 vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
687 callers.emplace_back(
689 callers.emplace_back(
691 for (
const auto& caller : callers) {
692 LOG(
INFO) <<
PC_BENCH((QueueType(10)), 1, 1, n, *caller);
693 LOG(
INFO) <<
PC_BENCH((QueueType(10)), 10, 1, n, *caller);
694 LOG(
INFO) <<
PC_BENCH((QueueType(10)), 1, 10, n, *caller);
695 LOG(
INFO) <<
PC_BENCH((QueueType(10)), 10, 10, n, *caller);
696 LOG(
INFO) <<
PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
697 LOG(
INFO) <<
PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
698 LOG(
INFO) <<
PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
699 LOG(
INFO) <<
PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
700 LOG(
INFO) <<
PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
712 template <
template <
typename>
class Atom,
bool Dynamic =
false>
717 std::atomic<uint64_t>&
sum,
720 for (
int i = t;
i < n;
i += numThreads) {
732 template <
template <
typename>
class Atom,
bool Dynamic =
false>
740 vector<std::thread>
threads(numThreads);
741 std::atomic<uint64_t>
sum(0);
742 for (
int t = 0;
t < numThreads; ++
t) {
744 runNeverFailThread<Atom, Dynamic>,
751 for (
auto&
t : threads) {
760 template <
template <
typename>
class Atom,
bool Dynamic =
false>
763 uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
764 LOG(
INFO) << (elapsed * 1000.0) / (n * 2) <<
" nanos per op with " << nt
775 std::vector<int> nts{1, 3, 100};
777 runMtNeverFail<std::atomic>(nts, n);
781 std::vector<int> nts{1, 3, 100};
783 runMtNeverFail<EmulatedFutexAtomic>(nts, n);
786 template <
bool Dynamic = false>
792 runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
796 runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
802 std::vector<int> nts{3, 10};
808 template <
class Clock,
template <
typename>
class Atom,
bool Dynamic>
813 std::atomic<uint64_t>&
sum,
816 for (
int i = t;
i < n;
i += numThreads) {
818 auto soon =
Clock::now() + std::chrono::seconds(1);
829 template <
class Clock,
template <
typename>
class Atom,
bool Dynamic =
false>
837 vector<std::thread>
threads(numThreads);
838 std::atomic<uint64_t>
sum(0);
839 for (
int t = 0;
t < numThreads; ++
t) {
841 runNeverFailUntilThread<Clock, Atom, Dynamic>,
848 for (
auto&
t : threads) {
857 template <
bool Dynamic = false>
861 runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(
863 LOG(
INFO) << (elapsed * 1000.0) / (n * 2) <<
" nanos per op with " << nt
869 std::vector<int> nts{1, 3, 100};
874 template <
bool Dynamic = false>
878 runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(
880 LOG(
INFO) << (elapsed * 1000.0) / (n * 2) <<
" nanos per op with " << nt
886 std::vector<int> nts{1, 3, 100};
918 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__) 922 int delta =
i == what ||
i == what2 ? 1 : 0;
924 <<
"lc_counts[" << i <<
"] - lc_prev[" << i <<
"] was " 926 <<
", from line " << lineno;
931 template <
typename R>
942 : constructed(true) {
972 template <
typename R>
982 for (
int pass = 0; pass < 10; ++pass) {
983 for (
int i = 0;
i < 10; ++
i) {
984 queue.blockingWrite();
987 queue.blockingWrite(1,
"one");
1001 queue.blockingWrite(src);
1014 for (
int i = 0;
i < 50; ++
i) {
1019 queue.blockingRead(node);
1048 runPerfectForwardingTest<std::false_type>();
1052 runPerfectForwardingTest<std::true_type>();
1055 template <
bool Dynamic = false>
1090 c.blockingRead(dst);
1104 d.blockingRead(dst);
1107 c.blockingWrite(dst);
1125 run_queue_moving<true>();
1132 ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
1135 template <
bool Dynamic>
1139 const auto wait = std::chrono::milliseconds(100);
1143 std::vector<std::thread>
threads;
1144 boost::barrier
b{3};
1145 for (
int i = 0;
i < 2;
i++) {
1146 threads.emplace_back([&,
i] {
1155 for (
int i = 0;
i < 2;
i++) {
1159 for (
int i = 0;
i < 2;
i++) {
1160 int other = (
i + 1) % 2;
1170 template <
bool Dynamic>
1175 const auto wait = std::chrono::milliseconds(100);
1178 std::vector<std::thread>
threads;
1179 boost::barrier
b{3};
1180 for (
int i = 0;
i < 2;
i++) {
1181 threads.emplace_back([&,
i] {
1192 for (
int i = 0;
i < 2;
i++) {
1197 for (
int i = 0;
i < 2;
i++) {
1198 int other = (
i + 1) % 2;
1209 testTryReadUntil<false>();
1213 testTryReadUntil<true>();
1217 testTryWriteUntil<false>();
1221 testTryWriteUntil<true>();
1224 template <
bool Dynamic>
1234 testTimeout<false>(queue);
1239 testTimeout<true>(queue);
void runElementTypeTest(T &&src)
Lifecycle & operator=(Lifecycle &&) noexcept
Lifecycle(Lifecycle &&) noexcept
void runMtNeverFail(std::vector< int > &nts, int n)
void runMtNeverFailUntilSystem(std::vector< int > &nts, int n)
std::atomic< int64_t > sum(0)
void runMtProdConsDeterministicDynamic(long seed, uint32_t prods, uint32_t cons, uint32_t numOps, size_t cap, size_t minCap, size_t mult)
string methodName() override
std::string sformat(StringPiece fmt, Args &&...args)
#define PC_BENCH(q, np, nc,...)
static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT]
#define EXPECT_EQ(val1, val2)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
DeterministicAtomicImpl< T, DeterministicSchedule > DeterministicAtomic
Gen seq(Value first, Value last)
void runPerfectForwardingTest()
static std::thread thread(Func &&func, Args &&...args)
—— MPMCQueue Test: Referenced Symbols ——
bool callWrite(Q &q, int i) override
requires E e noexcept(noexcept(s.error(std::move(e))))
Lifecycle(const Lifecycle &) noexcept
TryWriteUntilCaller(Duration &&duration)
void setFromEnv(T &var, const char *envvar)
static void lc_step(int lineno, int what=NOTHING, int what2=NOTHING)
string methodName() override
#define LIFECYCLE_STEP(...)
std::vector< std::thread::id > threads
string methodName() override
static int lc_outstanding()
static std::function< size_t(size_t)> uniform(uint64_t seed)
std::chrono::milliseconds Duration
Lifecycle(int, char const *) noexcept
void testTimeout(MPMCQueue< int, std::atomic, Dynamic > &q)
virtual bool callWrite(Q &q, int i)=0
#define FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(...)
void runMtNeverFailUntilSteady(std::vector< int > &nts, int n)
void runMtProdConsEmulatedFutex()
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
uint64_t runNeverFailTest(int numThreads, int numOps)
Lifecycle & operator=(const Lifecycle &) noexcept
static const char *const value
void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init)
void runMtNeverFailDeterministic(std::vector< int > &nts, int n, long seed)
string methodName() override
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
void run_mt_sequencer_thread(int numThreads, int numOps, uint32_t init, TurnSequencer< Atom > &seq, Atom< uint32_t > &spinThreshold, int &prev, int i)
static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT]
#define ASSERT_THROW(statement, expected_exception)
virtual string methodName()=0
static std::function< size_t(size_t)> uniformSubset(uint64_t seed, size_t n=2, size_t m=64)
void runTryEnqDeqThread(int numThreads, int n, MPMCQueue< int, Atom, Dynamic > &cq, std::atomic< uint64_t > &sum, int t)
static FOLLY_TLS int active_instances
Future< Unit > when(bool p, F &&thunk)
#define EXPECT_TRUE(condition)
string producerConsumerBench(Q &&queue, string qName, int numProducers, int numConsumers, int numOps, WriteMethodCaller< Q > &writer, bool ignoreContents=false)
void intrusive_ptr_release(RefCounted const *p)
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
TEST(MPMCQueue, sequencer)
void runNeverFailUntilThread(int numThreads, int n, MPMCQueue< int, Atom, Dynamic > &cq, std::atomic< uint64_t > &sum, int t)
bool callWrite(Q &q, int i) override
static unsigned long long allocated
bool callWrite(Q &q, int i) override
void intrusive_ptr_add_ref(RefCounted const *p)
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
#define EXPECT_FALSE(condition)
static void join(std::thread &child)
void runMtProdConsDeterministic(long seed)
clock_type::time_point getCheckpoint() const
bool callWrite(Q &q, int i) override
void runTryEnqDeqTest(int numThreads, int numOps)
void runNeverFailThread(int numThreads, int n, MPMCQueue< int, Atom, Dynamic > &cq, std::atomic< uint64_t > &sum, int t)