25 #include <unordered_map> 40 static std::unordered_map<
41 const detail::Futex<DeterministicAtomic>*,
42 std::list<std::pair<uint32_t, bool*>>>
48 const std::function<
size_t(
size_t)>&
scheduler)
49 : scheduler_(
scheduler), nextThreadId_(1), step_(0) {
63 assert(
sems_.size() == 1);
69 auto rand = std::make_shared<std::ranlux48>(
seed);
70 return [rand](
size_t numActive) {
71 auto dist = std::uniform_int_distribution<size_t>(0, numActive - 1);
79 subsetSize_(subsetSize),
80 stepsBetweenSelect_(stepsBetweenSelect),
84 adjustPermSize(numActive);
85 if (stepsLeft_-- == 0) {
86 stepsLeft_ = stepsBetweenSelect_ - 1;
89 return perm_[uniform_(
std::min(numActive, subsetSize_))];
102 if (perm_.size() > numActive) {
107 [=](
size_t x) {
return x >= numActive; }),
110 while (perm_.size() < numActive) {
111 perm_.push_back(perm_.size());
114 assert(perm_.size() == numActive);
118 for (
size_t i = 0;
i <
std::min(perm_.size() - 1, subsetSize_); ++
i) {
119 size_t j = uniform_(perm_.size() -
i) +
i;
125 std::function<size_t(size_t)>
127 auto gen = std::make_shared<UniformSubset>(
seed, n,
m);
128 return [=](
size_t numActive) {
return (*gen)(numActive); };
142 sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
151 sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
194 sched->
sems_.push_back(sem);
202 std::find(sched->sems_.begin(), sched->sems_.end(),
tls_sem));
208 sem_t*
s =
new sem_t;
221 bool started =
false;
224 if (
active_.count(std::this_thread::get_id()) == 1) {
240 active_.erase(std::this_thread::get_id());
241 if (
sems_.size() > 0) {
256 assert(sched->joins_.count(child.get_id()) == 0);
257 if (sched->active_.count(child.get_id())) {
259 sched->joins_.insert({child.get_id(), sem});
263 assert(!sched->active_.count(child.get_id()));
291 int rv = sem_trywait(sem);
292 int e = rv == 0 ? 0 : errno;
294 "sem_trywait(" << sem <<
") = " << rv <<
" errno=" << e);
311 const detail::Futex<DeterministicAtomic>* futex,
313 std::chrono::system_clock::time_point
const* absSystemTimeout,
314 std::chrono::steady_clock::time_point
const* absSteadyTimeout,
316 using namespace test;
320 bool hasTimeout = absSystemTimeout !=
nullptr || absSteadyTimeout !=
nullptr;
326 "futexWait(" << futex <<
", " << std::hex << expected <<
", .., " 327 << std::hex << waitMask <<
") beginning..");
329 if (futex->load_direct() == expected) {
331 queue.emplace_back(waitMask, &awoken);
332 auto ours = queue.end();
342 if (!awoken && hasTimeout &&
358 result = FutexResult::VALUE_CHANGED;
362 char const* resultStr =
"?";
364 case FutexResult::AWOKEN:
365 resultStr =
"AWOKEN";
368 resultStr =
"TIMEDOUT";
371 resultStr =
"INTERRUPTED";
373 case FutexResult::VALUE_CHANGED:
374 resultStr =
"VALUE_CHANGED";
378 "futexWait(" << futex <<
", " << std::hex << expected <<
", .., " 379 << std::hex << waitMask <<
") -> " << resultStr);
388 using namespace test;
396 auto iter = queue.begin();
397 while (iter != queue.end() && rv <
count) {
399 if ((cur->first & wakeMask) != 0) {
400 *(cur->second) =
true;
411 "futexWake(" << futex <<
", " << count <<
", " << std::hex << wakeMask
static sem_t * descheduleCurrentThread()
static void setAuxAct(AuxAct &aux)
static FOLLY_TLS unsigned tls_threadId
static CacheLocality uniform(size_t numCpus)
static std::mutex futexLock
std::function< size_t(size_t)> scheduler_
#define FOLLY_TEST_DSCHED_VLOG(...)
static void beforeSharedAccess()
static bool tryWait(sem_t *sem)
static FOLLY_TLS DeterministicSchedule * tls_sched
Atom< std::uint32_t > Futex
detail::FutexResult futexWaitImpl(const detail::Futex< ManualAtomic > *, uint32_t, std::chrono::system_clock::time_point const *, std::chrono::steady_clock::time_point const *, uint32_t)
sem_t * beforeThreadCreate()
static void wait(sem_t *sem)
static void afterSharedAccess()
—— Concurrent Priority Queue Implementation ——
static void post(sem_t *sem)
std::shared_ptr< folly::FunctionScheduler > scheduler
DeterministicSchedule(const std::function< size_t(size_t)> &scheduler)
void afterThreadCreate(sem_t *)
static std::function< size_t(size_t)> uniform(uint64_t seed)
static void setAuxChk(AuxChk &aux)
std::unordered_set< std::thread::id > active_
static size_t getRandNumber(size_t n)
std::function< void(uint64_t)> AuxChk
int(* Func)(unsigned *cpu, unsigned *node, void *unused)
Function pointer to a function with the same signature as getcpu(2).
static map< string, int > m
static void reschedule(sem_t *sem)
std::uniform_int_distribution< milliseconds::rep > dist
int futexWakeImpl(const detail::Futex< ManualAtomic > *, int, uint32_t)
static std::function< size_t(size_t)> uniformSubset(uint64_t seed, size_t n=2, size_t m=64)
std::function< void(bool)> AuxAct
static FOLLY_TLS sem_t * tls_sem
folly::Function< void()> child
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
static void join(std::thread &child)
std::unordered_map< std::thread::id, sem_t * > joins_
static thread_local AuxAct tls_aux_act
static Getcpu::Func pickGetcpuFunc()
Returns the best getcpu implementation for Atom.
std::vector< sem_t * > sems_
static int getcpu(unsigned *cpu, unsigned *node, void *unused)
folly::Function< void()> parent
static void clearAuxChk()
static std::unordered_map< const detail::Futex< DeterministicAtomic > *, std::list< std::pair< uint32_t, bool * > > > futexQueues