proxygen
AtomicHashMapTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2012-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/AtomicHashMap.h>
18 
19 #include <atomic>
20 #include <memory>
21 #include <thread>
22 
23 #include <glog/logging.h>
24 
25 #include <folly/Benchmark.h>
26 #include <folly/Conv.h>
29 
32 using folly::StringPiece;
33 using std::string;
34 using std::vector;
35 
36 // Tunables:
37 DEFINE_double(targetLoadFactor, 0.75, "Target memory utilization fraction.");
38 DEFINE_double(maxLoadFactor, 0.80, "Max before growth.");
39 DEFINE_int32(numThreads, 8, "Threads to use for concurrency tests.");
40 DEFINE_int64(numBMElements, 12 * 1000 * 1000, "Size of maps for benchmarks.");
41 
42 const double LF = FLAGS_maxLoadFactor / FLAGS_targetLoadFactor;
43 const int maxBMElements = int(FLAGS_numBMElements * LF); // hit our target LF.
44 
45 static int64_t nowInUsec() {
46  timeval tv;
47  gettimeofday(&tv, nullptr);
48  return int64_t(tv.tv_sec) * 1000 * 1000 + tv.tv_usec;
49 }
50 
51 TEST(Ahm, BasicStrings) {
53  AHM myMap(1024);
54  EXPECT_TRUE(myMap.begin() == myMap.end());
55 
56  for (int i = 0; i < 100; ++i) {
57  myMap.insert(make_pair(i, folly::to<string>(i)));
58  }
59  for (int i = 0; i < 100; ++i) {
60  EXPECT_EQ(myMap.find(i)->second, folly::to<string>(i));
61  }
62 
63  myMap.insert(std::make_pair(999, "A"));
64  myMap.insert(std::make_pair(999, "B"));
65  EXPECT_EQ(myMap.find(999)->second, "A"); // shouldn't have overwritten
66  myMap.find(999)->second = "B";
67  myMap.find(999)->second = "C";
68  EXPECT_EQ(myMap.find(999)->second, "C");
69  EXPECT_EQ(myMap.find(999)->first, 999);
70 }
71 
72 TEST(Ahm, BasicNoncopyable) {
74  AHM myMap(1024);
75  EXPECT_TRUE(myMap.begin() == myMap.end());
76 
77  for (int i = 0; i < 50; ++i) {
78  myMap.insert(make_pair(i, std::make_unique<int>(i)));
79  }
80  for (int i = 50; i < 100; ++i) {
81  myMap.insert(i, std::make_unique<int>(i));
82  }
83  for (int i = 100; i < 150; ++i) {
84  myMap.emplace(i, new int(i));
85  }
86  for (int i = 150; i < 200; ++i) {
87  myMap.emplace(i, new int(i), std::default_delete<int>());
88  }
89  for (int i = 0; i < 200; ++i) {
90  EXPECT_EQ(*(myMap.find(i)->second), i);
91  }
92  for (int i = 0; i < 200; i += 4) {
93  myMap.erase(i);
94  }
95  for (int i = 0; i < 200; i += 4) {
96  EXPECT_EQ(myMap.find(i), myMap.end());
97  }
98 }
99 
100 typedef int32_t KeyT;
101 typedef int32_t ValueT;
102 
109 static AHArrayT::SmartPtr globalAHA(nullptr);
110 static std::unique_ptr<AHMapT> globalAHM;
111 static std::unique_ptr<QPAHMapT> globalQPAHM;
112 
113 // Generate a deterministic value based on an input key
114 static int genVal(int key) {
115  return key / 3;
116 }
117 
118 static bool legalKey(const char* a);
119 
120 struct EqTraits {
121  bool operator()(const char* a, const char* b) {
122  return legalKey(a) && (strcmp(a, b) == 0);
123  }
124  bool operator()(const char* a, const char& b) {
125  return legalKey(a) && (a[0] != '\0') && (a[0] == b);
126  }
127  bool operator()(const char* a, const StringPiece b) {
128  return legalKey(a) && (strlen(a) == b.size()) &&
129  (strcmp(a, b.begin()) == 0);
130  }
131 };
132 
133 struct HashTraits {
134  size_t operator()(const char* a) {
135  size_t result = 0;
136  while (a[0] != 0) {
137  result += static_cast<size_t>(*(a++));
138  }
139  return result;
140  }
141  size_t operator()(const char& a) {
142  return static_cast<size_t>(a);
143  }
144  size_t operator()(const StringPiece a) {
145  size_t result = 0;
146  for (const auto& ch : a) {
147  result += static_cast<size_t>(ch);
148  }
149  return result;
150  }
151 };
152 
155 
156 static bool legalKey(const char* a) {
157  return a != cstrIntCfg.emptyKey && a != cstrIntCfg.lockedKey &&
158  a != cstrIntCfg.erasedKey;
159 }
160 
161 TEST(Ahm, BasicLookup) {
162  AHMCstrInt myMap(1024, cstrIntCfg);
163  EXPECT_TRUE(myMap.begin() == myMap.end());
164  myMap.insert(std::make_pair("f", 42));
165  EXPECT_EQ(42, myMap.find("f")->second);
166  {
167  // Look up a single char, successfully.
168  auto it = myMap.find<char>('f');
169  EXPECT_EQ(42, it->second);
170  }
171  {
172  // Look up a single char, unsuccessfully.
173  auto it = myMap.find<char>('g');
174  EXPECT_TRUE(it == myMap.end());
175  }
176  {
177  // Look up a string piece, successfully.
178  const StringPiece piece("f");
179  auto it = myMap.find(piece);
180  EXPECT_EQ(42, it->second);
181  }
182 }
183 
184 TEST(Ahm, grow) {
185  VLOG(1) << "Overhead: " << sizeof(AHArrayT) << " (array) "
186  << sizeof(AHMapT) + sizeof(AHArrayT) << " (map/set) Bytes.";
187  uint64_t numEntries = 10000;
188  float sizeFactor = 0.46f;
189 
190  std::unique_ptr<AHMapT> m(new AHMapT(int(numEntries * sizeFactor), config));
191 
192  // load map - make sure we succeed and the index is accurate
193  bool success = true;
194  for (uint64_t i = 0; i < numEntries; i++) {
195  auto ret = m->insert(RecordT(i, genVal(i)));
196  success &= ret.second;
197  success &= (m->findAt(ret.first.getIndex())->second == genVal(i));
198  }
199  // Overwrite vals to make sure there are no dups
200  // Every insert should fail because the keys are already in the map.
201  success = true;
202  for (uint64_t i = 0; i < numEntries; i++) {
203  auto ret = m->insert(RecordT(i, genVal(i * 2)));
204  success &= (ret.second == false); // fail on collision
205  success &= (ret.first->second == genVal(i)); // return the previous value
206  success &= (m->findAt(ret.first.getIndex())->second == genVal(i));
207  }
208  EXPECT_TRUE(success);
209 
210  // check correctness
211  EXPECT_GT(m->numSubMaps(), 1); // make sure we grew
212  success = true;
213  EXPECT_EQ(m->size(), numEntries);
214  for (size_t i = 0; i < numEntries; i++) {
215  success &= (m->find(i)->second == genVal(i));
216  }
217  EXPECT_TRUE(success);
218 
219  // Check findAt
220  success = true;
222  for (int32_t i = 0; i < int32_t(numEntries); i++) {
223  retIt = m->find(i);
224  retIt = m->findAt(retIt.getIndex());
225  success &= (retIt->second == genVal(i));
226  // We use a uint32_t index so that this comparison is between two
227  // variables of the same type.
228  success &= (retIt->first == i);
229  }
230  EXPECT_TRUE(success);
231 
232  // Try modifying value
233  m->find(8)->second = 5309;
234  EXPECT_EQ(m->find(8)->second, 5309);
235 
236  // check clear()
237  m->clear();
238  success = true;
239  for (uint64_t i = 0; i < numEntries / 2; i++) {
240  success &= m->insert(RecordT(i, genVal(i))).second;
241  }
242  EXPECT_TRUE(success);
243  EXPECT_EQ(m->size(), numEntries / 2);
244 }
245 
246 TEST(Ahm, iterator) {
247  int numEntries = 10000;
248  float sizeFactor = .46f;
249  std::unique_ptr<AHMapT> m(new AHMapT(int(numEntries * sizeFactor), config));
250 
251  // load map - make sure we succeed and the index is accurate
252  for (int i = 0; i < numEntries; i++) {
253  m->insert(RecordT(i, genVal(i)));
254  }
255 
256  bool success = true;
257  int count = 0;
258  FOR_EACH (it, *m) {
259  success &= (it->second == genVal(it->first));
260  ++count;
261  }
262  EXPECT_TRUE(success);
263  EXPECT_EQ(count, numEntries);
264 }
265 
266 class Counters {
267  private:
268  // Note: Unfortunately can't currently put a std::atomic<int64_t> in
269  // the value in ahm since it doesn't support types that are both non-copy
270  // and non-move constructible yet.
272 
273  public:
274  explicit Counters(size_t numCounters) : ahm(numCounters) {}
275 
276  void increment(int64_t obj_id) {
277  auto ret = ahm.insert(std::make_pair(obj_id, 1));
278  if (!ret.second) {
279  // obj_id already exists, increment count
280  __sync_fetch_and_add(&ret.first->second, 1);
281  }
282  }
283 
285  auto ret = ahm.find(obj_id);
286  return ret != ahm.end() ? ret->second : 0;
287  }
288 
289  // export the counters without blocking increments
290  string toString() {
291  string ret = "{\n";
292  ret.reserve(ahm.size() * 32);
293  for (const auto& e : ahm) {
294  ret += folly::to<string>(" [", e.first, ":", e.second, "]\n");
295  }
296  ret += "}\n";
297  return ret;
298  }
299 };
300 
301 // If you get an error "terminate called without an active exception", there
302 // might be too many threads getting created - decrease numKeys and/or mult.
303 TEST(Ahm, counter) {
304  const int numKeys = 10;
305  const int mult = 10;
306  Counters c(numKeys);
307  vector<int64_t> keys;
308  FOR_EACH_RANGE (i, 1, numKeys) { keys.push_back(i); }
309  vector<std::thread> threads;
310  for (auto key : keys) {
311  FOR_EACH_RANGE (i, 0, key * mult) {
312  threads.push_back(std::thread([&, key] { c.increment(key); }));
313  }
314  }
315  for (auto& t : threads) {
316  t.join();
317  }
318  string str = c.toString();
319  for (auto key : keys) {
320  int val = key * mult;
321  EXPECT_EQ(val, c.getValue(key));
322  EXPECT_NE(
323  string::npos, str.find(folly::to<string>("[", key, ":", val, "]")));
324  }
325 }
326 
327 class Integer {
328  public:
329  explicit Integer(KeyT v = 0) : v_(v) {}
330 
332  static bool throwException_ = false;
333  throwException_ = !throwException_;
334  if (throwException_) {
335  throw 1;
336  }
337  v_ = a.v_;
338  return *this;
339  }
340 
341  bool operator==(const Integer& a) const {
342  return v_ == a.v_;
343  }
344 
345  private:
347 };
348 
349 TEST(Ahm, map_exception_safety) {
350  typedef AtomicHashMap<KeyT, Integer> MyMapT;
351 
352  int numEntries = 10000;
353  float sizeFactor = 0.46f;
354  std::unique_ptr<MyMapT> m(new MyMapT(int(numEntries * sizeFactor)));
355 
356  bool success = true;
357  int count = 0;
358  for (int i = 0; i < numEntries; i++) {
359  try {
360  m->insert(i, Integer(genVal(i)));
361  success &= (m->find(i)->second == Integer(genVal(i)));
362  ++count;
363  } catch (...) {
364  success &= !m->count(i);
365  }
366  }
367  EXPECT_EQ(count, m->size());
368  EXPECT_TRUE(success);
369 }
370 
371 TEST(Ahm, basicErase) {
372  size_t numEntries = 3000;
373 
374  std::unique_ptr<AHMapT> s(new AHMapT(numEntries, config));
375  // Iterate filling up the map and deleting all keys a few times
376  // to test more than one subMap.
377  for (int iterations = 0; iterations < 4; ++iterations) {
378  // Testing insertion of keys
379  bool success = true;
380  for (size_t i = 0; i < numEntries; ++i) {
381  success &= !(s->count(i));
382  auto ret = s->insert(RecordT(i, i));
383  success &= s->count(i);
384  success &= ret.second;
385  }
386  EXPECT_TRUE(success);
387  EXPECT_EQ(s->size(), numEntries);
388 
389  // Delete every key in the map and verify that the key is gone and the the
390  // size is correct.
391  success = true;
392  for (size_t i = 0; i < numEntries; ++i) {
393  success &= s->erase(i);
394  success &= (s->size() == numEntries - 1 - i);
395  success &= !(s->count(i));
396  success &= !(s->erase(i));
397  }
398  EXPECT_TRUE(success);
399  }
400  VLOG(1) << "Final number of subMaps = " << s->numSubMaps();
401 }
402 
403 namespace {
404 
405 inline KeyT randomizeKey(int key) {
406  // We deterministically randomize the key to more accurately simulate
407  // real-world usage, and to avoid pathalogical performance patterns (e.g.
408  // those related to std::hash<int64_t>()(1) == 1).
409  //
410  // Use a hash function we don't normally use for ints to avoid interactions.
411  return folly::hash::jenkins_rev_mix32(key);
412 }
413 
414 int numOpsPerThread = 0;
415 
416 void* insertThread(void* jj) {
417  int64_t j = (int64_t)jj;
418  for (int i = 0; i < numOpsPerThread; ++i) {
419  KeyT key = randomizeKey(i + j * numOpsPerThread);
420  globalAHM->insert(key, genVal(key));
421  }
422  return nullptr;
423 }
424 
425 void* qpInsertThread(void* jj) {
426  int64_t j = (int64_t)jj;
427  for (int i = 0; i < numOpsPerThread; ++i) {
428  KeyT key = randomizeKey(i + j * numOpsPerThread);
429  globalQPAHM->insert(key, genVal(key));
430  }
431  return nullptr;
432 }
433 
434 void* insertThreadArr(void* jj) {
435  int64_t j = (int64_t)jj;
436  for (int i = 0; i < numOpsPerThread; ++i) {
437  KeyT key = randomizeKey(i + j * numOpsPerThread);
438  globalAHA->insert(std::make_pair(key, genVal(key)));
439  }
440  return nullptr;
441 }
442 
443 std::atomic<bool> runThreadsCreatedAllThreads;
444 void runThreads(void* (*mainFunc)(void*), int numThreads, void** statuses) {
446  runThreadsCreatedAllThreads.store(false);
447  vector<std::thread> threads;
448  for (int64_t j = 0; j < numThreads; j++) {
449  threads.emplace_back([statuses, mainFunc, j]() {
450  auto ret = mainFunc((void*)j);
451  if (statuses != nullptr) {
452  statuses[j] = ret;
453  }
454  });
455  }
456  susp.dismiss();
457 
458  runThreadsCreatedAllThreads.store(true);
459  for (size_t i = 0; i < threads.size(); ++i) {
460  threads[i].join();
461  }
462 }
463 
464 void runThreads(void* (*mainFunc)(void*)) {
465  runThreads(mainFunc, FLAGS_numThreads, nullptr);
466 }
467 
468 } // namespace
469 
470 TEST(Ahm, collision_test) {
471  const int numInserts = 1000000 / 4;
472 
473  // Doing the same number on each thread so we collide.
474  numOpsPerThread = numInserts;
475 
476  float sizeFactor = 0.46f;
477  int entrySize = sizeof(KeyT) + sizeof(ValueT);
478  VLOG(1) << "Testing " << numInserts << " unique " << entrySize
479  << " Byte entries replicated in " << FLAGS_numThreads
480  << " threads with " << FLAGS_maxLoadFactor * 100.0
481  << "% max load factor.";
482 
483  globalAHM = std::make_unique<AHMapT>(int(numInserts * sizeFactor), config);
484 
485  size_t sizeInit = globalAHM->capacity();
486  VLOG(1) << " Initial capacity: " << sizeInit;
487 
488  double start = nowInUsec();
489  runThreads([](void*) -> void* { // collisionInsertThread
490  for (int i = 0; i < numOpsPerThread; ++i) {
491  KeyT key = randomizeKey(i);
492  globalAHM->insert(key, genVal(key));
493  }
494  return nullptr;
495  });
496  double elapsed = nowInUsec() - start;
497 
498  size_t finalCap = globalAHM->capacity();
499  size_t sizeAHM = globalAHM->size();
500  VLOG(1) << elapsed / sizeAHM << " usec per " << FLAGS_numThreads
501  << " duplicate inserts (atomic).";
502  VLOG(1) << " Final capacity: " << finalCap << " in "
503  << globalAHM->numSubMaps() << " sub maps ("
504  << sizeAHM * 100 / finalCap << "% load factor, "
505  << (finalCap - sizeInit) * 100 / sizeInit << "% growth).";
506 
507  // check correctness
508  EXPECT_EQ(sizeAHM, numInserts);
509  bool success = true;
510  for (int i = 0; i < numInserts; ++i) {
511  KeyT key = randomizeKey(i);
512  success &= (globalAHM->find(key)->second == genVal(key));
513  }
514  EXPECT_TRUE(success);
515 
516  // check colliding finds
517  start = nowInUsec();
518  runThreads([](void*) -> void* { // collisionFindThread
519  KeyT key(0);
520  for (int i = 0; i < numOpsPerThread; ++i) {
521  globalAHM->find(key);
522  }
523  return nullptr;
524  });
525 
526  elapsed = nowInUsec() - start;
527 
528  VLOG(1) << elapsed / sizeAHM << " usec per " << FLAGS_numThreads
529  << " duplicate finds (atomic).";
530 }
531 
532 namespace {
533 
534 const int kInsertPerThread = 100000;
535 int raceFinalSizeEstimate;
536 
537 void* raceIterateThread(void*) {
538  int count = 0;
539 
540  AHMapT::iterator it = globalAHM->begin();
541  AHMapT::iterator end = globalAHM->end();
542  for (; it != end; ++it) {
543  ++count;
544  if (count > raceFinalSizeEstimate) {
545  EXPECT_FALSE("Infinite loop in iterator.");
546  return nullptr;
547  }
548  }
549  return nullptr;
550 }
551 
552 void* raceInsertRandomThread(void*) {
553  for (int i = 0; i < kInsertPerThread; ++i) {
554  KeyT key = rand();
555  globalAHM->insert(key, genVal(key));
556  }
557  return nullptr;
558 }
559 
560 } // namespace
561 
562 // Test for race conditions when inserting and iterating at the same time and
563 // creating multiple submaps.
564 TEST(Ahm, race_insert_iterate_thread_test) {
565  const int kInsertThreads = 20;
566  const int kIterateThreads = 20;
567  raceFinalSizeEstimate = kInsertThreads * kInsertPerThread;
568 
569  VLOG(1) << "Testing iteration and insertion with " << kInsertThreads
570  << " threads inserting and " << kIterateThreads
571  << " threads iterating.";
572 
573  globalAHM = std::make_unique<AHMapT>(raceFinalSizeEstimate / 9, config);
574 
575  vector<pthread_t> threadIds;
576  for (int j = 0; j < kInsertThreads + kIterateThreads; j++) {
577  pthread_t tid;
578  void* (*thread)(void*) =
579  (j < kInsertThreads ? raceInsertRandomThread : raceIterateThread);
580  if (pthread_create(&tid, nullptr, thread, nullptr) != 0) {
581  LOG(ERROR) << "Could not start thread";
582  } else {
583  threadIds.push_back(tid);
584  }
585  }
586  for (size_t i = 0; i < threadIds.size(); ++i) {
587  pthread_join(threadIds[i], nullptr);
588  }
589  VLOG(1) << "Ended up with " << globalAHM->numSubMaps() << " submaps";
590  VLOG(1) << "Final size of map " << globalAHM->size();
591 }
592 
593 namespace {
594 
595 const int kTestEraseInsertions = 200000;
596 std::atomic<int32_t> insertedLevel;
597 
598 void* testEraseInsertThread(void*) {
599  for (int i = 0; i < kTestEraseInsertions; ++i) {
600  KeyT key = randomizeKey(i);
601  globalAHM->insert(key, genVal(key));
602  insertedLevel.store(i, std::memory_order_release);
603  }
604  insertedLevel.store(kTestEraseInsertions, std::memory_order_release);
605  return nullptr;
606 }
607 
608 void* testEraseEraseThread(void*) {
609  for (int i = 0; i < kTestEraseInsertions; ++i) {
610  /*
611  * Make sure that we don't get ahead of the insert thread, because
612  * part of the condition for this unit test succeeding is that the
613  * map ends up empty.
614  *
615  * Note, there is a subtle case here when a new submap is
616  * allocated: the erasing thread might get 0 from count(key)
617  * because it hasn't seen numSubMaps_ update yet. To avoid this
618  * race causing problems for the test (it's ok for real usage), we
619  * lag behind the inserter by more than just element.
620  */
621  const int lag = 10;
622  int currentLevel;
623  do {
624  currentLevel = insertedLevel.load(std::memory_order_acquire);
625  if (currentLevel == kTestEraseInsertions) {
626  currentLevel += lag + 1;
627  }
628  } while (currentLevel - lag < i);
629 
630  KeyT key = randomizeKey(i);
631  while (globalAHM->count(key)) {
632  if (globalAHM->erase(key)) {
633  break;
634  }
635  }
636  }
637  return nullptr;
638 }
639 
640 } // namespace
641 
642 // Here we have a single thread inserting some values, and several threads
643 // racing to delete the values in the order they were inserted.
644 TEST(Ahm, thread_erase_insert_race) {
645  const int kInsertThreads = 1;
646  const int kEraseThreads = 10;
647 
648  VLOG(1) << "Testing insertion and erase with " << kInsertThreads
649  << " thread inserting and " << kEraseThreads << " threads erasing.";
650 
651  globalAHM = std::make_unique<AHMapT>(kTestEraseInsertions / 4, config);
652 
653  vector<pthread_t> threadIds;
654  for (int64_t j = 0; j < kInsertThreads + kEraseThreads; j++) {
655  pthread_t tid;
656  void* (*thread)(void*) =
657  (j < kInsertThreads ? testEraseInsertThread : testEraseEraseThread);
658  if (pthread_create(&tid, nullptr, thread, (void*)j) != 0) {
659  LOG(ERROR) << "Could not start thread";
660  } else {
661  threadIds.push_back(tid);
662  }
663  }
664  for (size_t i = 0; i < threadIds.size(); i++) {
665  pthread_join(threadIds[i], nullptr);
666  }
667 
668  EXPECT_TRUE(globalAHM->empty());
669  EXPECT_EQ(globalAHM->size(), 0);
670 
671  VLOG(1) << "Ended up with " << globalAHM->numSubMaps() << " submaps";
672 }
673 
674 // Repro for T#483734: Duplicate AHM inserts due to incorrect AHA return value.
678 void* atomicHashArrayInsertRaceThread(void* /* j */) {
679  AHA* arr = atomicHashArrayInsertRaceArray.get();
680  uintptr_t numInserted = 0;
681  while (!runThreadsCreatedAllThreads.load()) {
682  ;
683  }
684  for (int i = 0; i < 2; i++) {
685  if (arr->insert(RecordT(randomizeKey(i), 0)).first != arr->end()) {
686  numInserted++;
687  }
688  }
689  return (void*)numInserted;
690 }
691 TEST(Ahm, atomic_hash_array_insert_race) {
692  AHA* arr = atomicHashArrayInsertRaceArray.get();
693  int numIterations = 5000;
694  constexpr int numThreads = 4;
695  void* statuses[numThreads];
696  for (int i = 0; i < numIterations; i++) {
697  arr->clear();
698  runThreads(atomicHashArrayInsertRaceThread, numThreads, statuses);
699  EXPECT_GE(arr->size(), 1);
700  for (int j = 0; j < numThreads; j++) {
701  EXPECT_EQ(arr->size(), uintptr_t(statuses[j]));
702  }
703  }
704 }
705 
706 // Repro for T#5841499. Race between erase() and find() on the same key.
707 TEST(Ahm, erase_find_race) {
708  const uint64_t limit = 10000;
710  std::atomic<uint64_t> key{1};
711 
712  // Invariant: all values are equal to their keys.
713  // At any moment there is one or two consecutive keys in the map.
714 
715  std::thread write_thread([&]() {
716  while (true) {
717  uint64_t k = ++key;
718  if (k > limit) {
719  break;
720  }
721  map.insert(k + 1, k + 1);
722  map.erase(k);
723  }
724  });
725 
726  std::thread read_thread([&]() {
727  while (true) {
728  uint64_t k = key.load();
729  if (k > limit) {
730  break;
731  }
732 
733  auto it = map.find(k);
734  if (it != map.end()) {
735  ASSERT_EQ(k, it->second);
736  }
737  }
738  });
739 
740  read_thread.join();
741  write_thread.join();
742 }
743 
744 // Erase right after insert race bug repro (t9130653)
745 TEST(Ahm, erase_after_insert_race) {
746  const uint64_t limit = 10000;
747  const size_t num_threads = 100;
748  const size_t num_iters = 500;
750 
751  std::atomic<bool> go{false};
752  std::vector<std::thread> ts;
753  for (size_t i = 0; i < num_threads; ++i) {
754  ts.emplace_back([&]() {
755  while (!go) {
756  continue;
757  }
758  for (size_t n = 0; n < num_iters; ++n) {
759  map.erase(1);
760  map.insert(1, 1);
761  }
762  });
763  }
764 
765  go = true;
766 
767  for (auto& t : ts) {
768  t.join();
769  }
770 }
771 
772 // Repro for a bug when iterator didn't skip empty submaps.
773 TEST(Ahm, iterator_skips_empty_submaps) {
775  conf.growthFactor = 1;
776 
778 
779  map.insert(1, 1);
780  map.insert(2, 2);
781  map.insert(3, 3);
782 
783  map.erase(2);
784 
785  auto it = map.find(1);
786 
787  ASSERT_NE(map.end(), it);
788  ASSERT_EQ(1, it->first);
789  ASSERT_EQ(1, it->second);
790 
791  ++it;
792 
793  ASSERT_NE(map.end(), it);
794  ASSERT_EQ(3, it->first);
795  ASSERT_EQ(3, it->second);
796 
797  ++it;
798  ASSERT_EQ(map.end(), it);
799 }
800 
801 namespace {
802 
803 void loadGlobalAha() {
804  std::cout << "loading global AHA with " << FLAGS_numThreads
805  << " threads...\n";
808  numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
809  CHECK_EQ(0, FLAGS_numBMElements % FLAGS_numThreads)
810  << "kNumThreads must evenly divide kNumInserts.";
811  runThreads(insertThreadArr);
812  uint64_t elapsed = nowInUsec() - start;
813  std::cout << " took " << elapsed / 1000 << " ms ("
814  << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
815  EXPECT_EQ(globalAHA->size(), FLAGS_numBMElements);
816 }
817 
818 void loadGlobalAhm() {
819  std::cout << "loading global AHM with " << FLAGS_numThreads
820  << " threads...\n";
822  globalAHM = std::make_unique<AHMapT>(maxBMElements, config);
823  numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
824  runThreads(insertThread);
825  uint64_t elapsed = nowInUsec() - start;
826  std::cout << " took " << elapsed / 1000 << " ms ("
827  << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
828  EXPECT_EQ(globalAHM->size(), FLAGS_numBMElements);
829 }
830 
831 void loadGlobalQPAhm() {
832  std::cout << "loading global QPAHM with " << FLAGS_numThreads
833  << " threads...\n";
835  globalQPAHM = std::make_unique<QPAHMapT>(maxBMElements, qpConfig);
836  numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
837  runThreads(qpInsertThread);
838  uint64_t elapsed = nowInUsec() - start;
839  std::cout << " took " << elapsed / 1000 << " ms ("
840  << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
841  EXPECT_EQ(globalQPAHM->size(), FLAGS_numBMElements);
842 }
843 
844 } // namespace
845 
846 BENCHMARK(st_aha_find, iters) {
847  CHECK_LE(iters, FLAGS_numBMElements);
848  for (size_t i = 0; i < iters; i++) {
849  KeyT key = randomizeKey(i);
850  folly::doNotOptimizeAway(globalAHA->find(key)->second);
851  }
852 }
853 
854 BENCHMARK(st_ahm_find, iters) {
855  CHECK_LE(iters, FLAGS_numBMElements);
856  for (size_t i = 0; i < iters; i++) {
857  KeyT key = randomizeKey(i);
858  folly::doNotOptimizeAway(globalAHM->find(key)->second);
859  }
860 }
861 
862 BENCHMARK(st_qpahm_find, iters) {
863  CHECK_LE(iters, FLAGS_numBMElements);
864  for (size_t i = 0; i < iters; i++) {
865  KeyT key = randomizeKey(i);
866  folly::doNotOptimizeAway(globalQPAHM->find(key)->second);
867  }
868 }
869 
871 
872 BENCHMARK(mt_ahm_miss, iters) {
873  CHECK_LE(iters, FLAGS_numBMElements);
874  numOpsPerThread = iters / FLAGS_numThreads;
875  runThreads([](void* jj) -> void* {
876  int64_t j = (int64_t)jj;
877  while (!runThreadsCreatedAllThreads.load()) {
878  ;
879  }
880  for (int i = 0; i < numOpsPerThread; ++i) {
881  KeyT key = i + j * numOpsPerThread * 100;
882  folly::doNotOptimizeAway(globalAHM->find(key) == globalAHM->end());
883  }
884  return nullptr;
885  });
886 }
887 
888 BENCHMARK(mt_qpahm_miss, iters) {
889  CHECK_LE(iters, FLAGS_numBMElements);
890  numOpsPerThread = iters / FLAGS_numThreads;
891  runThreads([](void* jj) -> void* {
892  int64_t j = (int64_t)jj;
893  while (!runThreadsCreatedAllThreads.load()) {
894  ;
895  }
896  for (int i = 0; i < numOpsPerThread; ++i) {
897  KeyT key = i + j * numOpsPerThread * 100;
898  folly::doNotOptimizeAway(globalQPAHM->find(key) == globalQPAHM->end());
899  }
900  return nullptr;
901  });
902 }
903 
904 BENCHMARK(st_ahm_miss, iters) {
905  CHECK_LE(iters, FLAGS_numBMElements);
906  for (size_t i = 0; i < iters; i++) {
907  KeyT key = randomizeKey(i + iters * 100);
908  folly::doNotOptimizeAway(globalAHM->find(key) == globalAHM->end());
909  }
910 }
911 
912 BENCHMARK(st_qpahm_miss, iters) {
913  CHECK_LE(iters, FLAGS_numBMElements);
914  for (size_t i = 0; i < iters; i++) {
915  KeyT key = randomizeKey(i + iters * 100);
916  folly::doNotOptimizeAway(globalQPAHM->find(key) == globalQPAHM->end());
917  }
918 }
919 
920 BENCHMARK(mt_ahm_find_insert_mix, iters) {
921  CHECK_LE(iters, FLAGS_numBMElements);
922  numOpsPerThread = iters / FLAGS_numThreads;
923  runThreads([](void* jj) -> void* {
924  int64_t j = (int64_t)jj;
925  while (!runThreadsCreatedAllThreads.load()) {
926  ;
927  }
928  for (int i = 0; i < numOpsPerThread; ++i) {
929  if (i % 128) { // ~1% insert mix
930  KeyT key = randomizeKey(i + j * numOpsPerThread);
931  folly::doNotOptimizeAway(globalAHM->find(key)->second);
932  } else {
933  KeyT key = randomizeKey(i + j * numOpsPerThread * 100);
934  globalAHM->insert(key, genVal(key));
935  }
936  }
937  return nullptr;
938  });
939 }
940 
941 BENCHMARK(mt_qpahm_find_insert_mix, iters) {
942  CHECK_LE(iters, FLAGS_numBMElements);
943  numOpsPerThread = iters / FLAGS_numThreads;
944  runThreads([](void* jj) -> void* {
945  int64_t j = (int64_t)jj;
946  while (!runThreadsCreatedAllThreads.load()) {
947  ;
948  }
949  for (int i = 0; i < numOpsPerThread; ++i) {
950  if (i % 128) { // ~1% insert mix
951  KeyT key = randomizeKey(i + j * numOpsPerThread);
952  folly::doNotOptimizeAway(globalQPAHM->find(key)->second);
953  } else {
954  KeyT key = randomizeKey(i + j * numOpsPerThread * 100);
955  globalQPAHM->insert(key, genVal(key));
956  }
957  }
958  return nullptr;
959  });
960 }
961 
962 BENCHMARK(mt_aha_find, iters) {
963  CHECK_LE(iters, FLAGS_numBMElements);
964  numOpsPerThread = iters / FLAGS_numThreads;
965  runThreads([](void* jj) -> void* {
966  int64_t j = (int64_t)jj;
967  while (!runThreadsCreatedAllThreads.load()) {
968  ;
969  }
970  for (int i = 0; i < numOpsPerThread; ++i) {
971  KeyT key = randomizeKey(i + j * numOpsPerThread);
972  folly::doNotOptimizeAway(globalAHA->find(key)->second);
973  }
974  return nullptr;
975  });
976 }
977 
978 BENCHMARK(mt_ahm_find, iters) {
979  CHECK_LE(iters, FLAGS_numBMElements);
980  numOpsPerThread = iters / FLAGS_numThreads;
981  runThreads([](void* jj) -> void* {
982  int64_t j = (int64_t)jj;
983  while (!runThreadsCreatedAllThreads.load()) {
984  ;
985  }
986  for (int i = 0; i < numOpsPerThread; ++i) {
987  KeyT key = randomizeKey(i + j * numOpsPerThread);
988  folly::doNotOptimizeAway(globalAHM->find(key)->second);
989  }
990  return nullptr;
991  });
992 }
993 
994 BENCHMARK(mt_qpahm_find, iters) {
995  CHECK_LE(iters, FLAGS_numBMElements);
996  numOpsPerThread = iters / FLAGS_numThreads;
997  runThreads([](void* jj) -> void* {
998  int64_t j = (int64_t)jj;
999  while (!runThreadsCreatedAllThreads.load()) {
1000  ;
1001  }
1002  for (int i = 0; i < numOpsPerThread; ++i) {
1003  KeyT key = randomizeKey(i + j * numOpsPerThread);
1004  folly::doNotOptimizeAway(globalQPAHM->find(key)->second);
1005  }
1006  return nullptr;
1007  });
1008 }
1009 
1011 BENCHMARK(st_baseline_modulus_and_random, iters) {
1012  for (size_t i = 0; i < iters; ++i) {
1013  k = randomizeKey(i) % iters;
1014  }
1015 }
1016 
1017 // insertions go last because they reset the map
1018 
1019 BENCHMARK(mt_ahm_insert, iters) {
1021  globalAHM = std::make_unique<AHMapT>(int(iters * LF), config);
1022  numOpsPerThread = iters / FLAGS_numThreads;
1023  }
1024  runThreads(insertThread);
1025 }
1026 
1027 BENCHMARK(mt_qpahm_insert, iters) {
1029  globalQPAHM = std::make_unique<QPAHMapT>(int(iters * LF), qpConfig);
1030  numOpsPerThread = iters / FLAGS_numThreads;
1031  }
1032  runThreads(qpInsertThread);
1033 }
1034 
1035 BENCHMARK(st_ahm_insert, iters) {
1037  std::unique_ptr<AHMapT> ahm(new AHMapT(int(iters * LF), config));
1038  susp.dismiss();
1039 
1040  for (size_t i = 0; i < iters; i++) {
1041  KeyT key = randomizeKey(i);
1042  ahm->insert(key, genVal(key));
1043  }
1044 }
1045 
1046 BENCHMARK(st_qpahm_insert, iters) {
1048  std::unique_ptr<QPAHMapT> ahm(new QPAHMapT(int(iters * LF), qpConfig));
1049  susp.dismiss();
1050 
1051  for (size_t i = 0; i < iters; i++) {
1052  KeyT key = randomizeKey(i);
1053  ahm->insert(key, genVal(key));
1054  }
1055 }
1056 
1058  config.maxLoadFactor = FLAGS_maxLoadFactor;
1059  qpConfig.maxLoadFactor = FLAGS_maxLoadFactor;
1060  configRace.maxLoadFactor = 0.5;
1061  int numCores = sysconf(_SC_NPROCESSORS_ONLN);
1062  loadGlobalAha();
1063  loadGlobalAhm();
1064  loadGlobalQPAhm();
1065  string numIters =
1066  folly::to<string>(std::min(1000000, int(FLAGS_numBMElements)));
1067 
1068  gflags::SetCommandLineOptionWithMode(
1069  "bm_max_iters", numIters.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1070  gflags::SetCommandLineOptionWithMode(
1071  "bm_min_iters", numIters.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1072  string numCoresStr = folly::to<string>(numCores);
1073  gflags::SetCommandLineOptionWithMode(
1074  "numThreads", numCoresStr.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1075 
1076  std::cout << "\nRunning AHM benchmarks on machine with " << numCores
1077  << " logical cores.\n"
1078  " num elements per map: "
1079  << FLAGS_numBMElements << "\n"
1080  << " num threads for mt tests: " << FLAGS_numThreads << "\n"
1081  << " AHM load factor: " << FLAGS_targetLoadFactor << "\n\n";
1082 }
1083 
1084 int main(int argc, char** argv) {
1085  testing::InitGoogleTest(&argc, argv);
1086  gflags::ParseCommandLineFlags(&argc, &argv, true);
1087  auto ret = RUN_ALL_TESTS();
1088  if (!ret && FLAGS_benchmark) {
1089  benchmarkSetup();
1091  }
1092  return ret;
1093 }
1094 
1095 /*
1096 loading global AHA with 8 threads...
1097  took 487 ms (40 ns/insert).
1098 loading global AHM with 8 threads...
1099  took 478 ms (39 ns/insert).
1100 loading global QPAHM with 8 threads...
1101  took 478 ms (39 ns/insert).
1102 
1103 Running AHM benchmarks on machine with 24 logical cores.
1104  num elements per map: 12000000
1105  num threads for mt tests: 24
1106  AHM load factor: 0.75
1107 
1108 ============================================================================
1109 folly/test/AtomicHashMapTest.cpp relative time/iter iters/s
1110 ============================================================================
1111 st_aha_find 92.63ns 10.80M
1112 st_ahm_find 107.78ns 9.28M
1113 st_qpahm_find 90.69ns 11.03M
1114 ----------------------------------------------------------------------------
1115 mt_ahm_miss 2.09ns 477.36M
1116 mt_qpahm_miss 1.37ns 728.82M
1117 st_ahm_miss 241.07ns 4.15M
1118 st_qpahm_miss 223.17ns 4.48M
1119 mt_ahm_find_insert_mix 8.05ns 124.24M
1120 mt_qpahm_find_insert_mix 9.10ns 109.85M
1121 mt_aha_find 6.82ns 146.68M
1122 mt_ahm_find 7.95ns 125.77M
1123 mt_qpahm_find 6.81ns 146.83M
1124 st_baseline_modulus_and_random 6.02ns 166.03M
1125 mt_ahm_insert 14.29ns 69.97M
1126 mt_qpahm_insert 11.68ns 85.61M
1127 st_ahm_insert 125.39ns 7.98M
1128 st_qpahm_insert 128.76ns 7.77M
1129 ============================================================================
1130 */
int numSubMaps() const
bool empty() const
BENCHMARK_DRAW_LINE()
size_type count(key_type k) const
QPAHMapT::Config qpConfig
string toString()
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
size_t operator()(const char *a)
std::pair< const KeyT, ValueT > value_type
char b
void benchmarkSetup()
std::unique_ptr< AtomicHashArray, Deleter > SmartPtr
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: gtest.h:2232
int32_t KeyT
iterator find(LookupKeyT k)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
#define BENCHMARK_SUSPEND
Definition: Benchmark.h:576
folly::QuadraticProbingAtomicHashMap< KeyT, ValueT > QPAHMapT
double val
Definition: String.cpp:273
AHMapT::value_type RecordT
AtomicHashArray< int32_t, int32_t > AHA
static std::unique_ptr< QPAHMapT > globalQPAHM
void runBenchmarks()
Definition: Benchmark.cpp:456
int32_t ValueT
AtomicHashMap< KeyT, ValueT > AHMapT
#define EXPECT_GE(val1, val2)
Definition: gtest.h:1932
const int maxBMElements
std::pair< iterator, bool > insert(const value_type &r)
uint32_t jenkins_rev_mix32(uint32_t key) noexcept
Definition: Hash.h:97
auto ch
static int genVal(int key)
std::vector< std::thread::id > threads
static std::unique_ptr< AHMapT > globalAHM
AHArrayT::Config config
#define FOR_EACH_RANGE(i, begin, end)
Definition: Foreach.h:313
size_t operator()(const StringPiece a)
char ** argv
bool operator()(const char *a, const StringPiece b)
static AHArrayT::SmartPtr globalAHA(nullptr)
LogLevel min
Definition: LogLevel.cpp:30
static int64_t nowInUsec()
void increment(int64_t obj_id)
size_type erase(key_type k)
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
static Map map(mapCap)
DEFINE_int32(numThreads, 8,"Threads to use for concurrency tests.")
static map< string, int > m
static bool legalKey(const char *a)
char a
const double LF
auto start
Integer & operator=(const Integer &a)
AtomicHashArray< KeyT, ValueT > AHArrayT
AHA::Config configRace
int * count
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
bool operator()(const char *a, const char *b)
DEFINE_int64(numBMElements, 12 *1000 *1000,"Size of maps for benchmarks.")
void * atomicHashArrayInsertRaceThread(void *)
static SmartPtr create(size_t maxSize, const Config &c=Config())
#define FOR_EACH(i, c)
Definition: Foreach.h:143
bool operator()(const char *a, const char &b)
std::atomic< int > counter
const char * string
Definition: Conv.cpp:212
AtomicHashMap< int64_t, int64_t > ahm
std::pair< iterator, bool > insert(const value_type &r)
#define EXPECT_NE(val1, val2)
Definition: gtest.h:1926
static set< string > s
size_t operator()(const char &a)
auto atomicHashArrayInsertRaceArray
TEST(Ahm, BasicStrings)
AtomicHashMap< const char *, int64_t, HashTraits, EqTraits > AHMCstrInt
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: gtest.cc:5370
#define ASSERT_NE(val1, val2)
Definition: gtest.h:1960
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
BENCHMARK(st_aha_find, iters)
bool operator==(const Integer &a) const
Integer(KeyT v=0)
Range< const char * > StringPiece
size_t capacity() const
int64_t getValue(int64_t obj_id)
int main(int argc, char **argv)
AHMCstrInt::Config cstrIntCfg
char c
KeyT k
DEFINE_double(targetLoadFactor, 0.75,"Target memory utilization fraction.")
iterator findAt(uint32_t idx)
auto doNotOptimizeAway(const T &datum) -> typename std::enable_if< !detail::DoNotOptimizeAwayNeedsIndirect< T >::value >::type
Definition: Benchmark.h:258
Counters(size_t numCounters)
#define EXPECT_GT(val1, val2)
Definition: gtest.h:1934