18 #include <boost/thread.hpp> 26 #include <glog/logging.h> 28 using namespace folly;
// Thread counts swept by the pusher/popper tests: PusherPopperBlockingTest and
// PusherPopperSpinningTest walk this list in two nested loops and hand each
// combination to multiPusherPopper.
35 static std::vector<int>
nthr = {1, 2, 4, 8};
39 template <
class PriorityQueue>
77 basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true>>();
79 basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0>>();
80 basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0, 1>>();
81 basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0, 3>>();
83 basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 1, 1>>();
84 basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 2, 1>>();
85 basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 3, 3>>();
87 basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true>>();
88 basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true, 0>>();
89 basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true, 2>>();
93 template <
typename Func>
95 boost::barrier barrier_start{
nthreads + 1};
98 threads[tid] = std::thread([&, tid] {
104 barrier_start.wait();
106 for (
auto&
t : threads) {
113 duration = std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
118 template <
class PriorityQueue>
123 rng.seed(FLAGS_elems);
126 for (
int i = 0;
i < FLAGS_elems;
i++) {
135 while (counter < FLAGS_elems) {
144 TEST(CPQ, SingleThrStrictImplTest) {
146 singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0>>();
147 singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0, 1>>();
148 singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0, 8>>();
150 singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0>>();
151 singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
152 singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0, 8>>();
155 TEST(CPQ, SingleThrRelaxedImplTest) {
157 singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 1>>();
158 singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8>>();
159 singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8, 2>>();
160 singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
161 singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 2, 128>>();
162 singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 100, 8>>();
164 singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 1>>();
165 singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 8>>();
170 template <
class PriorityQueue>
176 rng.seed(FLAGS_elems);
179 for (
int i = 0;
i < FLAGS_elems;
i++) {
185 std::atomic<uint64_t> pop_sum(0);
186 std::atomic<int> to_end(0);
192 int index = to_end.fetch_add(1, std::memory_order_acq_rel);
193 if (index < FLAGS_elems) {
198 pop_sum.fetch_add(val, std::memory_order_acq_rel);
207 TEST(CPQ, ConcurrentPopStrictImplTest) {
224 TEST(CPQ, ConcurrentPopRelaxedImplTest) {
245 template <
class PriorityQueue>
249 std::atomic<uint64_t> counter_sum(0);
256 for (
int i = tid;
i < FLAGS_elems;
i +=
nthreads) {
261 counter_sum.fetch_add(local_sum, std::memory_order_acq_rel);
267 for (
int i = 0;
i < FLAGS_elems;
i++) {
276 TEST(CPQ, ConcurrentPushStrictImplTest) {
277 concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0>>();
278 concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0, 8>>();
279 concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0, 128>>();
282 TEST(CPQ, ConcurrentPushRelaxedImplTest) {
283 concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false>>();
284 concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 1, 8>>();
285 concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 2, 128>>();
286 concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 128, 8>>();
287 concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
290 template <
class PriorityQueue>
294 std::atomic<uint64_t> counter_push(0);
295 std::atomic<uint64_t> counter_pop(0);
301 std::atomic<int> to_end(0);
311 for (
int i = tid;
i < FLAGS_elems;
i +=
nthreads) {
319 for (
int i = 0;
i <
ops;
i++) {
331 counter_push.fetch_add(local_push, std::memory_order_seq_cst);
332 counter_pop.fetch_add(local_pop, std::memory_order_seq_cst);
335 uint64_t r = counter_push.load(std::memory_order_seq_cst) -
336 counter_pop.load(std::memory_order_seq_cst);
338 uint64_t index = to_end.fetch_add(1, std::memory_order_acq_rel);
351 template <
class PriorityQueue>
364 for (
int i = tid;
i < FLAGS_elems;
i +=
nthreads) {
371 for (
int i = 0;
i <
ops;
i++) {
// Values swept by the mixed-operation and size tests that follow; each entry
// is passed as the single argument of concurrentOps / concurrentSizeTest.
383 static std::vector<int>
sizes = {0, 1024};
385 TEST(CPQ, ConcurrentMixedStrictImplTest) {
386 for (
auto size : sizes) {
387 concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0>>(
size);
388 concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0, 1>>(
390 concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0, 32>>(
395 TEST(CPQ, ConcurrentMixedRelaxedImplTest) {
396 for (
auto size : sizes) {
397 concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false>>(
size);
398 concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 1, 32>>(
400 concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 32, 1>>(
402 concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>(
404 concurrentOps<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>(
size);
408 TEST(CPQ, StrictImplSizeTest) {
409 for (
auto size : sizes) {
410 concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true, 0>>(
412 concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, true, true, 0>>(
417 TEST(CPQ, RelaxedImplSizeTest) {
418 for (
auto size : sizes) {
419 concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true>>(
size);
420 concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, true, true, 2, 8>>(
422 concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true, 8, 2>>(
427 template <
class PriorityQueue>
430 uint32_t total_threads = PushThr + PopThr;
433 std::atomic<uint64_t> sum_push(0);
434 std::atomic<uint64_t> sum_pop(0);
436 auto fn_popthr = [&](
uint32_t tid) {
437 for (
int i = tid;
i <
ops;
i += PopThr) {
440 sum_pop.fetch_add(val, std::memory_order_acq_rel);
444 auto fn_pushthr = [&](
uint32_t tid) {
447 for (
int i = tid;
i <
ops;
i += PushThr) {
450 sum_push.fetch_add(val, std::memory_order_acq_rel);
453 boost::barrier barrier_start{total_threads + 1};
455 std::vector<std::thread> threads_push(PushThr);
456 for (
int tid = 0; tid < PushThr; ++tid) {
457 threads_push[tid] = std::thread([&, tid] {
458 barrier_start.wait();
462 std::vector<std::thread> threads_pop(PopThr);
463 for (
int tid = 0; tid < PopThr; ++tid) {
464 threads_pop[tid] = std::thread([&, tid] {
465 barrier_start.wait();
470 barrier_start.wait();
472 for (
auto&
t : threads_push) {
475 for (
auto&
t : threads_pop) {
481 TEST(CPQ, PusherPopperBlockingTest) {
482 for (
auto i : nthr) {
483 for (
auto j : nthr) {
485 multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>(
488 multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false, 1, 8>>(
494 multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false>>(
i, j);
501 TEST(CPQ, PusherPopperSpinningTest) {
502 for (
auto i : nthr) {
503 for (
auto j : nthr) {
514 multiPusherPopper<RelaxedConcurrentPriorityQueue<int, false, false>>(
522 template <
class PriorityQueue>
527 boost::barrier
b{
static_cast<unsigned int>(nPop + 1)};
528 std::atomic<int> finished{0};
530 std::vector<std::thread> threads_pop(nPop);
531 for (
int tid = 0; tid < nPop; ++tid) {
532 threads_pop[tid] = std::thread([&] {
536 finished.fetch_add(1, std::memory_order_acq_rel);
547 while (finished.load(std::memory_order_acquire) !=
c)
549 EXPECT_EQ(finished.load(std::memory_order_acquire),
c);
552 for (
auto&
t : threads_pop) {
557 template <
class PriorityQueue>
561 for (
int iter = 0; iter < FLAGS_reps * 10; iter++) {
563 boost::barrier
b{
static_cast<unsigned int>(nThrs + nThrs + 1)};
564 std::atomic<uint32_t> finished{0};
565 std::vector<std::thread> threads_pop(nThrs);
566 for (
uint32_t tid = 0; tid < nThrs; ++tid) {
567 threads_pop[tid] = std::thread([&] {
571 finished.fetch_add(1, std::memory_order_acq_rel);
575 std::vector<std::thread> threads_push(nThrs);
576 for (
uint32_t tid = 0; tid < nThrs; ++tid) {
577 threads_push[tid] = std::thread([&, tid] {
580 while (finished.load(std::memory_order_acquire) != nThrs)
582 EXPECT_EQ(finished.load(std::memory_order_acquire), nThrs);
587 for (
auto&
t : threads_pop) {
590 for (
auto&
t : threads_push) {
598 blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
599 blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 0, 16>>();
601 blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false>>();
602 blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>();
603 blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 16, 1>>();
605 blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false>>();
606 blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
607 blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false, 16, 1>>();
612 concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
613 concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 0, 16>>();
615 concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false>>();
616 concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>();
617 concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 16, 1>>();
619 concurrentBlocking<RelaxedConcurrentPriorityQueue<int, false, false>>();
620 concurrentBlocking<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
629 template <
class PriorityQueue,
template <
typename>
class Atom = std::atomic>
631 for (
int r = 0; r < FLAGS_reps; r++) {
639 for (
int i = 0;
i < FLAGS_elems;
i++) {
644 pre_size = FLAGS_elems;
645 Atom<uint64_t> atom_push_sum{0};
646 Atom<uint64_t> atom_pop_sum{0};
647 std::vector<std::thread>
threads(thr);
648 for (
int tid = 0; tid < thr; ++tid) {
654 for (
int i = 0;
i < FLAGS_ops;
i++) {
661 atom_push_sum.fetch_add(push_sum, std::memory_order_acq_rel);
662 atom_pop_sum.fetch_add(pop_sum, std::memory_order_acq_rel);
666 for (
auto&
t : threads) {
671 while (pre_size > 0) {
678 EXPECT_EQ(atom_pop_sum, pre_sum + atom_push_sum);
682 TEST(CPQ, DSchedMixedStrictTest) {
693 DeterministicAtomic>();
702 DeterministicAtomic>,
703 DeterministicAtomic>();
712 DeterministicAtomic>,
713 DeterministicAtomic>();
722 DeterministicAtomic>,
723 DeterministicAtomic>();
732 DeterministicAtomic>,
733 DeterministicAtomic>();
742 DeterministicAtomic>,
743 DeterministicAtomic>();
746 TEST(CPQ, DSchedMixedRelaxedTest) {
757 DeterministicAtomic>();
766 DeterministicAtomic>,
767 DeterministicAtomic>();
776 DeterministicAtomic>,
777 DeterministicAtomic>();
786 DeterministicAtomic>,
787 DeterministicAtomic>();
796 DeterministicAtomic>,
797 DeterministicAtomic>();
806 DeterministicAtomic>,
807 DeterministicAtomic>();
816 DeterministicAtomic>,
817 DeterministicAtomic>();
826 DeterministicAtomic>,
827 DeterministicAtomic>();
830 template <
typename T>
844 template <
typename T>
846 std::priority_queue<T>
q_;
851 std::lock_guard<std::mutex>
g(m_);
856 std::lock_guard<std::mutex>
g(m_);
867 template <
class PriorityQueue>
875 if (name.find(
"RCPQ") != std::string::npos) {
881 uint32_t total_threads = PushThr + PopThr;
883 for (
int r = 0; r < reps; ++r) {
888 rng.seed(initial_size);
895 auto fn_popthr = [&](
uint32_t tid) {
896 for (
int i = tid;
i <
ops;
i += PopThr) {
902 auto fn_pushthr = [&](
uint32_t tid) {
905 for (
int i = tid;
i <
ops;
i += PushThr) {
910 boost::barrier barrier_start{total_threads + 1};
911 std::vector<std::thread> threads_push(PushThr);
912 for (
uint32_t tid = 0; tid < PushThr; ++tid) {
913 threads_push[tid] = std::thread([&, tid] {
914 barrier_start.wait();
918 std::vector<std::thread> threads_pop(PopThr);
919 for (
uint32_t tid = 0; tid < PopThr; ++tid) {
920 threads_pop[tid] = std::thread([&, tid] {
921 barrier_start.wait();
925 barrier_start.wait();
928 for (
auto&
t : threads_push) {
931 for (
auto&
t : threads_pop) {
936 dur = std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
943 std::cout << std::setw(12) <<
name;
944 std::cout <<
" " << std::setw(8) << max / ops <<
" ns";
945 std::cout <<
" " << std::setw(8) << avg / ops <<
" ns";
946 std::cout <<
" " << std::setw(8) << min / ops <<
" ns";
947 std::cout << std::endl;
951 template <
class PriorityQueue>
959 for (
int r = 0; r < reps; ++r) {
964 rng.seed(initial_size);
994 std::cout << std::setw(12) <<
name;
995 std::cout <<
" " << std::setw(8) << max / ops <<
" ns";
996 std::cout <<
" " << std::setw(8) << avg / ops <<
" ns";
997 std::cout <<
" " << std::setw(8) << min / ops <<
" ns";
998 std::cout << std::endl;
1002 template <
class PriorityQueue>
1007 int valid = initial_size / top_percent;
1011 int target = initial_size - valid;
1012 for (
int r = 0; r < reps; ++r) {
1014 std::unordered_set<int>
filter;
1016 rng.seed(initial_size + r);
1024 }
while (filter.find(val) != filter.end());
1034 if (val >= target) {
1037 if (stop > 0 && val < target) {
1047 std::cout << std::setw(16) << name <<
" ";
1048 std::cout <<
"Lower priority popped: " << avg;
1049 std::cout << std::endl;
// Shorthand for the int-valued flat-combining priority queue; it is the
// contender labelled "FC" in the throughput and producer/consumer benchmarks.
1052 using FCPQ = folly::FlatCombiningPriorityQueue<int>;
1058 std::vector<int> test_sizes = {64, 512, 65536};
1059 std::vector<int>
nthrs = {1, 2, 4, 8, 12, 14, 16, 28, 32, 56};
1061 std::cout <<
"Threads have equal chance to push and pop. \n" 1062 <<
"The bench caculates the avg execution time for\n" 1063 <<
"one operation (push OR pop).\n" 1064 <<
"GL : std::priority_queue protected by global lock\n" 1065 <<
"FL : flatcombinning priority queue\n" 1066 <<
"RCPQ: the relaxed concurrent priority queue\n" 1068 std::cout <<
"\nTest_name, Max time, Avg time, Min time" << std::endl;
1069 for (
auto s : test_sizes) {
1070 std::cout <<
"\n ------ Initial size: " <<
s <<
" ------" << std::endl;
1071 for (
int i : nthrs) {
1073 std::cout <<
"Thread number: " <<
i << std::endl;
1074 throughtput_test<GlobalLockPQ<int>>(
"GL",
s);
1075 throughtput_test<FCPQ>(
"FC",
s);
1076 throughtput_test<RelaxedConcurrentPriorityQueue<int>>(
"RCPQ",
s);
1085 std::vector<int> test_sizes = {0, 512, 65536};
1086 std::vector<int>
nthrs = {1, 2, 4, 8, 12, 16, 24};
1088 std::cout <<
"<Producer, Consumer> pattern \n" 1089 <<
"The bench caculates the avg execution time for\n" 1090 <<
"push AND pop pair(two operations).\n" 1091 <<
"GL : std::priority_queue protected by global lock\n" 1092 <<
"FL : flatcombinning priority queue\n" 1093 <<
"RCPQ SPN: RCPQ spinning\n" 1094 <<
"RCPQ BLK: RCPQ blocking\n" 1096 for (
int s : test_sizes) {
1097 std::cout <<
"\n ------ Scalability ------" << std::endl;
1098 for (
int m : nthrs) {
1099 for (
int n : nthrs) {
1103 std::cout <<
"<" <<
m <<
" , " << n <<
"> , size = " <<
s <<
":" 1105 producer_consumer_test<GlobalLockPQ<int>>(
"GL",
m, n,
s);
1106 producer_consumer_test<FCPQ>(
"FC",
m, n,
s);
1109 "RCPQ SPN",
m, n,
s);
1112 "RCPQ BLK",
m, n,
s);
1115 std::cout <<
"\n ------ Unbalanced(Producer<Consumer) ------" << std::endl;
1116 for (
int m : nthrs) {
1117 for (
int n : nthrs) {
1118 if (
m > 4 || n - 4 <=
m) {
1121 std::cout <<
"<" <<
m <<
" , " << n <<
"> , size = " <<
s <<
":" 1123 producer_consumer_test<GlobalLockPQ<int>>(
"GL",
m, n,
s);
1124 producer_consumer_test<FCPQ>(
"FC",
m, n,
s);
1127 "RCPQ SPN",
m, n,
s);
1130 "RCPQ BLK",
m, n,
s);
1134 std::cout <<
"\n ------ Unbalanced(Producer>Consumer) ------" << std::endl;
1135 for (
int m : nthrs) {
1136 for (
int n : nthrs) {
1137 if (m <= 8 || n >
m - 4 || n % 4 != 0) {
1140 std::cout <<
"<" <<
m <<
" , " << n <<
"> , size = " <<
s <<
":" 1142 producer_consumer_test<GlobalLockPQ<int>>(
"GL",
m, n,
s);
1143 producer_consumer_test<FCPQ>(
"FC",
m, n,
s);
1146 "RCPQ SPN",
m, n,
s);
1149 "RCPQ BLK",
m, n,
s);
1159 std::vector<int> test_sizes = {512, 65536, 1 << 20};
1160 std::vector<int> rates = {1000, 100, 10};
1161 for (
auto s : test_sizes) {
1162 for (
auto p : rates) {
1163 std::cout <<
"\n------ Size: " <<
s <<
" Get top: " << 100. / p <<
"%" 1164 <<
" (Num: " <<
s / p <<
")" 1165 <<
" ------" << std::endl;
1166 accuracy_test<Queue<int>>(
"FIFO Q",
s, p);
1167 accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 0>>(
1168 "RCPQ(strict)",
s, p);
1169 accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 2>>(
1170 "RCPQ(batch=2)",
s, p);
1171 accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 8>>(
1172 "RCPQ(batch=8)",
s, p);
1173 accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 16>>(
1174 "RCPQ(batch=16)",
s, p);
1175 accuracy_test<RelaxedConcurrentPriorityQueue<int, false, false, 50>>(
1176 "RCPQ(batch=50)",
s, p);
static uint64_t run_once(const Func &fn)
execute the function for nthreads
static uint64_t producer_consumer_test(std::string name, uint32_t PushThr, uint32_t PopThr, uint64_t initial_size)
DEFINE_bool(bench, false,"run benchmark")
std::atomic< int64_t > sum(0)
PUSHMI_INLINE_VAR constexpr detail::filter_fn filter
DEFINE_int32(reps, 1,"number of reps")
#define EXPECT_EQ(val1, val2)
std::chrono::steady_clock::time_point now()
DeterministicAtomicImpl< T, DeterministicSchedule > DeterministicAtomic
static std::thread thread(Func &&func, Args &&...args)
—— Concurrent Priority Queue Implementation ——
DEFINE_int64(threadtimeout_ms, 60000,"Idle time before ThreadPoolExecutor threads are joined")
void multiPusherPopper(int PushThr, int PopThr)
std::vector< std::thread::id > threads
constexpr auto size(C const &c) -> decltype(c.size())
static std::function< size_t(size_t)> uniform(uint64_t seed)
folly::FlatCombiningPriorityQueue< int > FCPQ
static void accuracy_test(std::string name, uint64_t initial_size, uint32_t top_percent)
void concurrentSizeTest(int ops)
std::priority_queue< T > q_
static map< string, int > m
std::mt19937 DefaultGenerator
void concurrentPopforSharedBuffer()
#define EXPECT_TRUE(condition)
std::atomic< int > counter
void concurrentOps(int ops)
#define EXPECT_FALSE(condition)
static void join(std::thread &child)
static uint64_t throughtput_test(std::string name, uint64_t initial_size)
static void DSchedMixedTest()
TEST(SequencedExecutor, CPUThreadPoolExecutor)
static std::vector< int > sizes
void concurrentBlocking()
uint64_t bench(const int nprod, const int ncons, const std::string &name)
static std::vector< int > nthr