ThreadCachedIntTest.cpp
/*
 * Copyright 2011-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/ThreadCachedInt.h>

#include <atomic>
#include <condition_variable>
#include <memory>
#include <thread>

#include <glog/logging.h>

#include <folly/Benchmark.h>
#include <folly/hash/Hash.h>
#include <folly/portability/GTest.h>
#include <folly/portability/Unistd.h>
#include <folly/system/ThreadId.h>

using namespace folly;

using std::unique_ptr;
using std::vector;

using Counter = ThreadCachedInt<int64_t>;

class ThreadCachedIntTest : public testing::Test {
 public:
  uint32_t GetDeadThreadsTotal(const Counter& counter) {
    return counter.readFast();
  }
};
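// Note: readFast() only reads the shared target_ value and ignores live
// threads' caches. The counters below use a cacheSize of UINT32_MAX, so a
// thread's cached increments reach target_ only when that thread dies, which
// is why GetDeadThreadsTotal() above is simply readFast().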

// Multithreaded tests. Creates a specified number of threads, each of
// which iterates a different number of times and then dies.

namespace {
// Set cacheSize to be large so cached data moves to target_ only when a
// thread dies.
Counter g_counter_for_mt_slow(0, UINT32_MAX);
Counter g_counter_for_mt_fast(0, UINT32_MAX);

// Used to sync between threads. The value of this variable is the
// maximum iteration index up to which Runner() is allowed to go.
uint32_t g_sync_for_mt(0);
std::condition_variable cv;
std::mutex cv_m;

// Performs the specified number of iterations. Within each iteration it
// increments the counter 10 times. At the beginning of each iteration it
// checks g_sync_for_mt to see if it can proceed; otherwise it blocks on the
// condition variable until it may.
void Runner(Counter* counter, uint32_t iterations) {
  for (uint32_t i = 0; i < iterations; ++i) {
    std::unique_lock<std::mutex> lk(cv_m);
    cv.wait(lk, [i] { return i < g_sync_for_mt; });
    for (uint32_t j = 0; j < 10; ++j) {
      counter->increment(1);
    }
  }
}
} // namespace

// Slow test with fewer threads where there are more busy waits and
// many calls to readFull(). This attempts to test as many of the
// code paths in Counter as possible to ensure that counter values are
// properly passed from thread-local state, both at calls to
// readFull() and at thread death.
TEST_F(ThreadCachedIntTest, MultithreadedSlow) {
  static constexpr uint32_t kNumThreads = 20;
  g_sync_for_mt = 0;
  vector<unique_ptr<std::thread>> threads(kNumThreads);
  // Creates kNumThreads threads. Each thread performs a different
  // number of iterations in Runner() - threads[0] performs 1
  // iteration, threads[1] performs 2 iterations, threads[2] performs
  // 3 iterations, and so on.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    threads[i] =
        std::make_unique<std::thread>(Runner, &g_counter_for_mt_slow, i + 1);
  }
  // Variable to grab the current counter value.
  int32_t counter_value;
  // The expected value of the counter.
  int32_t total = 0;
  // The expected value of GetDeadThreadsTotal().
  int32_t dead_total = 0;
  // Each iteration of the following loop allows one additional
  // iteration of the threads. Given that the threads perform
  // different numbers of iterations, from 1 through kNumThreads, one
  // thread will complete in each iteration of the loop below.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    // Allow all threads to proceed up to iteration i.
    {
      std::lock_guard<std::mutex> lk(cv_m);
      g_sync_for_mt = i + 1;
    }
    cv.notify_all();
    total += (kNumThreads - i) * 10;
    // Loop until the counter reaches its expected value.
    do {
      counter_value = g_counter_for_mt_slow.readFull();
    } while (counter_value < total);
    // All threads have done what they can until iteration i, now make
    // sure they don't go further by checking 10 more times in the
    // following loop.
    for (uint32_t j = 0; j < 10; ++j) {
      counter_value = g_counter_for_mt_slow.readFull();
      EXPECT_EQ(total, counter_value);
    }
    dead_total += (i + 1) * 10;
    EXPECT_GE(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
  }
  // All threads are done.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    threads[i]->join();
  }
  counter_value = g_counter_for_mt_slow.readFull();
  EXPECT_EQ(total, counter_value);
  EXPECT_EQ(total, dead_total);
  EXPECT_EQ(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
}

// Fast test with lots of threads and only one call to readFull()
// at the end.
TEST_F(ThreadCachedIntTest, MultithreadedFast) {
  static constexpr uint32_t kNumThreads = 1000;
  g_sync_for_mt = 0;
  vector<unique_ptr<std::thread>> threads(kNumThreads);
  // Creates kNumThreads threads. Each thread performs a different
  // number of iterations in Runner() - threads[0] performs 1
  // iteration, threads[1] performs 2 iterations, threads[2] performs
  // 3 iterations, and so on.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    threads[i] =
        std::make_unique<std::thread>(Runner, &g_counter_for_mt_fast, i + 1);
  }
  // Let the threads run to completion.
  {
    std::lock_guard<std::mutex> lk(cv_m);
    g_sync_for_mt = kNumThreads;
  }
  cv.notify_all();
  // The expected value of the counter.
  uint32_t total = 0;
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    total += (kNumThreads - i) * 10;
  }
  // Wait for all threads to complete.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    threads[i]->join();
  }
  int32_t counter_value = g_counter_for_mt_fast.readFull();
  EXPECT_EQ(total, counter_value);
  EXPECT_EQ(total, GetDeadThreadsTotal(g_counter_for_mt_fast));
}

TEST(ThreadCachedInt, SingleThreadedNotCached) {
  ThreadCachedInt<int64_t> val(0, 0);
  EXPECT_EQ(0, val.readFast());
  ++val;
  EXPECT_EQ(1, val.readFast());
  for (int i = 0; i < 41; ++i) {
    val.increment(1);
  }
  EXPECT_EQ(42, val.readFast());
  --val;
  EXPECT_EQ(41, val.readFast());
}

// Note: This is somewhat fragile to the implementation. If this causes
// problems, feel free to remove it.
TEST(ThreadCachedInt, SingleThreadedCached) {
  ThreadCachedInt<int64_t> val(0, 10);
  EXPECT_EQ(0, val.readFast());
  ++val;
  EXPECT_EQ(0, val.readFast());
  for (int i = 0; i < 7; ++i) {
    val.increment(1);
  }
  EXPECT_EQ(0, val.readFast());
  EXPECT_EQ(0, val.readFastAndReset());
  EXPECT_EQ(8, val.readFull());
  EXPECT_EQ(8, val.readFullAndReset());
  EXPECT_EQ(0, val.readFull());
  EXPECT_EQ(0, val.readFast());
}

ThreadCachedInt<int32_t> globalInt32(0, 11);
ThreadCachedInt<int64_t> globalInt64(0, 11);
int kNumInserts = 100000;
DEFINE_int32(numThreads, 8, "Number of simultaneous threads for benchmarks.");
#define CREATE_INC_FUNC(size)                       \
  void incFunc##size() {                            \
    const int num = kNumInserts / FLAGS_numThreads; \
    for (int i = 0; i < num; ++i) {                 \
      ++globalInt##size;                            \
    }                                               \
  }
CREATE_INC_FUNC(64)
CREATE_INC_FUNC(32)

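// For reference, CREATE_INC_FUNC(64) above expands to roughly:
//
//   void incFunc64() {
//     const int num = kNumInserts / FLAGS_numThreads;
//     for (int i = 0; i < num; ++i) {
//       ++globalInt64;
//     }
//   }
//
// i.e. each incFunc pushes its thread's share of kNumInserts increments
// through the corresponding global ThreadCachedInt instance.
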
// Confirms counts are accurate with competing threads
TEST(ThreadCachedInt, MultiThreadedCached) {
  kNumInserts = 100000;
  CHECK_EQ(0, kNumInserts % FLAGS_numThreads)
      << "FLAGS_numThreads must evenly divide kNumInserts (" << kNumInserts
      << ").";
  const int numPerThread = kNumInserts / FLAGS_numThreads;
  ThreadCachedInt<int64_t> TCInt64(0, numPerThread - 2);
  {
    std::atomic<bool> run(true);
    std::atomic<int> threadsDone(0);
    std::vector<std::thread> threads;
    for (int i = 0; i < FLAGS_numThreads; ++i) {
      threads.push_back(std::thread([&] {
        FOR_EACH_RANGE (k, 0, numPerThread) { ++TCInt64; }
        std::atomic_fetch_add(&threadsDone, 1);
        while (run.load()) {
          usleep(100);
        }
      }));
    }

    // We create and increment another ThreadCachedInt here to make sure it
    // doesn't interact with the other instances.
    ThreadCachedInt<int64_t> otherTCInt64(0, 10);
    otherTCInt64.set(33);
    ++otherTCInt64;

    while (threadsDone.load() < FLAGS_numThreads) {
      usleep(100);
    }

    ++otherTCInt64;

    // Threads are done incrementing, but caches have not been flushed yet, so
    // we have to readFull().
    EXPECT_NE(kNumInserts, TCInt64.readFast());
    EXPECT_EQ(kNumInserts, TCInt64.readFull());

    run.store(false);
    for (auto& t : threads) {
      t.join();
    }

  } // Caches are flushed when threads finish
  EXPECT_EQ(kNumInserts, TCInt64.readFast());
}

#define MAKE_MT_CACHE_SIZE_BM(size)                         \
  void BM_mt_cache_size##size(int iters, int cacheSize) {   \
    kNumInserts = iters;                                     \
    globalInt##size.set(0);                                  \
    globalInt##size.setCacheSize(cacheSize);                 \
    std::vector<std::thread> threads;                        \
    for (int i = 0; i < FLAGS_numThreads; ++i) {             \
      threads.push_back(std::thread(incFunc##size));         \
    }                                                        \
    for (auto& t : threads) {                                \
      t.join();                                              \
    }                                                        \
  }
MAKE_MT_CACHE_SIZE_BM(64)
MAKE_MT_CACHE_SIZE_BM(32)

#define REG_BASELINE(name, inc_stmt)                          \
  BENCHMARK(FB_CONCATENATE(BM_mt_baseline_, name), iters) {   \
    const int iterPerThread = iters / FLAGS_numThreads;       \
    std::vector<std::thread> threads;                         \
    for (int i = 0; i < FLAGS_numThreads; ++i) {              \
      threads.push_back(std::thread([&]() {                   \
        for (int j = 0; j < iterPerThread; ++j) {             \
          inc_stmt;                                           \
        }                                                     \
      }));                                                    \
    }                                                         \
    for (auto& t : threads) {                                 \
      t.join();                                               \
    }                                                         \
  }

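// REG_BASELINE(name, inc_stmt) registers a benchmark named
// BM_mt_baseline_<name> that splits `iters` across FLAGS_numThreads threads,
// each executing inc_stmt iters/FLAGS_numThreads times. This lets the
// increment strategies below (raw TLS, ThreadLocal, plain atomics, and the
// sharded counter) be compared under the same threading setup.
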
FOLLY_TLS int64_t global__thread64;
FOLLY_TLS int32_t global__thread32;
std::atomic<int64_t> globalInt64Baseline(0);
std::atomic<int32_t> globalInt32Baseline(0);
ThreadLocal<int64_t> globalTL64Baseline;
ThreadLocal<int32_t> globalTL32Baseline;

// Alternate lock-free implementation. Achieves about the same performance,
// but uses about 20x more memory than ThreadCachedInt with 24 threads.
struct ShardedAtomicInt {
  static const int64_t kBuckets_ = 2048;
  std::atomic<int64_t> ints_[kBuckets_];

  inline void inc(int64_t val = 1) {
    int buck = hash::twang_mix64(folly::getCurrentThreadID()) & (kBuckets_ - 1);
    std::atomic_fetch_add(&ints_[buck], val);
  }

  // read the first few and extrapolate
  int64_t readFast() {
    int64_t ret = 0;
    static const int numToRead = 8;
    FOR_EACH_RANGE (i, 0, numToRead) {
      ret += ints_[i].load(std::memory_order_relaxed);
    }
    return ret * (kBuckets_ / numToRead);
  }

  // readFull is lock-free, but has to do thousands of loads...
  int64_t readFull() {
    int64_t ret = 0;
    for (auto& i : ints_) {
      // Fun fact - using memory_order_consume below reduces perf 30-40% in
      // high contention benchmarks.
      ret += i.load(std::memory_order_relaxed);
    }
    return ret;
  }
};
ShardedAtomicInt shd_int64;

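// Note on ShardedAtomicInt above: inc() hashes the calling thread's id
// (hash::twang_mix64) to pick one of kBuckets_ atomics, spreading concurrent
// increments across many locations instead of contending on a single counter.
// The cost is memory (2048 atomics per counter) and read accuracy: readFast()
// samples only 8 buckets and extrapolates, while readFull() has to load every
// bucket.
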
REG_BASELINE(_thread64, global__thread64 += 1)
REG_BASELINE(_thread32, global__thread32 += 1)
REG_BASELINE(ThreadLocal64, *globalTL64Baseline += 1)
REG_BASELINE(ThreadLocal32, *globalTL32Baseline += 1)
REG_BASELINE(
    atomic_inc64,
    std::atomic_fetch_add(&globalInt64Baseline, int64_t(1)))
REG_BASELINE(
    atomic_inc32,
    std::atomic_fetch_add(&globalInt32Baseline, int32_t(1)))
REG_BASELINE(ShardedAtm64, shd_int64.inc())

BENCHMARK_PARAM(BM_mt_cache_size64, 0)
BENCHMARK_PARAM(BM_mt_cache_size64, 10)
BENCHMARK_PARAM(BM_mt_cache_size64, 100)
BENCHMARK_PARAM(BM_mt_cache_size64, 1000)
BENCHMARK_PARAM(BM_mt_cache_size32, 0)
BENCHMARK_PARAM(BM_mt_cache_size32, 10)
BENCHMARK_PARAM(BM_mt_cache_size32, 100)
BENCHMARK_PARAM(BM_mt_cache_size32, 1000)
BENCHMARK_DRAW_LINE();

// single threaded
BENCHMARK(Atomic_readFull) {
  doNotOptimizeAway(globalInt64Baseline.load(std::memory_order_relaxed));
}
BENCHMARK(ThrCache_readFull) {
  doNotOptimizeAway(globalInt64.readFull());
}
BENCHMARK(Sharded_readFull) {
  doNotOptimizeAway(shd_int64.readFull());
}
BENCHMARK(ThrCache_readFast) {
  doNotOptimizeAway(globalInt64.readFast());
}
BENCHMARK(Sharded_readFast) {
  doNotOptimizeAway(shd_int64.readFast());
}
BENCHMARK_DRAW_LINE();

// multi threaded
REG_BASELINE(
    Atomic_readFull,
    doNotOptimizeAway(globalInt64Baseline.load(std::memory_order_relaxed)))
REG_BASELINE(ThrCache_readFull, doNotOptimizeAway(globalInt64.readFull()))
REG_BASELINE(Sharded_readFull, doNotOptimizeAway(shd_int64.readFull()))
REG_BASELINE(ThrCache_readFast, doNotOptimizeAway(globalInt64.readFast()))
REG_BASELINE(Sharded_readFast, doNotOptimizeAway(shd_int64.readFast()))
BENCHMARK_DRAW_LINE();

int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  gflags::SetCommandLineOptionWithMode(
      "bm_min_usec", "10000", gflags::SET_FLAG_IF_DEFAULT);
  if (FLAGS_benchmark) {
    folly::runBenchmarks();
  }
  return RUN_ALL_TESTS();
}

/*
 Ran with 20 threads on dual 12-core Xeon(R) X5650 @ 2.67GHz with 12-MB caches

 Benchmark                                   Iters   Total t    t/iter  iter/sec
 ------------------------------------------------------------------------------
 + 103% BM_mt_baseline__thread64          10000000  13.54 ms  1.354 ns   704.4 M
 *      BM_mt_baseline__thread32          10000000  6.651 ms  665.1 ps     1.4 G
 +50.3% BM_mt_baseline_ThreadLocal64      10000000  9.994 ms  999.4 ps   954.2 M
 +49.9% BM_mt_baseline_ThreadLocal32      10000000  9.972 ms  997.2 ps   956.4 M
 +2650% BM_mt_baseline_atomic_inc64       10000000  182.9 ms  18.29 ns   52.13 M
 +2665% BM_mt_baseline_atomic_inc32       10000000  183.9 ms  18.39 ns   51.85 M
 +75.3% BM_mt_baseline_ShardedAtm64       10000000  11.66 ms  1.166 ns   817.8 M
 +6670% BM_mt_cache_size64/0              10000000  450.3 ms  45.03 ns   21.18 M
 +1644% BM_mt_cache_size64/10             10000000    116 ms   11.6 ns    82.2 M
 + 381% BM_mt_cache_size64/100            10000000  32.04 ms  3.204 ns   297.7 M
 + 129% BM_mt_cache_size64/1000           10000000  15.24 ms  1.524 ns   625.8 M
 +6052% BM_mt_cache_size32/0              10000000  409.2 ms  40.92 ns   23.31 M
 +1304% BM_mt_cache_size32/10             10000000  93.39 ms  9.339 ns   102.1 M
 + 298% BM_mt_cache_size32/100            10000000  26.52 ms  2.651 ns   359.7 M
 +68.1% BM_mt_cache_size32/1000           10000000  11.18 ms  1.118 ns   852.9 M
 ------------------------------------------------------------------------------
 +10.4% Atomic_readFull                   10000000  36.05 ms  3.605 ns   264.5 M
 + 619% ThrCache_readFull                 10000000  235.1 ms  23.51 ns   40.57 M
   SLOW Sharded_readFull                   1981093       2 s   1.01 us   967.3 k
 *      ThrCache_readFast                 10000000  32.65 ms  3.265 ns   292.1 M
 +10.0% Sharded_readFast                  10000000  35.92 ms  3.592 ns   265.5 M
 ------------------------------------------------------------------------------
 +4.54% BM_mt_baseline_Atomic_readFull    10000000  8.672 ms  867.2 ps   1.074 G
   SLOW BM_mt_baseline_ThrCache_readFull  10000000  996.9 ms  99.69 ns   9.567 M
   SLOW BM_mt_baseline_Sharded_readFull   10000000  891.5 ms  89.15 ns    10.7 M
 *      BM_mt_baseline_ThrCache_readFast  10000000  8.295 ms  829.5 ps   1.123 G
 +12.7% BM_mt_baseline_Sharded_readFast   10000000  9.348 ms  934.8 ps    1020 M
 ------------------------------------------------------------------------------
*/
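
// Usage sketch (illustrative only, not part of the tests above): the core
// ThreadCachedInt calls exercised in this file, gathered in one place. The
// variable names here are hypothetical.
//
//   folly::ThreadCachedInt<int64_t> hits(0 /* initialVal */, 1000 /* cacheSize */);
//   hits.increment(1);   // cheap: usually only touches this thread's cache
//   ++hits;              // equivalent to increment(1)
//   int64_t approx = hits.readFast();  // reads the shared value only; may lag
//                                      // by up to cacheSize per live thread
//   int64_t exact = hits.readFull();   // sums the shared value plus all live
//                                      // threads' caches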