DistributedMutexTest.cpp
/*
 * Copyright 2018-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <folly/synchronization/DistributedMutex.h>
#include <folly/MapUtil.h>
#include <folly/Synchronized.h>
#include <folly/container/Foreach.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
#include <folly/test/DeterministicSchedule.h>

#include <chrono>
#include <thread>

using namespace std::literals;

namespace folly {
namespace test {
/**
 * Schedule helper for the tests in this file.  A test registers a callback
 * for its thread; ManualAtomic (below) invokes beforeSharedAccess() on every
 * instrumented atomic access, which runs that callback.  The numbered
 * wait()/post() batons let test threads hand control to each other at
 * specific points in the mutex implementation.
 */
class ManualSchedule {
 public:
  ManualSchedule() = default;
  ~ManualSchedule() {
    // delete this schedule from the global map
    auto schedules = schedules_.wlock();
    for_each(*schedules, [&](auto& schedule, auto, auto iter) {
      if (schedule.second == this) {
        schedules->erase(iter);
      }
    });
  }

  // invoked by ManualAtomic before (and, via afterSharedAccess(), after)
  // every instrumented shared access
  static void beforeSharedAccess() {
    if (folly::kIsDebug) {
      auto id = std::this_thread::get_id();

      // get the schedule assigned for the current thread, if one exists,
      // otherwise proceed as normal
      auto schedule = get_ptr(*schedules_.wlock(), id);
      if (!schedule) {
        return;
      }

      // now try and get the callback registered for this thread, and run it
      // if the test set one
      auto callback = get_ptr((*(*schedule)->callbacks_.wlock()), id);
      if (!callback) {
        return;
      }
      (*callback)();
    }
  }
  static void afterSharedAccess(bool) {
    beforeSharedAccess();
  }

  // register a callback to be executed on the current thread's instrumented
  // atomic accesses
  void setCallback(std::function<void()> callback) {
    schedules_.wlock()->insert({std::this_thread::get_id(), this});
    callbacks_.wlock()->insert({std::this_thread::get_id(), callback});
  }

  // remove the callback registered for the current thread
  void removeCallbacks() {
    callbacks_.wlock()->erase(std::this_thread::get_id());
  }

  // baton-based synchronization between test threads
  void wait(int id) {
    if (folly::kIsDebug) {
      auto& baton = (*batons_.wlock())[id];
      baton.wait();
    }
  }
  void post(int id) {
    if (folly::kIsDebug) {
      auto& baton = (*batons_.wlock())[id];
      baton.post();
    }
  }

 private:
  // the map of threads to the schedule started for that test
  static Synchronized<std::unordered_map<std::thread::id, ManualSchedule*>>
      schedules_;
  // the map of callbacks to be executed for a thread's atomic accesses
  Synchronized<std::unordered_map<std::thread::id, std::function<void()>>>
      callbacks_;
  // batons for testing, this map will only ever be written to, so it is safe
  // to hold references outside the lock
  Synchronized<std::unordered_map<int, folly::Baton<>>> batons_;
};
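
// The contention-chain tests below drive ManualSchedule roughly like this
// (an illustrative sketch of the pattern used later in this file):
//
//   auto&& schedule = test::ManualSchedule{};
//   schedule.setCallback([&, i = 0]() mutable {
//     if (i == 2) {       // after this thread's third instrumented access
//       schedule.post(1); // wake whoever is blocked on baton 1
//     }
//     ++i;
//   });
//   schedule.wait(0);     // block until another thread calls post(0)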

Synchronized<std::unordered_map<std::thread::id, ManualSchedule*>>
    ManualSchedule::schedules_;

// an atomic whose shared accesses are instrumented to call into the
// ManualSchedule hooks above
template <typename T>
using ManualAtomic = DeterministicAtomicImpl<T, ManualSchedule>;
template <template <typename> class Atomic>
using TestDistributedMutex =
    detail::distributed_mutex::DistributedMutex<Atomic, false>;
// Futex and atomic-waiting overloads for ManualAtomic.  These funnel through
// ManualSchedule::beforeSharedAccess() so a test callback can run at the
// point where the mutex would block or wake a waiter; doing no real waiting
// here is still correct because futex-style waits must tolerate spurious
// wakeups.
int futexWakeImpl(const detail::Futex<ManualAtomic>*, int, uint32_t) {
  ManualSchedule::beforeSharedAccess();
  return 1;
}
detail::FutexResult futexWaitImpl(
    const detail::Futex<ManualAtomic>*,
    uint32_t,
    std::chrono::system_clock::time_point const*,
    std::chrono::steady_clock::time_point const*,
    uint32_t) {
  ManualSchedule::beforeSharedAccess();
  return detail::FutexResult::AWOKEN;
}

template <typename Clock, typename Duration>
std::cv_status atomic_wait_until(
    const ManualAtomic<std::uintptr_t>*,
    std::uintptr_t,
    const std::chrono::time_point<Clock, Duration>&) {
  ManualSchedule::beforeSharedAccess();
  return std::cv_status::no_timeout;
}

void atomic_notify_one(const ManualAtomic<std::uintptr_t>*) {
  ManualSchedule::beforeSharedAccess();
}
} // namespace test

namespace {
DEFINE_int32(stress_factor, 1000, "The stress test factor for tests");
constexpr auto kForever = 100h;

using DSched = test::DeterministicSchedule;

int sum(int n) {
  return (n * (n + 1)) / 2;
}

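// basicNThreads launches numThreads threads (with ids 1..numThreads) that
// repeatedly acquire the mutex and append their id to a shared vector.  The
// barrier counter asserts mutual exclusion: each thread must observe it at 0
// on entry to the critical section and at 1 when leaving.  At the end, the
// pushed ids must add up to sum(numThreads) * iterations, so no acquisition
// was lost or duplicated.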
template <template <typename> class Atom = std::atomic>
void basicNThreads(int numThreads, int iterations = FLAGS_stress_factor) {
  auto&& mutex = detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& result = std::vector<int>{};

  auto&& function = [&](auto id) {
    return [&, id] {
      for (auto j = 0; j < iterations; ++j) {
        auto state = mutex.lock();
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        result.push_back(id);
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
        mutex.unlock(std::move(state));
      }
    };
  };

  for (auto i = 1; i <= numThreads; ++i) {
    threads.push_back(DSched::thread(function(i)));
  }
  for (auto& thread : threads) {
    DSched::join(thread);
  }

  auto total = 0;
  for (auto value : result) {
    total += value;
  }
  EXPECT_EQ(total, sum(numThreads) * iterations);
}
} // namespace

TEST(DistributedMutex, InternalDetailTestOne) {
  auto value = 0;
  auto ptr = reinterpret_cast<std::uintptr_t>(&value);
  EXPECT_EQ(detail::distributed_mutex::extractAddress<int>(ptr), &value);
  ptr = ptr | 0b1;
  EXPECT_EQ(detail::distributed_mutex::extractAddress<int>(ptr), &value);
}

TEST(DistributedMutex, Basic) {
  auto&& mutex = DistributedMutex{};
  auto state = mutex.lock();
  mutex.unlock(std::move(state));
}

TEST(DistributedMutex, BasicTryLock) {
  auto&& mutex = DistributedMutex{};

  while (true) {
    auto state = mutex.try_lock();
    if (state) {
      mutex.unlock(std::move(state));
      break;
    }
  }
}

TEST(DistributedMutex, TestSingleElementContentionChain) {
  using namespace folly::detail;

  // Acquire the mutex once, let another thread form a contention chain on the
  // mutex, and then release it. Observe the other thread grab the lock
  auto&& schedule = test::ManualSchedule{};
  auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};

  auto&& waiter = std::thread{[&]() {
    schedule.setCallback([&, i = 0]() mutable {
      if (i == 2) {
        schedule.post(1);
      }
      ++i;
    });

    schedule.wait(0);
    auto state = mutex.lock();
    mutex.unlock(std::move(state));
  }};

  // lock the mutex, signal the waiter, and then wait till the first thread
  // has gotten on the wait list
  auto state = mutex.lock();
  schedule.post(0);
  schedule.wait(1);

  // release the mutex, and then wait for the waiter to acquire the lock
  mutex.unlock(std::move(state));
  waiter.join();
}

TEST(DistributedMutex, TestTwoElementContentionChain) {
  using namespace folly::detail;

  // Acquire the mutex once, let another thread form a contention chain on the
  // mutex, and then release it. Observe the other thread grab the lock
  auto&& schedule = test::ManualSchedule{};
  auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};

  auto&& one = std::thread{[&]() {
    schedule.setCallback([&, i = 0]() mutable {
      if (i == 2) {
        schedule.post(3);
      }
      ++i;
    });

    schedule.wait(0);
    auto state = mutex.lock();
    mutex.unlock(std::move(state));
  }};

  auto&& two = std::thread{[&]() {
    schedule.setCallback([&, i = 0]() mutable {
      if (i == 2) {
        schedule.post(2);
      }
      ++i;
    });

    schedule.wait(1);
    auto state = mutex.lock();
    mutex.unlock(std::move(state));
  }};

  // lock the mutex, signal the waiter, and then wait till the first thread
  // has gotten on the wait list
  auto state = mutex.lock();
  schedule.post(0);
  schedule.post(1);
  schedule.wait(2);
  schedule.wait(3);

  // release the mutex, and then wait for the waiter to acquire the lock
  mutex.unlock(std::move(state));
  one.join();
  two.join();
}

TEST(DistributedMutex, TestTwoContentionChains) {
  using namespace folly::detail;

  auto&& schedule = test::ManualSchedule{};
  auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};

  auto&& one = std::thread{[&]() {
    schedule.setCallback([&, i = 0]() mutable {
      if (i == 2) {
        schedule.post(0);
      }
      ++i;
    });

    schedule.wait(1);
    auto state = mutex.lock();
    schedule.wait(4);
    mutex.unlock(std::move(state));
  }};
  auto&& two = std::thread{[&]() {
    schedule.setCallback([&, i = 0]() mutable {
      if (i == 2) {
        schedule.post(2);
      }
      ++i;
    });

    schedule.wait(3);
    auto state = mutex.lock();
    schedule.wait(5);
    mutex.unlock(std::move(state));
  }};

  auto state = mutex.lock();
  schedule.post(1);
  schedule.post(3);
  schedule.wait(0);
  schedule.wait(2);

  // at this point there is one contention chain. Release it
  mutex.unlock(std::move(state));

  // then start a new contention chain
  auto&& three = std::thread{[&]() {
    schedule.setCallback([&, i = 0]() mutable {
      if (i == 2) {
        schedule.post(4);
        schedule.post(5);
      }
      ++i;
    });

    auto lockState = mutex.lock();
    schedule.post(6);
    mutex.unlock(std::move(lockState));
  }};

  // wait for the third thread to pick up the lock
  schedule.wait(6);
  one.join();
  two.join();
  three.join();
}

TEST(DistributedMutex, StressTwoThreads) {
  basicNThreads(2);
}
TEST(DistributedMutex, StressThreeThreads) {
  basicNThreads(3);
}
TEST(DistributedMutex, StressFourThreads) {
  basicNThreads(4);
}
TEST(DistributedMutex, StressFiveThreads) {
  basicNThreads(5);
}
TEST(DistributedMutex, StressSixThreads) {
  basicNThreads(6);
}
TEST(DistributedMutex, StressSevenThreads) {
  basicNThreads(7);
}
TEST(DistributedMutex, StressEightThreads) {
  basicNThreads(8);
}
TEST(DistributedMutex, StressSixteenThreads) {
  basicNThreads(16);
}
TEST(DistributedMutex, StressThirtyTwoThreads) {
  basicNThreads(32);
}
TEST(DistributedMutex, StressSixtyFourThreads) {
  basicNThreads(64);
}
TEST(DistributedMutex, StressHundredThreads) {
  basicNThreads(100);
}
TEST(DistributedMutex, StressHardwareConcurrencyThreads) {
  basicNThreads(std::thread::hardware_concurrency());
}

TEST(DistributedMutex, StressTryLock) {
  auto&& mutex = DistributedMutex{};

  for (auto i = 0; i < FLAGS_stress_factor; ++i) {
    while (true) {
      auto state = mutex.try_lock();
      if (state) {
        mutex.unlock(std::move(state));
        break;
      }
    }
  }
}

namespace {
constexpr auto numIterationsDeterministicTest(int threads) {
  if (threads <= 8) {
    return 100;
  }

  return 10;
}

void runBasicNThreadsDeterministic(int threads, int iterations) {
  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    basicNThreads<test::DeterministicAtomic>(threads, iterations);
    static_cast<void>(schedule);
  }
}
} // namespace

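// The Deterministic* variants re-run the stress tests on top of
// DeterministicAtomic under a DeterministicSchedule seeded with
// DSched::uniform(pass); each pass yields a reproducible thread
// interleaving, and the three passes explore three different schedules.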
TEST(DistributedMutex, DeterministicStressTwoThreads) {
  runBasicNThreadsDeterministic(2, numIterationsDeterministicTest(2));
}
TEST(DistributedMutex, DeterministicStressFourThreads) {
  runBasicNThreadsDeterministic(4, numIterationsDeterministicTest(4));
}
TEST(DistributedMutex, DeterministicStressEightThreads) {
  runBasicNThreadsDeterministic(8, numIterationsDeterministicTest(8));
}
TEST(DistributedMutex, DeterministicStressSixteenThreads) {
  runBasicNThreadsDeterministic(16, numIterationsDeterministicTest(16));
}
TEST(DistributedMutex, DeterministicStressThirtyTwoThreads) {
  runBasicNThreadsDeterministic(32, numIterationsDeterministicTest(32));
}

TEST(DistributedMutex, TimedLockTimeout) {
  auto&& mutex = DistributedMutex{};
  auto&& start = folly::Baton<>{};
  auto&& done = folly::Baton<>{};

  auto thread = std::thread{[&]() {
    auto state = mutex.lock();
    start.post();
    done.wait();
    mutex.unlock(std::move(state));
  }};

  start.wait();
  auto result = mutex.try_lock_for(10ms);
  EXPECT_FALSE(result);
  done.post();
  thread.join();
}

TEST(DistributedMutex, TimedLockAcquireAfterUnlock) {
  auto&& mutex = DistributedMutex{};
  auto&& start = folly::Baton<>{};

  auto thread = std::thread{[&]() {
    auto state = mutex.lock();
    start.post();
    /* sleep override */
    std::this_thread::sleep_for(10ms);
    mutex.unlock(std::move(state));
  }};

  start.wait();
  auto result = mutex.try_lock_for(kForever);
  EXPECT_TRUE(result);
  thread.join();
}

TEST(DistributedMutex, TimedLockAcquireAfterLock) {
  auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
  auto&& schedule = test::ManualSchedule{};

  auto thread = std::thread{[&] {
    schedule.setCallback([&, i = 0]() mutable {
      if (i == 1) {
        schedule.post(0);
        schedule.wait(1);
      }

      // when this thread goes into the atomic_notify_one() we let the other
      // thread wake up
      if (i == 3) {
        schedule.post(2);
      }

      ++i;
    });

    auto state = mutex.lock();
    mutex.unlock(std::move(state));
  }};

  schedule.setCallback([&, i = 0]() mutable {
    // allow the other thread to unlock after the current thread has set the
    // timed waiter state into the mutex
    if (i == 2) {
      schedule.post(1);
      schedule.wait(2);
    }
    ++i;
  });
  schedule.wait(0);
  auto state = mutex.try_lock_for(kForever);
  EXPECT_TRUE(state);
  mutex.unlock(std::move(state));
  thread.join();
}

TEST(DistributedMutex, TimedLockAcquireAfterContentionChain) {
  auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
  auto&& schedule = test::ManualSchedule{};

  auto one = std::thread{[&] {
    schedule.setCallback([&, i = 0]() mutable {
      if (i == 1) {
        schedule.post(0);
        schedule.wait(1);
        schedule.wait(2);
      }
      ++i;
    });

    auto state = mutex.lock();
    mutex.unlock(std::move(state));
  }};
  auto two = std::thread{[&] {
    schedule.setCallback([&, i = 0]() mutable {
      // block the current thread until the first thread has acquired the
      // lock
      if (i == 0) {
        schedule.wait(0);
      }

      // when the current thread enqueues, let the first thread unlock so we
      // get woken up
      //
      // then wait for the first thread to unlock
      if (i == 2) {
        schedule.post(1);
      }

      ++i;
    });

    auto state = mutex.lock();
    mutex.unlock(std::move(state));
  }};

  // make the current thread wait for the first thread to unlock
  schedule.setCallback([&, i = 0]() mutable {
    // let the first thread unlock after we have enqueued ourselves on the
    // mutex
    if (i == 2) {
      schedule.post(2);
    }
    ++i;
  });
  auto state = mutex.try_lock_for(kForever);
  EXPECT_TRUE(state);
  mutex.unlock(std::move(state));

  one.join();
  two.join();
}

namespace {
template <template <typename> class Atom = std::atomic>
void stressTryLockWithConcurrentLocks(
    int numThreads,
    int iterations = FLAGS_stress_factor) {
  auto&& threads = std::vector<std::thread>{};
  auto&& mutex = detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& atomic = std::atomic<std::uint64_t>{0};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(DSched::thread([&] {
      for (auto j = 0; j < iterations; ++j) {
        auto state = mutex.lock();
        EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
        EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
        mutex.unlock(std::move(state));
      }
    }));
  }

  for (auto i = 0; i < iterations; ++i) {
    if (auto state = mutex.try_lock()) {
      EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
      EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
      mutex.unlock(std::move(state));
    }
  }

  for (auto& thread : threads) {
    DSched::join(thread);
  }
}
} // namespace

TEST(DistributedMutex, StressTryLockWithConcurrentLocksTwoThreads) {
  stressTryLockWithConcurrentLocks(2);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksFourThreads) {
  stressTryLockWithConcurrentLocks(4);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksEightThreads) {
  stressTryLockWithConcurrentLocks(8);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixteenThreads) {
  stressTryLockWithConcurrentLocks(16);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksThirtyTwoThreads) {
  stressTryLockWithConcurrentLocks(32);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixtyFourThreads) {
  stressTryLockWithConcurrentLocks(64);
}

TEST(DistributedMutex, DeterministicTryLockWithLocksTwoThreads) {
  auto iterations = numIterationsDeterministicTest(2);
  stressTryLockWithConcurrentLocks(2, iterations);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(2, iterations);
    static_cast<void>(schedule);
  }
}
TEST(DistributedMutex, DeterministicTryLockWithFourThreads) {
  auto iterations = numIterationsDeterministicTest(4);
  stressTryLockWithConcurrentLocks(4, iterations);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(4, iterations);
    static_cast<void>(schedule);
  }
}
TEST(DistributedMutex, DeterministicTryLockWithLocksEightThreads) {
  auto iterations = numIterationsDeterministicTest(8);
  stressTryLockWithConcurrentLocks(8, iterations);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(8, iterations);
    static_cast<void>(schedule);
  }
}
TEST(DistributedMutex, DeterministicTryLockWithLocksSixteenThreads) {
  auto iterations = numIterationsDeterministicTest(16);
  stressTryLockWithConcurrentLocks(16, iterations);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(16, iterations);
    static_cast<void>(schedule);
  }
}
TEST(DistributedMutex, DeterministicTryLockWithLocksThirtyTwoThreads) {
  auto iterations = numIterationsDeterministicTest(32);
  stressTryLockWithConcurrentLocks(32, iterations);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(32, iterations);
    static_cast<void>(schedule);
  }
}
TEST(DistributedMutex, DeterministicTryLockWithLocksSixtyFourThreads) {
  stressTryLockWithConcurrentLocks(64, 5);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(64, 5);
    static_cast<void>(schedule);
  }
}

namespace {
template <template <typename> class Atom = std::atomic>
void concurrentTryLocks(int numThreads, int iterations = FLAGS_stress_factor) {
  auto&& threads = std::vector<std::thread>{};
  auto&& mutex = detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& atomic = std::atomic<std::uint64_t>{0};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(DSched::thread([&] {
      for (auto j = 0; j < iterations; ++j) {
        if (auto state = mutex.try_lock()) {
          EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
          EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
          mutex.unlock(std::move(state));
        }
      }
    }));
  }

  for (auto& thread : threads) {
    DSched::join(thread);
  }
}
} // namespace

TEST(DistributedMutex, StressTryLockWithTwoThreads) {
  concurrentTryLocks(2);
}
TEST(DistributedMutex, StressTryLockFourThreads) {
  concurrentTryLocks(4);
}
TEST(DistributedMutex, StressTryLockEightThreads) {
  concurrentTryLocks(8);
}
TEST(DistributedMutex, StressTryLockSixteenThreads) {
  concurrentTryLocks(16);
}
TEST(DistributedMutex, StressTryLockThirtyTwoThreads) {
  concurrentTryLocks(32);
}
TEST(DistributedMutex, StressTryLockSixtyFourThreads) {
  concurrentTryLocks(64);
}

TEST(DistributedMutex, DeterministicTryLockTwoThreads) {
  auto iterations = numIterationsDeterministicTest(2);
  concurrentTryLocks(2, iterations);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    concurrentTryLocks<test::DeterministicAtomic>(2, iterations);
    static_cast<void>(schedule);
  }
}
TEST(DistributedMutex, DeterministicTryLockFourThreads) {
  auto iterations = numIterationsDeterministicTest(4);
  concurrentTryLocks(4, iterations);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    concurrentTryLocks<test::DeterministicAtomic>(4, iterations);
    static_cast<void>(schedule);
  }
}
TEST(DistributedMutex, DeterministicTryLockEightThreads) {
  auto iterations = numIterationsDeterministicTest(8);
  concurrentTryLocks(8, iterations);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    concurrentTryLocks<test::DeterministicAtomic>(8, iterations);
    static_cast<void>(schedule);
  }
}
TEST(DistributedMutex, DeterministicTryLockSixteenThreads) {
  auto iterations = numIterationsDeterministicTest(16);
  concurrentTryLocks(16, iterations);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    concurrentTryLocks<test::DeterministicAtomic>(16, iterations);
    static_cast<void>(schedule);
  }
}
TEST(DistributedMutex, DeterministicTryLockThirtyTwoThreads) {
  auto iterations = numIterationsDeterministicTest(32);
  concurrentTryLocks(32, iterations);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    concurrentTryLocks<test::DeterministicAtomic>(32, iterations);
    static_cast<void>(schedule);
  }
}
TEST(DistributedMutex, DeterministicTryLockSixtyFourThreads) {
  concurrentTryLocks(64, 5);

  for (auto pass = 0; pass < 3; ++pass) {
    auto&& schedule = DSched{DSched::uniform(pass)};
    concurrentTryLocks<test::DeterministicAtomic>(64, 5);
    static_cast<void>(schedule);
  }
}

} // namespace folly