proxygen
DeterministicSchedule.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <assert.h>
20 
21 #include <algorithm>
22 #include <list>
23 #include <mutex>
24 #include <random>
25 #include <unordered_map>
26 #include <utility>
27 
28 #include <folly/Random.h>
29 
30 namespace folly {
31 namespace test {
32 
33 FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
34 FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
35 FOLLY_TLS unsigned DeterministicSchedule::tls_threadId;
38 
39 // access is protected by futexLock
40 static std::unordered_map<
41  const detail::Futex<DeterministicAtomic>*,
42  std::list<std::pair<uint32_t, bool*>>>
44 
46 
48  const std::function<size_t(size_t)>& scheduler)
49  : scheduler_(scheduler), nextThreadId_(1), step_(0) {
50  assert(tls_sem == nullptr);
51  assert(tls_sched == nullptr);
52  assert(tls_aux_act == nullptr);
53 
54  tls_sem = new sem_t;
55  sem_init(tls_sem, 0, 1);
56  sems_.push_back(tls_sem);
57 
58  tls_sched = this;
59 }
60 
62  assert(tls_sched == this);
63  assert(sems_.size() == 1);
64  assert(sems_[0] == tls_sem);
66 }
67 
68 std::function<size_t(size_t)> DeterministicSchedule::uniform(uint64_t seed) {
69  auto rand = std::make_shared<std::ranlux48>(seed);
70  return [rand](size_t numActive) {
71  auto dist = std::uniform_int_distribution<size_t>(0, numActive - 1);
72  return dist(*rand);
73  };
74 }
75 
76 struct UniformSubset {
77  UniformSubset(uint64_t seed, size_t subsetSize, size_t stepsBetweenSelect)
78  : uniform_(DeterministicSchedule::uniform(seed)),
79  subsetSize_(subsetSize),
80  stepsBetweenSelect_(stepsBetweenSelect),
81  stepsLeft_(0) {}
82 
83  size_t operator()(size_t numActive) {
84  adjustPermSize(numActive);
85  if (stepsLeft_-- == 0) {
86  stepsLeft_ = stepsBetweenSelect_ - 1;
87  shufflePrefix();
88  }
89  return perm_[uniform_(std::min(numActive, subsetSize_))];
90  }
91 
92  private:
93  std::function<size_t(size_t)> uniform_;
94  const size_t subsetSize_;
95  const size_t stepsBetweenSelect_;
96 
97  size_t stepsLeft_;
98  // only the first subsetSize_ is properly randomized
99  std::vector<size_t> perm_;
100 
101  void adjustPermSize(size_t numActive) {
102  if (perm_.size() > numActive) {
103  perm_.erase(
104  std::remove_if(
105  perm_.begin(),
106  perm_.end(),
107  [=](size_t x) { return x >= numActive; }),
108  perm_.end());
109  } else {
110  while (perm_.size() < numActive) {
111  perm_.push_back(perm_.size());
112  }
113  }
114  assert(perm_.size() == numActive);
115  }
116 
117  void shufflePrefix() {
118  for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
119  size_t j = uniform_(perm_.size() - i) + i;
120  std::swap(perm_[i], perm_[j]);
121  }
122  }
123 };
124 
125 std::function<size_t(size_t)>
127  auto gen = std::make_shared<UniformSubset>(seed, n, m);
128  return [=](size_t numActive) { return (*gen)(numActive); };
129 }
130 
132  if (tls_sem) {
133  sem_wait(tls_sem);
134  }
135 }
136 
138  auto sched = tls_sched;
139  if (!sched) {
140  return;
141  }
142  sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
143 }
144 
146  auto sched = tls_sched;
147  if (!sched) {
148  return;
149  }
150  sched->callAux(success);
151  sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
152 }
153 
155  if (tls_sched) {
156  return tls_sched->scheduler_(n);
157  }
158  return Random::rand32() % n;
159 }
160 
162  unsigned* cpu,
163  unsigned* node,
164  void* /* unused */) {
165  if (!tls_threadId && tls_sched) {
169  }
170  if (cpu) {
171  *cpu = tls_threadId;
172  }
173  if (node) {
174  *node = tls_threadId;
175  }
176  return 0;
177 }
178 
180  tls_aux_act = aux;
181 }
182 
184  aux_chk = aux;
185 }
186 
188  aux_chk = nullptr;
189 }
190 
192  auto sched = tls_sched;
193  if (sched) {
194  sched->sems_.push_back(sem);
195  }
196 }
197 
199  auto sched = tls_sched;
200  if (sched) {
201  sched->sems_.erase(
202  std::find(sched->sems_.begin(), sched->sems_.end(), tls_sem));
203  }
204  return tls_sem;
205 }
206 
208  sem_t* s = new sem_t;
209  sem_init(s, 0, 0);
211  sems_.push_back(s);
213  return s;
214 }
215 
217  assert(tls_sem == nullptr);
218  assert(tls_sched == nullptr);
219  tls_sem = sem;
220  tls_sched = this;
221  bool started = false;
222  while (!started) {
224  if (active_.count(std::this_thread::get_id()) == 1) {
225  started = true;
226  }
228  }
229 }
230 
232  assert(tls_sched == this);
234  auto parent = joins_.find(std::this_thread::get_id());
235  if (parent != joins_.end()) {
236  reschedule(parent->second);
237  joins_.erase(parent);
238  }
239  sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
240  active_.erase(std::this_thread::get_id());
241  if (sems_.size() > 0) {
242  FOLLY_TEST_DSCHED_VLOG("exiting");
244  }
245  sem_destroy(tls_sem);
246  delete tls_sem;
247  tls_sem = nullptr;
248  tls_sched = nullptr;
249  tls_aux_act = nullptr;
250 }
251 
252 void DeterministicSchedule::join(std::thread& child) {
253  auto sched = tls_sched;
254  if (sched) {
256  assert(sched->joins_.count(child.get_id()) == 0);
257  if (sched->active_.count(child.get_id())) {
258  sem_t* sem = descheduleCurrentThread();
259  sched->joins_.insert({child.get_id(), sem});
261  // Wait to be scheduled by exiting child thread
263  assert(!sched->active_.count(child.get_id()));
264  }
266  }
267  FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
268  child.join();
269 }
270 
271 void DeterministicSchedule::callAux(bool success) {
272  ++step_;
273  if (tls_aux_act) {
274  tls_aux_act(success);
275  tls_aux_act = nullptr;
276  }
277  if (aux_chk) {
278  aux_chk(step_);
279  }
280 }
281 
282 void DeterministicSchedule::post(sem_t* sem) {
284  sem_post(sem);
285  FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")");
287 }
288 
291  int rv = sem_trywait(sem);
292  int e = rv == 0 ? 0 : errno;
294  "sem_trywait(" << sem << ") = " << rv << " errno=" << e);
296  if (rv == 0) {
297  return true;
298  } else {
299  assert(e == EAGAIN);
300  return false;
301  }
302 }
303 
304 void DeterministicSchedule::wait(sem_t* sem) {
305  while (!tryWait(sem)) {
306  // we're not busy waiting because this is a deterministic schedule
307  }
308 }
309 
311  const detail::Futex<DeterministicAtomic>* futex,
312  uint32_t expected,
313  std::chrono::system_clock::time_point const* absSystemTimeout,
314  std::chrono::steady_clock::time_point const* absSteadyTimeout,
315  uint32_t waitMask) {
316  using namespace test;
317  using namespace std::chrono;
318  using namespace folly::detail;
319 
320  bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
321  bool awoken = false;
322  FutexResult result = FutexResult::AWOKEN;
323 
326  "futexWait(" << futex << ", " << std::hex << expected << ", .., "
327  << std::hex << waitMask << ") beginning..");
328  futexLock.lock();
329  if (futex->load_direct() == expected) {
330  auto& queue = futexQueues[futex];
331  queue.emplace_back(waitMask, &awoken);
332  auto ours = queue.end();
333  ours--;
334  while (!awoken) {
335  futexLock.unlock();
338  futexLock.lock();
339 
340  // Simulate spurious wake-ups, timeouts each time with
341  // a 10% probability if we haven't been woken up already
342  if (!awoken && hasTimeout &&
344  assert(futexQueues.count(futex) != 0 && &futexQueues[futex] == &queue);
345  queue.erase(ours);
346  if (queue.empty()) {
347  futexQueues.erase(futex);
348  }
349  // Simulate ETIMEDOUT 90% of the time and other failures
350  // remaining time
351  result = DeterministicSchedule::getRandNumber(100) >= 10
354  break;
355  }
356  }
357  } else {
358  result = FutexResult::VALUE_CHANGED;
359  }
360  futexLock.unlock();
361 
362  char const* resultStr = "?";
363  switch (result) {
364  case FutexResult::AWOKEN:
365  resultStr = "AWOKEN";
366  break;
368  resultStr = "TIMEDOUT";
369  break;
371  resultStr = "INTERRUPTED";
372  break;
373  case FutexResult::VALUE_CHANGED:
374  resultStr = "VALUE_CHANGED";
375  break;
376  }
378  "futexWait(" << futex << ", " << std::hex << expected << ", .., "
379  << std::hex << waitMask << ") -> " << resultStr);
381  return result;
382 }
383 
386  int count,
387  uint32_t wakeMask) {
388  using namespace test;
389  using namespace std::chrono;
390 
391  int rv = 0;
393  futexLock.lock();
394  if (futexQueues.count(futex) > 0) {
395  auto& queue = futexQueues[futex];
396  auto iter = queue.begin();
397  while (iter != queue.end() && rv < count) {
398  auto cur = iter++;
399  if ((cur->first & wakeMask) != 0) {
400  *(cur->second) = true;
401  rv++;
402  queue.erase(cur);
403  }
404  }
405  if (queue.empty()) {
406  futexQueues.erase(futex);
407  }
408  }
409  futexLock.unlock();
411  "futexWake(" << futex << ", " << count << ", " << std::hex << wakeMask
412  << ") -> " << rv);
414  return rv;
415 }
416 
417 } // namespace test
418 } // namespace folly
419 
420 namespace folly {
421 
422 template <>
423 CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
424  static CacheLocality cache(CacheLocality::uniform(16));
425  return cache;
426 }
427 
428 template <>
431 }
432 } // namespace folly
Definition: InvokeTest.cpp:58
static FOLLY_TLS unsigned tls_threadId
static CacheLocality uniform(size_t numCpus)
static std::mutex futexLock
std::function< size_t(size_t)> scheduler_
#define FOLLY_TEST_DSCHED_VLOG(...)
static const int seed
static FOLLY_TLS DeterministicSchedule * tls_sched
const int x
Atom< std::uint32_t > Futex
Definition: Futex.h:51
detail::FutexResult futexWaitImpl(const detail::Futex< ManualAtomic > *, uint32_t, std::chrono::system_clock::time_point const *, std::chrono::steady_clock::time_point const *, uint32_t)
size_t operator()(size_t numActive)
UniformSubset(uint64_t seed, size_t subsetSize, size_t stepsBetweenSelect)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::shared_ptr< folly::FunctionScheduler > scheduler
Definition: FilePoller.cpp:50
DeterministicSchedule(const std::function< size_t(size_t)> &scheduler)
static std::function< size_t(size_t)> uniform(uint64_t seed)
LogLevel min
Definition: LogLevel.cpp:30
std::unordered_set< std::thread::id > active_
std::function< void(uint64_t)> AuxChk
std::function< size_t(size_t)> uniform_
int(* Func)(unsigned *cpu, unsigned *node, void *unused)
Function pointer to a function with the same signature as getcpu(2).
static map< string, int > m
std::uniform_int_distribution< milliseconds::rep > dist
int futexWakeImpl(const detail::Futex< ManualAtomic > *, int, uint32_t)
void adjustPermSize(size_t numActive)
static std::function< size_t(size_t)> uniformSubset(uint64_t seed, size_t n=2, size_t m=64)
int * count
std::function< void(bool)> AuxAct
std::mutex mutex
static set< string > s
folly::Function< void()> child
Definition: AtFork.cpp:35
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
Definition: F14TestUtil.h:414
static void join(std::thread &child)
static uint32_t rand32()
Definition: Random.h:213
std::unordered_map< std::thread::id, sem_t * > joins_
static thread_local AuxAct tls_aux_act
static Getcpu::Func pickGetcpuFunc()
Returns the best getcpu implementation for Atom.
static int getcpu(unsigned *cpu, unsigned *node, void *unused)
folly::Function< void()> parent
Definition: AtFork.cpp:34
static std::unordered_map< const detail::Futex< DeterministicAtomic > *, std::list< std::pair< uint32_t, bool * > > > futexQueues