63 auto schedules = schedules_.wlock();
64 for_each(*schedules, [&](
auto& schedule,
auto,
auto iter) {
65 if (schedule.second ==
this) {
66 schedules->erase(iter);
77 auto id = std::this_thread::get_id();
81 auto schedule =
get_ptr(*schedules_.wlock(), id);
88 auto callback =
get_ptr((*(*schedule)->callbacks_.wlock()),
id);
105 schedules_.wlock()->insert({std::this_thread::get_id(),
this});
106 callbacks_.wlock()->insert({std::this_thread::get_id(), callback});
113 callbacks_.wlock()->erase(std::this_thread::get_id());
121 auto& baton = (*batons_.wlock())[
id];
127 auto& baton = (*batons_.wlock())[
id];
145 ManualSchedule::schedules_;
147 template <
typename T>
149 template <
template <
typename>
class Atomic>
160 ManualSchedule::beforeSharedAccess();
166 std::chrono::system_clock::time_point
const*,
167 std::chrono::steady_clock::time_point
const*,
169 ManualSchedule::beforeSharedAccess();
170 return detail::FutexResult::AWOKEN;
173 template <
typename Clock,
typename Duration>
177 const std::chrono::time_point<Clock, Duration>&) {
178 ManualSchedule::beforeSharedAccess();
179 return std::cv_status::no_timeout;
183 ManualSchedule::beforeSharedAccess();
188 DEFINE_int32(stress_factor, 1000,
"The stress test factor for tests");
189 constexpr
auto kForever = 100
h;
194 return (n * (n + 1)) / 2;
197 template <
template <
typename>
class Atom = std::atomic>
198 void basicNThreads(
int numThreads,
int iterations = FLAGS_stress_factor) {
200 auto&& barrier = std::atomic<int>{0};
201 auto&&
threads = std::vector<std::thread>{};
202 auto&& result = std::vector<int>{};
204 auto&&
function = [&](
auto id) {
206 for (
auto j = 0; j < iterations; ++j) {
208 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
209 result.push_back(
id);
210 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
216 for (
auto i = 1;
i <= numThreads; ++
i) {
224 for (
auto value : result) {
233 auto ptr =
reinterpret_cast<std::uintptr_t
>(&
value);
265 auto&& waiter = std::thread{[&]() {
266 schedule.setCallback([&,
i = 0]()
mutable {
297 auto&& one = std::thread{[&]() {
298 schedule.setCallback([&,
i = 0]()
mutable {
310 auto&& two = std::thread{[&]() {
311 schedule.setCallback([&,
i = 0]()
mutable {
343 auto&& one = std::thread{[&]() {
344 schedule.setCallback([&,
i = 0]()
mutable {
356 auto&& two = std::thread{[&]() {
357 schedule.setCallback([&,
i = 0]()
mutable {
380 auto&& three = std::thread{[&]() {
381 schedule.setCallback([&,
i = 0]()
mutable {
389 auto lockState =
mutex.lock();
435 basicNThreads(std::thread::hardware_concurrency());
441 for (
auto i = 0;
i < FLAGS_stress_factor; ++
i) {
453 constexpr
auto numIterationsDeterministicTest(
int threads) {
461 void runBasicNThreadsDeterministic(
int threads,
int iterations) {
462 for (
auto pass = 0; pass < 3; ++pass) {
464 basicNThreads<test::DeterministicAtomic>(
threads, iterations);
465 static_cast<void>(schedule);
471 runBasicNThreadsDeterministic(2, numIterationsDeterministicTest(2));
474 runBasicNThreadsDeterministic(4, numIterationsDeterministicTest(4));
477 runBasicNThreadsDeterministic(8, numIterationsDeterministicTest(8));
480 runBasicNThreadsDeterministic(16, numIterationsDeterministicTest(16));
483 runBasicNThreadsDeterministic(32, numIterationsDeterministicTest(32));
491 auto thread = std::thread{[&]() {
499 auto result =
mutex.try_lock_for(10ms);
509 auto thread = std::thread{[&]() {
513 std::this_thread::sleep_for(10ms);
518 auto result =
mutex.try_lock_for(kForever);
527 auto thread = std::thread{[&] {
547 schedule.setCallback([&,
i = 0]()
mutable {
567 auto one = std::thread{[&] {
580 auto two = std::thread{[&] {
581 schedule.setCallback([&,
i = 0]()
mutable {
604 schedule.setCallback([&,
i = 0]()
mutable {
621 template <
template <
typename>
class Atom = std::atomic>
622 void stressTryLockWithConcurrentLocks(
624 int iterations = FLAGS_stress_factor) {
625 auto&&
threads = std::vector<std::thread>{};
627 auto&& atomic = std::atomic<std::uint64_t>{0};
629 for (
auto i = 0;
i < numThreads; ++
i) {
631 for (
auto j = 0; j < iterations; ++j) {
633 EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
634 EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
640 for (
auto i = 0;
i < iterations; ++
i) {
642 EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
643 EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
655 stressTryLockWithConcurrentLocks(2);
658 stressTryLockWithConcurrentLocks(4);
661 stressTryLockWithConcurrentLocks(8);
664 stressTryLockWithConcurrentLocks(16);
667 stressTryLockWithConcurrentLocks(32);
670 stressTryLockWithConcurrentLocks(64);
674 auto iterations = numIterationsDeterministicTest(2);
675 stressTryLockWithConcurrentLocks(2, iterations);
677 for (
auto pass = 0; pass < 3; ++pass) {
679 stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(2, iterations);
680 static_cast<void>(schedule);
684 auto iterations = numIterationsDeterministicTest(4);
685 stressTryLockWithConcurrentLocks(4, iterations);
687 for (
auto pass = 0; pass < 3; ++pass) {
689 stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(4, iterations);
690 static_cast<void>(schedule);
694 auto iterations = numIterationsDeterministicTest(8);
695 stressTryLockWithConcurrentLocks(8, iterations);
697 for (
auto pass = 0; pass < 3; ++pass) {
699 stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(8, iterations);
700 static_cast<void>(schedule);
704 auto iterations = numIterationsDeterministicTest(16);
705 stressTryLockWithConcurrentLocks(16, iterations);
707 for (
auto pass = 0; pass < 3; ++pass) {
709 stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(16, iterations);
710 static_cast<void>(schedule);
714 auto iterations = numIterationsDeterministicTest(32);
715 stressTryLockWithConcurrentLocks(32, iterations);
717 for (
auto pass = 0; pass < 3; ++pass) {
719 stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(32, iterations);
720 static_cast<void>(schedule);
724 stressTryLockWithConcurrentLocks(64, 5);
726 for (
auto pass = 0; pass < 3; ++pass) {
728 stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(64, 5);
729 static_cast<void>(schedule);
734 template <
template <
typename>
class Atom = std::atomic>
735 void concurrentTryLocks(
int numThreads,
int iterations = FLAGS_stress_factor) {
736 auto&&
threads = std::vector<std::thread>{};
738 auto&& atomic = std::atomic<std::uint64_t>{0};
740 for (
auto i = 0;
i < numThreads; ++
i) {
742 for (
auto j = 0; j < iterations; ++j) {
744 EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
745 EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
759 concurrentTryLocks(2);
762 concurrentTryLocks(4);
765 concurrentTryLocks(8);
768 concurrentTryLocks(16);
771 concurrentTryLocks(32);
774 concurrentTryLocks(64);
778 auto iterations = numIterationsDeterministicTest(2);
779 concurrentTryLocks(2, iterations);
781 for (
auto pass = 0; pass < 3; ++pass) {
783 concurrentTryLocks<test::DeterministicAtomic>(2, iterations);
784 static_cast<void>(schedule);
788 auto iterations = numIterationsDeterministicTest(4);
789 concurrentTryLocks(4, iterations);
791 for (
auto pass = 0; pass < 3; ++pass) {
793 concurrentTryLocks<test::DeterministicAtomic>(4, iterations);
794 static_cast<void>(schedule);
798 auto iterations = numIterationsDeterministicTest(8);
799 concurrentTryLocks(8, iterations);
801 for (
auto pass = 0; pass < 3; ++pass) {
803 concurrentTryLocks<test::DeterministicAtomic>(8, iterations);
804 static_cast<void>(schedule);
808 auto iterations = numIterationsDeterministicTest(16);
809 concurrentTryLocks(16, iterations);
811 for (
auto pass = 0; pass < 3; ++pass) {
813 concurrentTryLocks<test::DeterministicAtomic>(16, iterations);
814 static_cast<void>(schedule);
818 auto iterations = numIterationsDeterministicTest(32);
819 concurrentTryLocks(32, iterations);
821 for (
auto pass = 0; pass < 3; ++pass) {
823 concurrentTryLocks<test::DeterministicAtomic>(32, iterations);
824 static_cast<void>(schedule);
828 concurrentTryLocks(64, 5);
830 for (
auto pass = 0; pass < 3; ++pass) {
832 concurrentTryLocks<test::DeterministicAtomic>(64, 5);
833 static_cast<void>(schedule);
const Map::mapped_type * get_ptr(const Map &map, const Key &key)
static Synchronized< std::unordered_map< std::thread::id, ManualSchedule * > > schedules_
std::atomic< int64_t > sum(0)
#define EXPECT_EQ(val1, val2)
constexpr detail::Map< Move > move
Atom< std::uint32_t > Futex
detail::FutexResult futexWaitImpl(const detail::Futex< ManualAtomic > *, uint32_t, std::chrono::system_clock::time_point const *, std::chrono::steady_clock::time_point const *, uint32_t)
static void beforeSharedAccess()
static std::thread thread(Func &&func, Args &&...args)
—— Concurrent Priority Queue Implementation ——
std::cv_status atomic_wait_until(const ManualAtomic< std::uintptr_t > *, std::uintptr_t, const std::chrono::time_point< Clock, Duration > &)
std::vector< std::thread::id > threads
static void afterSharedAccess(bool)
static std::function< size_t(size_t)> uniform(uint64_t seed)
void setCallback(std::function< void()> callback)
Synchronized< std::unordered_map< int, folly::Baton<> > > batons_
int futexWakeImpl(const detail::Futex< ManualAtomic > *, int, uint32_t)
static const char *const value
#define EXPECT_TRUE(condition)
TEST(DistributedMutex, DeterministicTryLockSixtyFourThreads)
#define DEFINE_int32(_name, _default, _description)
void for_each(T const &range, Function< void(typename T::value_type const &) const > const &func)
#define EXPECT_FALSE(condition)
static void join(std::thread &child)
Synchronized< std::unordered_map< std::thread::id, std::function< void()> > > callbacks_
void atomic_notify_one(const ManualAtomic< std::uintptr_t > *)