23 #include <glog/logging.h> 37 DEFINE_double(targetLoadFactor, 0.75,
"Target memory utilization fraction.");
39 DEFINE_int32(numThreads, 8,
"Threads to use for concurrency tests.");
40 DEFINE_int64(numBMElements, 12 * 1000 * 1000,
"Size of maps for benchmarks.");
42 const double LF = FLAGS_maxLoadFactor / FLAGS_targetLoadFactor;
47 gettimeofday(&tv,
nullptr);
48 return int64_t(tv.tv_sec) * 1000 * 1000 + tv.tv_usec;
56 for (
int i = 0;
i < 100; ++
i) {
57 myMap.insert(make_pair(
i, folly::to<string>(
i)));
59 for (
int i = 0;
i < 100; ++
i) {
60 EXPECT_EQ(myMap.find(
i)->second, folly::to<string>(
i));
63 myMap.insert(std::make_pair(999,
"A"));
64 myMap.insert(std::make_pair(999,
"B"));
66 myMap.find(999)->second =
"B";
67 myMap.find(999)->second =
"C";
72 TEST(Ahm, BasicNoncopyable) {
77 for (
int i = 0;
i < 50; ++
i) {
78 myMap.insert(make_pair(
i, std::make_unique<int>(
i)));
80 for (
int i = 50;
i < 100; ++
i) {
81 myMap.insert(
i, std::make_unique<int>(
i));
83 for (
int i = 100;
i < 150; ++
i) {
84 myMap.emplace(
i,
new int(
i));
86 for (
int i = 150;
i < 200; ++
i) {
87 myMap.emplace(
i,
new int(
i), std::default_delete<int>());
89 for (
int i = 0;
i < 200; ++
i) {
92 for (
int i = 0;
i < 200;
i += 4) {
95 for (
int i = 0;
i < 200;
i += 4) {
122 return legalKey(a) && (strcmp(a, b) == 0);
125 return legalKey(a) && (a[0] !=
'\0') && (a[0] == b);
128 return legalKey(a) && (strlen(a) == b.size()) &&
129 (strcmp(a, b.begin()) == 0);
137 result +=
static_cast<size_t>(*(a++));
142 return static_cast<size_t>(
a);
146 for (
const auto&
ch : a) {
147 result +=
static_cast<size_t>(
ch);
164 myMap.
insert(std::make_pair(
"f", 42));
168 auto it = myMap.
find<
char>(
'f');
173 auto it = myMap.
find<
char>(
'g');
179 auto it = myMap.
find(piece);
185 VLOG(1) <<
"Overhead: " <<
sizeof(
AHArrayT) <<
" (array) " 188 float sizeFactor = 0.46f;
190 std::unique_ptr<AHMapT>
m(
new AHMapT(
int(numEntries * sizeFactor), config));
196 success &= ret.second;
197 success &= (m->
findAt(ret.first.getIndex())->second ==
genVal(
i));
204 success &= (ret.second ==
false);
205 success &= (ret.first->second ==
genVal(
i));
206 success &= (m->
findAt(ret.first.getIndex())->second ==
genVal(
i));
214 for (
size_t i = 0;
i < numEntries;
i++) {
225 success &= (retIt->second ==
genVal(
i));
228 success &= (retIt->first ==
i);
233 m->
find(8)->second = 5309;
247 int numEntries = 10000;
248 float sizeFactor = .46f;
249 std::unique_ptr<AHMapT>
m(
new AHMapT(
int(numEntries * sizeFactor), config));
252 for (
int i = 0;
i < numEntries;
i++) {
259 success &= (it->second ==
genVal(it->first));
274 explicit Counters(
size_t numCounters) : ahm(numCounters) {}
277 auto ret = ahm.
insert(std::make_pair(obj_id, 1));
280 __sync_fetch_and_add(&ret.first->second, 1);
285 auto ret = ahm.
find(obj_id);
286 return ret != ahm.
end() ? ret->second : 0;
292 ret.reserve(ahm.
size() * 32);
293 for (
const auto& e : ahm) {
294 ret += folly::to<string>(
" [", e.first,
":", e.second,
"]\n");
304 const int numKeys = 10;
307 vector<int64_t> keys;
310 for (
auto key : keys) {
312 threads.push_back(std::thread([&, key] { c.
increment(key); }));
315 for (
auto&
t : threads) {
319 for (
auto key : keys) {
320 int val = key * mult;
323 string::npos, str.find(folly::to<string>(
"[", key,
":", val,
"]")));
332 static bool throwException_ =
false;
333 throwException_ = !throwException_;
334 if (throwException_) {
349 TEST(Ahm, map_exception_safety) {
352 int numEntries = 10000;
353 float sizeFactor = 0.46f;
354 std::unique_ptr<MyMapT>
m(
new MyMapT(
int(numEntries * sizeFactor)));
358 for (
int i = 0;
i < numEntries;
i++) {
364 success &= !m->count(
i);
372 size_t numEntries = 3000;
374 std::unique_ptr<AHMapT>
s(
new AHMapT(numEntries, config));
377 for (
int iterations = 0; iterations < 4; ++iterations) {
380 for (
size_t i = 0;
i < numEntries; ++
i) {
381 success &= !(s->
count(
i));
384 success &= ret.second;
392 for (
size_t i = 0;
i < numEntries; ++
i) {
394 success &= (s->
size() == numEntries - 1 -
i);
395 success &= !(s->
count(
i));
396 success &= !(s->
erase(
i));
400 VLOG(1) <<
"Final number of subMaps = " << s->
numSubMaps();
405 inline KeyT randomizeKey(
int key) {
414 int numOpsPerThread = 0;
416 void* insertThread(
void* jj) {
418 for (
int i = 0;
i < numOpsPerThread; ++
i) {
419 KeyT key = randomizeKey(
i + j * numOpsPerThread);
425 void* qpInsertThread(
void* jj) {
427 for (
int i = 0;
i < numOpsPerThread; ++
i) {
428 KeyT key = randomizeKey(
i + j * numOpsPerThread);
434 void* insertThreadArr(
void* jj) {
436 for (
int i = 0;
i < numOpsPerThread; ++
i) {
437 KeyT key = randomizeKey(
i + j * numOpsPerThread);
443 std::atomic<bool> runThreadsCreatedAllThreads;
444 void runThreads(
void* (*mainFunc)(
void*),
int numThreads,
void** statuses) {
446 runThreadsCreatedAllThreads.store(
false);
448 for (
int64_t j = 0; j < numThreads; j++) {
449 threads.emplace_back([statuses, mainFunc, j]() {
450 auto ret = mainFunc((
void*)j);
451 if (statuses !=
nullptr) {
458 runThreadsCreatedAllThreads.store(
true);
459 for (
size_t i = 0;
i < threads.size(); ++
i) {
464 void runThreads(
void* (*mainFunc)(
void*)) {
465 runThreads(mainFunc, FLAGS_numThreads,
nullptr);
471 const int numInserts = 1000000 / 4;
474 numOpsPerThread = numInserts;
476 float sizeFactor = 0.46f;
477 int entrySize =
sizeof(
KeyT) +
sizeof(
ValueT);
478 VLOG(1) <<
"Testing " << numInserts <<
" unique " << entrySize
479 <<
" Byte entries replicated in " << FLAGS_numThreads
480 <<
" threads with " << FLAGS_maxLoadFactor * 100.0
481 <<
"% max load factor.";
483 globalAHM = std::make_unique<AHMapT>(int(numInserts * sizeFactor),
config);
485 size_t sizeInit = globalAHM->
capacity();
486 VLOG(1) <<
" Initial capacity: " << sizeInit;
489 runThreads([](
void*) ->
void* {
490 for (
int i = 0;
i < numOpsPerThread; ++
i) {
491 KeyT key = randomizeKey(
i);
498 size_t finalCap = globalAHM->
capacity();
499 size_t sizeAHM = globalAHM->
size();
500 VLOG(1) << elapsed / sizeAHM <<
" usec per " << FLAGS_numThreads
501 <<
" duplicate inserts (atomic).";
502 VLOG(1) <<
" Final capacity: " << finalCap <<
" in " 504 << sizeAHM * 100 / finalCap <<
"% load factor, " 505 << (finalCap - sizeInit) * 100 / sizeInit <<
"% growth).";
510 for (
int i = 0;
i < numInserts; ++
i) {
511 KeyT key = randomizeKey(
i);
512 success &= (globalAHM->
find(key)->second ==
genVal(key));
518 runThreads([](
void*) ->
void* {
520 for (
int i = 0;
i < numOpsPerThread; ++
i) {
521 globalAHM->
find(key);
528 VLOG(1) << elapsed / sizeAHM <<
" usec per " << FLAGS_numThreads
529 <<
" duplicate finds (atomic).";
534 const int kInsertPerThread = 100000;
535 int raceFinalSizeEstimate;
537 void* raceIterateThread(
void*) {
542 for (; it !=
end; ++it) {
544 if (count > raceFinalSizeEstimate) {
552 void* raceInsertRandomThread(
void*) {
553 for (
int i = 0;
i < kInsertPerThread; ++
i) {
564 TEST(Ahm, race_insert_iterate_thread_test) {
565 const int kInsertThreads = 20;
566 const int kIterateThreads = 20;
567 raceFinalSizeEstimate = kInsertThreads * kInsertPerThread;
569 VLOG(1) <<
"Testing iteration and insertion with " << kInsertThreads
570 <<
" threads inserting and " << kIterateThreads
571 <<
" threads iterating.";
573 globalAHM = std::make_unique<AHMapT>(raceFinalSizeEstimate / 9,
config);
575 vector<pthread_t> threadIds;
576 for (
int j = 0; j < kInsertThreads + kIterateThreads; j++) {
578 void* (*thread)(
void*) =
579 (j < kInsertThreads ? raceInsertRandomThread : raceIterateThread);
580 if (pthread_create(&tid,
nullptr, thread,
nullptr) != 0) {
581 LOG(ERROR) <<
"Could not start thread";
583 threadIds.push_back(tid);
586 for (
size_t i = 0;
i < threadIds.size(); ++
i) {
587 pthread_join(threadIds[
i],
nullptr);
589 VLOG(1) <<
"Ended up with " << globalAHM->
numSubMaps() <<
" submaps";
590 VLOG(1) <<
"Final size of map " << globalAHM->
size();
595 const int kTestEraseInsertions = 200000;
596 std::atomic<int32_t> insertedLevel;
598 void* testEraseInsertThread(
void*) {
599 for (
int i = 0;
i < kTestEraseInsertions; ++
i) {
600 KeyT key = randomizeKey(
i);
602 insertedLevel.store(
i, std::memory_order_release);
604 insertedLevel.store(kTestEraseInsertions, std::memory_order_release);
608 void* testEraseEraseThread(
void*) {
609 for (
int i = 0;
i < kTestEraseInsertions; ++
i) {
624 currentLevel = insertedLevel.load(std::memory_order_acquire);
625 if (currentLevel == kTestEraseInsertions) {
626 currentLevel += lag + 1;
628 }
while (currentLevel - lag <
i);
630 KeyT key = randomizeKey(
i);
631 while (globalAHM->
count(key)) {
632 if (globalAHM->
erase(key)) {
644 TEST(Ahm, thread_erase_insert_race) {
645 const int kInsertThreads = 1;
646 const int kEraseThreads = 10;
648 VLOG(1) <<
"Testing insertion and erase with " << kInsertThreads
649 <<
" thread inserting and " << kEraseThreads <<
" threads erasing.";
651 globalAHM = std::make_unique<AHMapT>(kTestEraseInsertions / 4,
config);
653 vector<pthread_t> threadIds;
654 for (
int64_t j = 0; j < kInsertThreads + kEraseThreads; j++) {
656 void* (*thread)(
void*) =
657 (j < kInsertThreads ? testEraseInsertThread : testEraseEraseThread);
658 if (pthread_create(&tid,
nullptr, thread, (
void*)j) != 0) {
659 LOG(ERROR) <<
"Could not start thread";
661 threadIds.push_back(tid);
664 for (
size_t i = 0;
i < threadIds.size();
i++) {
665 pthread_join(threadIds[
i],
nullptr);
671 VLOG(1) <<
"Ended up with " << globalAHM->
numSubMaps() <<
" submaps";
680 uintptr_t numInserted = 0;
681 while (!runThreadsCreatedAllThreads.load()) {
684 for (
int i = 0;
i < 2;
i++) {
689 return (
void*)numInserted;
691 TEST(Ahm, atomic_hash_array_insert_race) {
693 int numIterations = 5000;
694 constexpr
int numThreads = 4;
695 void* statuses[numThreads];
696 for (
int i = 0;
i < numIterations;
i++) {
700 for (
int j = 0; j < numThreads; j++) {
710 std::atomic<uint64_t> key{1};
715 std::thread write_thread([&]() {
726 std::thread read_thread([&]() {
733 auto it = map.
find(k);
734 if (it != map.
end()) {
745 TEST(Ahm, erase_after_insert_race) {
747 const size_t num_threads = 100;
748 const size_t num_iters = 500;
751 std::atomic<bool> go{
false};
752 std::vector<std::thread> ts;
753 for (
size_t i = 0;
i < num_threads; ++
i) {
754 ts.emplace_back([&]() {
758 for (
size_t n = 0; n < num_iters; ++n) {
773 TEST(Ahm, iterator_skips_empty_submaps) {
775 conf.growthFactor = 1;
785 auto it = map.
find(1);
803 void loadGlobalAha() {
804 std::cout <<
"loading global AHA with " << FLAGS_numThreads
808 numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
809 CHECK_EQ(0, FLAGS_numBMElements % FLAGS_numThreads)
810 <<
"kNumThreads must evenly divide kNumInserts.";
811 runThreads(insertThreadArr);
813 std::cout <<
" took " << elapsed / 1000 <<
" ms (" 814 << (elapsed * 1000 / FLAGS_numBMElements) <<
" ns/insert).\n";
818 void loadGlobalAhm() {
819 std::cout <<
"loading global AHM with " << FLAGS_numThreads
823 numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
824 runThreads(insertThread);
826 std::cout <<
" took " << elapsed / 1000 <<
" ms (" 827 << (elapsed * 1000 / FLAGS_numBMElements) <<
" ns/insert).\n";
831 void loadGlobalQPAhm() {
832 std::cout <<
"loading global QPAHM with " << FLAGS_numThreads
836 numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
837 runThreads(qpInsertThread);
839 std::cout <<
" took " << elapsed / 1000 <<
" ms (" 840 << (elapsed * 1000 / FLAGS_numBMElements) <<
" ns/insert).\n";
847 CHECK_LE(iters, FLAGS_numBMElements);
848 for (
size_t i = 0;
i < iters;
i++) {
849 KeyT key = randomizeKey(
i);
855 CHECK_LE(iters, FLAGS_numBMElements);
856 for (
size_t i = 0;
i < iters;
i++) {
857 KeyT key = randomizeKey(
i);
863 CHECK_LE(iters, FLAGS_numBMElements);
864 for (
size_t i = 0;
i < iters;
i++) {
865 KeyT key = randomizeKey(
i);
873 CHECK_LE(iters, FLAGS_numBMElements);
874 numOpsPerThread = iters / FLAGS_numThreads;
875 runThreads([](
void* jj) ->
void* {
877 while (!runThreadsCreatedAllThreads.load()) {
880 for (
int i = 0;
i < numOpsPerThread; ++
i) {
881 KeyT key =
i + j * numOpsPerThread * 100;
889 CHECK_LE(iters, FLAGS_numBMElements);
890 numOpsPerThread = iters / FLAGS_numThreads;
891 runThreads([](
void* jj) ->
void* {
893 while (!runThreadsCreatedAllThreads.load()) {
896 for (
int i = 0;
i < numOpsPerThread; ++
i) {
897 KeyT key =
i + j * numOpsPerThread * 100;
905 CHECK_LE(iters, FLAGS_numBMElements);
906 for (
size_t i = 0;
i < iters;
i++) {
907 KeyT key = randomizeKey(
i + iters * 100);
913 CHECK_LE(iters, FLAGS_numBMElements);
914 for (
size_t i = 0;
i < iters;
i++) {
915 KeyT key = randomizeKey(
i + iters * 100);
921 CHECK_LE(iters, FLAGS_numBMElements);
922 numOpsPerThread = iters / FLAGS_numThreads;
923 runThreads([](
void* jj) ->
void* {
925 while (!runThreadsCreatedAllThreads.load()) {
928 for (
int i = 0;
i < numOpsPerThread; ++
i) {
930 KeyT key = randomizeKey(
i + j * numOpsPerThread);
933 KeyT key = randomizeKey(
i + j * numOpsPerThread * 100);
942 CHECK_LE(iters, FLAGS_numBMElements);
943 numOpsPerThread = iters / FLAGS_numThreads;
944 runThreads([](
void* jj) ->
void* {
946 while (!runThreadsCreatedAllThreads.load()) {
949 for (
int i = 0;
i < numOpsPerThread; ++
i) {
951 KeyT key = randomizeKey(
i + j * numOpsPerThread);
954 KeyT key = randomizeKey(
i + j * numOpsPerThread * 100);
963 CHECK_LE(iters, FLAGS_numBMElements);
964 numOpsPerThread = iters / FLAGS_numThreads;
965 runThreads([](
void* jj) ->
void* {
967 while (!runThreadsCreatedAllThreads.load()) {
970 for (
int i = 0;
i < numOpsPerThread; ++
i) {
971 KeyT key = randomizeKey(
i + j * numOpsPerThread);
979 CHECK_LE(iters, FLAGS_numBMElements);
980 numOpsPerThread = iters / FLAGS_numThreads;
981 runThreads([](
void* jj) ->
void* {
983 while (!runThreadsCreatedAllThreads.load()) {
986 for (
int i = 0;
i < numOpsPerThread; ++
i) {
987 KeyT key = randomizeKey(
i + j * numOpsPerThread);
995 CHECK_LE(iters, FLAGS_numBMElements);
996 numOpsPerThread = iters / FLAGS_numThreads;
997 runThreads([](
void* jj) ->
void* {
999 while (!runThreadsCreatedAllThreads.load()) {
1002 for (
int i = 0;
i < numOpsPerThread; ++
i) {
1003 KeyT key = randomizeKey(
i + j * numOpsPerThread);
1012 for (
size_t i = 0;
i < iters; ++
i) {
1013 k = randomizeKey(
i) % iters;
1021 globalAHM = std::make_unique<AHMapT>(int(iters *
LF),
config);
1022 numOpsPerThread = iters / FLAGS_numThreads;
1024 runThreads(insertThread);
1029 globalQPAHM = std::make_unique<QPAHMapT>(int(iters *
LF),
qpConfig);
1030 numOpsPerThread = iters / FLAGS_numThreads;
1032 runThreads(qpInsertThread);
1037 std::unique_ptr<AHMapT> ahm(
new AHMapT(
int(iters *
LF), config));
1040 for (
size_t i = 0;
i < iters;
i++) {
1041 KeyT key = randomizeKey(
i);
1048 std::unique_ptr<QPAHMapT> ahm(
new QPAHMapT(
int(iters *
LF), qpConfig));
1051 for (
size_t i = 0;
i < iters;
i++) {
1052 KeyT key = randomizeKey(
i);
1061 int numCores = sysconf(_SC_NPROCESSORS_ONLN);
1066 folly::to<string>(
std::min(1000000,
int(FLAGS_numBMElements)));
1068 gflags::SetCommandLineOptionWithMode(
1069 "bm_max_iters", numIters.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1070 gflags::SetCommandLineOptionWithMode(
1071 "bm_min_iters", numIters.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1072 string numCoresStr = folly::to<string>(numCores);
1073 gflags::SetCommandLineOptionWithMode(
1074 "numThreads", numCoresStr.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1076 std::cout <<
"\nRunning AHM benchmarks on machine with " << numCores
1077 <<
" logical cores.\n" 1078 " num elements per map: " 1079 << FLAGS_numBMElements <<
"\n" 1080 <<
" num threads for mt tests: " << FLAGS_numThreads <<
"\n" 1081 <<
" AHM load factor: " << FLAGS_targetLoadFactor <<
"\n\n";
1086 gflags::ParseCommandLineFlags(&argc, &argv,
true);
1088 if (!ret && FLAGS_benchmark) {
uint32_t getIndex() const
size_type count(key_type k) const
QPAHMapT::Config qpConfig
#define ASSERT_EQ(val1, val2)
size_t operator()(const char *a)
std::pair< const KeyT, ValueT > value_type
std::unique_ptr< AtomicHashArray, Deleter > SmartPtr
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
iterator find(LookupKeyT k)
#define EXPECT_EQ(val1, val2)
#define BENCHMARK_SUSPEND
folly::QuadraticProbingAtomicHashMap< KeyT, ValueT > QPAHMapT
AHMapT::value_type RecordT
AtomicHashArray< int32_t, int32_t > AHA
static std::unique_ptr< QPAHMapT > globalQPAHM
AtomicHashMap< KeyT, ValueT > AHMapT
#define EXPECT_GE(val1, val2)
std::pair< iterator, bool > insert(const value_type &r)
uint32_t jenkins_rev_mix32(uint32_t key) noexcept
static int genVal(int key)
std::vector< std::thread::id > threads
static std::unique_ptr< AHMapT > globalAHM
#define FOR_EACH_RANGE(i, begin, end)
size_t operator()(const StringPiece a)
bool operator()(const char *a, const StringPiece b)
static AHArrayT::SmartPtr globalAHA(nullptr)
static int64_t nowInUsec()
void increment(int64_t obj_id)
size_type erase(key_type k)
auto end(TestAdlIterable &instance)
DEFINE_int32(numThreads, 8,"Threads to use for concurrency tests.")
static map< string, int > m
static bool legalKey(const char *a)
Integer & operator=(const Integer &a)
AtomicHashArray< KeyT, ValueT > AHArrayT
#define EXPECT_TRUE(condition)
bool operator()(const char *a, const char *b)
DEFINE_int64(numBMElements, 12 *1000 *1000,"Size of maps for benchmarks.")
void * atomicHashArrayInsertRaceThread(void *)
static SmartPtr create(size_t maxSize, const Config &c=Config())
bool operator()(const char *a, const char &b)
std::atomic< int > counter
AtomicHashMap< int64_t, int64_t > ahm
std::pair< iterator, bool > insert(const value_type &r)
#define EXPECT_NE(val1, val2)
size_t operator()(const char &a)
auto atomicHashArrayInsertRaceArray
AtomicHashMap< const char *, int64_t, HashTraits, EqTraits > AHMCstrInt
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
#define ASSERT_NE(val1, val2)
#define EXPECT_FALSE(condition)
BENCHMARK(st_aha_find, iters)
bool operator==(const Integer &a) const
Range< const char * > StringPiece
int64_t getValue(int64_t obj_id)
int main(int argc, char **argv)
AHMCstrInt::Config cstrIntCfg
DEFINE_double(targetLoadFactor, 0.75,"Target memory utilization fraction.")
iterator findAt(uint32_t idx)
auto doNotOptimizeAway(const T &datum) -> typename std::enable_if< !detail::DoNotOptimizeAwayNeedsIndirect< T >::value >::type
Counters(size_t numCounters)
#define EXPECT_GT(val1, val2)