#include <condition_variable>

#include <glog/logging.h>

using namespace folly;
using std::unique_ptr;
using std::vector;

// Counter is an alias for ThreadCachedInt<int64_t>.
using Counter = ThreadCachedInt<int64_t>;

// readFast() only reflects values that have already been flushed into the
// target, i.e. the contributions of threads that have since died.
uint32_t GetDeadThreadsTotal(const Counter& counter) {
  return counter.readFast();
}
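// For orientation, a minimal usage sketch.  It is not part of the test file:
// the counter name and the worker thread below are illustrative assumptions,
// and it presumes <thread> and <folly/ThreadCachedInt.h> are available.  It
// shows the two read paths GetDeadThreadsTotal() relies on: readFast() reads
// only the already-flushed target, while readFull() also walks the caches of
// live threads.
namespace usage_sketch {

folly::ThreadCachedInt<int64_t> requests(0, UINT32_MAX);

void sketch() {
  std::thread worker([] { requests.increment(1); });
  worker.join();
  // The worker has exited, so its cached increment was flushed at thread
  // death; both reads now observe it.
  int64_t flushed_only = requests.readFast(); // dead-thread (flushed) total
  int64_t exact = requests.readFull();        // flushed total + live caches
  CHECK_EQ(flushed_only, exact);
}

} // namespace usage_sketch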
// Cache size is UINT32_MAX, so cached increments only reach the target when
// the owning thread dies.
Counter g_counter_for_mt_slow(0, UINT32_MAX);
Counter g_counter_for_mt_fast(0, UINT32_MAX);
// Runner() blocks on cv until g_sync_for_mt permits its next iteration.
uint32_t g_sync_for_mt(0);
std::condition_variable cv;
std::mutex cv_m;
// Runner(counter, n) performs n iterations.  Each iteration waits until
// g_sync_for_mt exceeds its index, then increments the counter 10 times
// (the tests below rely on this factor of 10).
void Runner(Counter* counter, uint32_t n) {
  for (uint32_t i = 0; i < n; ++i) {
    {
      std::unique_lock<std::mutex> lk(cv_m);
      cv.wait(lk, [i] { return i < g_sync_for_mt; });
    }
    for (int j = 0; j < 10; ++j) {
      counter->increment(1);
    }
  }
}
vector<unique_ptr<std::thread>> threads(kNumThreads);
// Thread i performs i + 1 iterations of Runner().
for (uint32_t i = 0; i < kNumThreads; ++i) {
  threads[i] =
      std::make_unique<std::thread>(Runner, &g_counter_for_mt_slow, i + 1);
}
{
  std::lock_guard<std::mutex> lk(cv_m);
  g_sync_for_mt = i + 1;
}
cv.notify_all(); // release threads up to iteration i
// Every thread that still has iterations left adds another 10, so the
// expected total grows by (kNumThreads - i) * 10.
total += (kNumThreads - i) * 10;
// Spin until readFull() reflects the expected total.
do {
  counter_value = g_counter_for_mt_slow.readFull();
} while (counter_value < total);
// Threads must not run past iteration i, so a re-read still returns total.
counter_value = g_counter_for_mt_slow.readFull();
EXPECT_EQ(total, counter_value);
// Thread i has now completed all of its (i + 1) iterations and will exit;
// the total flushed by dead threads can therefore be at most dead_total.
dead_total += (i + 1) * 10;
EXPECT_GE(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
// After all threads have been joined, every cache has been flushed.
counter_value = g_counter_for_mt_slow.readFull();
EXPECT_EQ(total, counter_value);
EXPECT_EQ(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
vector<unique_ptr<std::thread>> threads(kNumThreads);
// As above: thread i performs i + 1 iterations of Runner().
for (uint32_t i = 0; i < kNumThreads; ++i) {
  threads[i] =
      std::make_unique<std::thread>(Runner, &g_counter_for_mt_fast, i + 1);
}
{
  std::lock_guard<std::mutex> lk(cv_m);
  g_sync_for_mt = kNumThreads; // release every iteration at once
}
cv.notify_all();
// Expected total: summing (kNumThreads - i) * 10 over i gives the same
// 10 * (1 + 2 + ... + kNumThreads) that the threads produce.
for (uint32_t i = 0; i < kNumThreads; ++i) {
  total += (kNumThreads - i) * 10;
}
int32_t counter_value = g_counter_for_mt_fast.readFull();
EXPECT_EQ(total, counter_value);
// All threads are dead, so even the fast (flushed-only) read is exact.
EXPECT_EQ(total, GetDeadThreadsTotal(g_counter_for_mt_fast));
for (int i = 0; i < 41; ++i) {
for (int i = 0; i < 7; ++i) {
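// A sketch of the caching behaviour the single-threaded loops above exercise.
// It is illustrative only (the variable names and exact expectations are
// assumptions, not copied from the test bodies): with a cache size of 0 every
// increment is applied immediately, while with a nonzero cache size readFast()
// lags behind readFull() until the cache spills or the thread dies.
namespace caching_sketch {

void sketch() {
  folly::ThreadCachedInt<int64_t> uncached(0, 0); // cacheSize == 0
  uncached.increment(1);
  CHECK_EQ(1, uncached.readFast()); // visible immediately, nothing is cached

  folly::ThreadCachedInt<int64_t> cached(0, 10); // cacheSize == 10
  for (int i = 0; i < 7; ++i) {
    cached.increment(1);
  }
  CHECK_EQ(0, cached.readFast()); // still sitting in this thread's cache
  CHECK_EQ(7, cached.readFull()); // readFull() walks the thread caches
}

} // namespace caching_sketch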
// Counters used by the benchmark helpers below.
ThreadCachedInt<int32_t> globalInt32(0, 11);
ThreadCachedInt<int64_t> globalInt64(0, 11);

DEFINE_int32(numThreads, 8, "Number of simultaneous threads for benchmarks.");
#define CREATE_INC_FUNC(size)                         \
  void incFunc##size() {                              \
    const int num = kNumInserts / FLAGS_numThreads;   \
    for (int i = 0; i < num; ++i) {                   \
      ++globalInt##size;                              \
    }                                                 \
  }

CHECK_EQ(0, kNumInserts % FLAGS_numThreads)
    << "FLAGS_numThreads must evenly divide kNumInserts (" << kNumInserts
    << ").";
const int numPerThread = kNumInserts / FLAGS_numThreads;
std::atomic<bool> run(true);
std::atomic<int> threadsDone(0);
std::vector<std::thread> threads;
for (int i = 0; i < FLAGS_numThreads; ++i) {
  threads.push_back(std::thread([&] {
    std::atomic_fetch_add(&threadsDone, 1);
otherTCInt64.set(33);

while (threadsDone.load() < FLAGS_numThreads) {
for (auto& t : threads) {
  t.join();
}
#define MAKE_MT_CACHE_SIZE_BM(size)                        \
  void BM_mt_cache_size##size(int iters, int cacheSize) {  \
    kNumInserts = iters;                                    \
    globalInt##size.set(0);                                 \
    globalInt##size.setCacheSize(cacheSize);                \
    std::vector<std::thread> threads;                       \
    for (int i = 0; i < FLAGS_numThreads; ++i) {            \
      threads.push_back(std::thread(incFunc##size));        \
    }                                                       \
    for (auto& t : threads) {                               \
      t.join();                                             \
    }                                                       \
  }

#define REG_BASELINE(name, inc_stmt)                        \
  BENCHMARK(FB_CONCATENATE(BM_mt_baseline_, name), iters) { \
    const int iterPerThread = iters / FLAGS_numThreads;     \
    std::vector<std::thread> threads;                       \
    for (int i = 0; i < FLAGS_numThreads; ++i) {            \
      threads.push_back(std::thread([&]() {                 \
        for (int j = 0; j < iterPerThread; ++j) {           \
          inc_stmt;                                         \
        }                                                   \
      }));                                                  \
    }                                                       \
    for (auto& t : threads) {                               \
      t.join();                                             \
    }                                                       \
  }
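// The macros above are typically instantiated along these lines.  This is a
// sketch, not verbatim from the file: the baseline counters, the benchmark
// names, and the parameter values here are assumptions.
std::atomic<int64_t> globalInt64Baseline(0);
FOLLY_TLS int64_t global__thread64;

// Baseline benchmarks: the same per-thread work, but against a plain atomic
// and a plain thread-local integer.
REG_BASELINE(atomic_inc64, globalInt64Baseline.fetch_add(1));
REG_BASELINE(_thread64, global__thread64 += 1);

// Cache-size benchmarks: the BENCHMARK_PARAM argument is the ThreadCachedInt
// cache size under test.
MAKE_MT_CACHE_SIZE_BM(64)
BENCHMARK_PARAM(BM_mt_cache_size64, 10)
BENCHMARK_PARAM(BM_mt_cache_size64, 8192)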
// Alternate lock-free implementation used as a comparison point: an array of
// atomics sharded by a hash of the calling thread's id.
struct ShardedAtomicInt {
  std::atomic<int64_t> ints_[kBuckets_];

  void inc(int64_t val = 1) {
    int buck = hash::twang_mix64(getCurrentThreadID()) & (kBuckets_ - 1);
    std::atomic_fetch_add(&ints_[buck], val);
  }
  // Read only the first few buckets and extrapolate; fast but approximate.
  int64_t readFast() {
    int64_t ret = 0;
    static const int numToRead = 8;
    for (int i = 0; i < numToRead; ++i) {
      ret += ints_[i].load(std::memory_order_relaxed);
    }
    return ret * (kBuckets_ / numToRead);
  }
  // Exact read: sum every bucket.
  int64_t readFull() {
    int64_t ret = 0;
    for (auto& i : ints_) {
      ret += i.load(std::memory_order_relaxed);
    }
    return ret;
  }
};
BENCHMARK_DRAW_LINE();
int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  gflags::SetCommandLineOptionWithMode(
      "bm_min_usec", "10000", gflags::SET_FLAG_IF_DEFAULT);
  if (FLAGS_benchmark) {
    folly::runBenchmarks();
  }
  return RUN_ALL_TESTS();
}