DeterministicSchedule.h
/*
 * Copyright 2013-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <assert.h>
#include <boost/noncopyable.hpp>
#include <errno.h>
#include <glog/logging.h>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <folly/ScopeGuard.h>
#include <folly/concurrency/CacheLocality.h>
#include <folly/detail/AtomicUtils.h>
#include <folly/detail/Futex.h>
#include <folly/portability/Semaphore.h>

namespace folly {
namespace test {

// This is ugly, but better perf for DeterministicAtomic translates
// directly to more states explored and tested
#define FOLLY_TEST_DSCHED_VLOG(...)                             \
  do {                                                          \
    if (false) {                                                \
      VLOG(2) << std::hex << std::this_thread::get_id() << ": " \
              << __VA_ARGS__;                                   \
    }                                                           \
  } while (false)

/* signatures of user-defined auxiliary functions */
using AuxAct = std::function<void(bool)>;
using AuxChk = std::function<void(uint64_t)>;

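// Example (illustrative sketch, not part of the original header): auxiliary
// functions are ordinary std::function objects.  An AuxAct receives the
// success flag of the shared access it follows; an AuxChk receives the
// global count of shared accesses.  They are installed via the
// setAuxAct()/setAuxChk() methods declared below.  The example* function
// names here are hypothetical.
inline AuxAct exampleAuxAct() {
  return [](bool success) {
    FOLLY_TEST_DSCHED_VLOG("guarded shared access succeeded: " << success);
  };
}

inline AuxChk exampleAuxChk() {
  return [](uint64_t step) {
    FOLLY_TEST_DSCHED_VLOG("checking invariants at step " << step);
  };
}
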
/**
 * DeterministicSchedule coordinates the inter-thread communication of a
 * group of threads under test, so that despite nondeterminism the execution
 * is the same every time.  It works by stashing a reference to the schedule
 * in a thread-local variable, then blocking all but one thread at a time.
 */
class DeterministicSchedule : boost::noncopyable {
 public:
  /** Attaches a deterministic schedule to the current thread; the scheduler
   *  function maps the number of runnable threads to the index to run next. */
  explicit DeterministicSchedule(
      const std::function<size_t(size_t)>& scheduler);

  /** Completes the schedule. */
  ~DeterministicSchedule();

  /** Returns a seeded scheduler that picks among the runnable threads
   *  uniformly at random. */
  static std::function<size_t(size_t)> uniform(uint64_t seed);

  /** Like uniform(), but during each period of m steps only a random
   *  subset of n threads is allowed to run. */
  static std::function<size_t(size_t)>
  uniformSubset(uint64_t seed, size_t n = 2, size_t m = 64);

  /** Obtains permission for the current thread to perform inter-thread
   *  communication. */
  static void beforeSharedAccess();

  /** Releases that permission. */
  static void afterSharedAccess();

  /** Releases that permission, recording whether the shared access
   *  succeeded (the flag is passed to the auxiliary functions). */
  static void afterSharedAccess(bool success);

  /** Launches a thread that will participate in the same deterministic
   *  schedule as the current thread. */
  template <typename Func, typename... Args>
  static inline std::thread thread(Func&& func, Args&&... args) {
    // TODO: maybe future versions of gcc will allow forwarding to thread
    auto sched = tls_sched;
    auto sem = sched ? sched->beforeThreadCreate() : nullptr;
    auto child = std::thread(
        [=](Args... a) {
          if (sched) {
            sched->afterThreadCreate(sem);
            beforeSharedAccess();
            FOLLY_TEST_DSCHED_VLOG("running");
            afterSharedAccess();
          }
          SCOPE_EXIT {
            if (sched) {
              sched->beforeThreadExit();
            }
          };
          func(a...);
        },
        args...);
    if (sched) {
      beforeSharedAccess();
      sched->active_.insert(child.get_id());
      FOLLY_TEST_DSCHED_VLOG("forked " << std::hex << child.get_id());
      afterSharedAccess();
    }
    return child;
  }

  /** Calls child.join() as part of the schedule. */
  static void join(std::thread& child);

  /** Calls sem_post(sem) as part of the schedule. */
  static void post(sem_t* sem);

  /** Calls sem_trywait(sem) as part of the schedule, returning true on
   *  success and false otherwise. */
  static bool tryWait(sem_t* sem);

  /** Calls sem_wait(sem) as part of the schedule. */
  static void wait(sem_t* sem);

  /** Uses scheduler_ to get a random number in [0, n); falls back to
   *  std::rand() if tls_sched is not set up. */
  static size_t getRandNumber(size_t n);

  /** Deterministic implementation of getcpu. */
  static int getcpu(unsigned* cpu, unsigned* node, void* unused);

  /** Sets a thread-local auxiliary function that is called, with the
   *  success flag, immediately after the thread's next shared access;
   *  it is cleared after one use. */
  static void setAuxAct(AuxAct& aux);

  /** Sets a global auxiliary function that is called after every
   *  subsequent shared access with the running count of shared accesses,
   *  for checking invariants and logging. */
  static void setAuxChk(AuxChk& aux);

  /** Clears the function set by setAuxChk. */
  static void clearAuxChk();

  /** Removes the current thread's semaphore from sems_. */
  static sem_t* descheduleCurrentThread();

  /** Adds sem back into sems_. */
  static void reschedule(sem_t* sem);

 private:
  static FOLLY_TLS sem_t* tls_sem;
  static FOLLY_TLS DeterministicSchedule* tls_sched;
  static FOLLY_TLS unsigned tls_threadId;
  static thread_local AuxAct tls_aux_act;
  static AuxChk aux_chk;

  std::function<size_t(size_t)> scheduler_;
  std::vector<sem_t*> sems_;
  std::unordered_set<std::thread::id> active_;
  std::unordered_map<std::thread::id, sem_t*> joins_;
  unsigned nextThreadId_;
  /* step_ keeps count of shared accesses that correspond to user
   * synchronization steps (atomic accesses for now).
   * The reason for keeping track of this here and not just with
   * auxiliary data is to provide users with warning signs (e.g.,
   * skipped steps) if they inadvertently forget to set up aux
   * functions for some shared accesses. */
  uint64_t step_;

  sem_t* beforeThreadCreate();
  void afterThreadCreate(sem_t*);
  void beforeThreadExit();
  /** Calls the user-defined auxiliary functions (if any). */
  void callAux(bool);
};
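
// Example (illustrative sketch, not part of the original header): the parent
// thread constructs the schedule, and every thread created through
// DeterministicSchedule::thread() then takes turns at the points marked by
// beforeSharedAccess()/afterSharedAccess().  The function name
// exampleScheduleBasics is hypothetical; most code instead relies on
// DeterministicAtomic (below), which inserts those calls automatically.
inline void exampleScheduleBasics() {
  DeterministicSchedule sched(DeterministicSchedule::uniform(12345));
  int sharedValue = 0;
  auto child = DeterministicSchedule::thread([&] {
    DeterministicSchedule::beforeSharedAccess();
    sharedValue = 1;
    DeterministicSchedule::afterSharedAccess();
  });
  DeterministicSchedule::join(child);
  CHECK_EQ(sharedValue, 1);
}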

/**
 * DeterministicAtomic<T> is a drop-in replacement for std::atomic<T> that
 * cooperates with DeterministicSchedule.
 */
template <
    typename T,
    typename Schedule = DeterministicSchedule,
    template <typename> class Atom = std::atomic>
struct DeterministicAtomicImpl {
  DeterministicAtomicImpl() = default;
  ~DeterministicAtomicImpl() = default;
  DeterministicAtomicImpl(DeterministicAtomicImpl<T> const&) = delete;
  DeterministicAtomicImpl<T>& operator=(const DeterministicAtomicImpl<T>&) =
      delete;

  constexpr /* implicit */ DeterministicAtomicImpl(T v) noexcept : data_(v) {}

  bool is_lock_free() const noexcept {
    return data_.is_lock_free();
  }

  bool compare_exchange_strong(
      T& v0,
      T v1,
      std::memory_order mo = std::memory_order_seq_cst) noexcept {
    return compare_exchange_strong(
        v0, v1, mo, ::folly::detail::default_failure_memory_order(mo));
  }
  bool compare_exchange_strong(
      T& v0,
      T v1,
      std::memory_order success,
      std::memory_order failure) noexcept {
    Schedule::beforeSharedAccess();
    auto orig = v0;
    bool rv = data_.compare_exchange_strong(v0, v1, success, failure);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".compare_exchange_strong(" << std::hex << orig << ", "
             << std::hex << v1 << ") -> " << rv << "," << std::hex << v0);
    Schedule::afterSharedAccess(rv);
    return rv;
  }

  bool compare_exchange_weak(
      T& v0,
      T v1,
      std::memory_order mo = std::memory_order_seq_cst) noexcept {
    return compare_exchange_weak(
        v0, v1, mo, ::folly::detail::default_failure_memory_order(mo));
  }
  bool compare_exchange_weak(
      T& v0,
      T v1,
      std::memory_order success,
      std::memory_order failure) noexcept {
    Schedule::beforeSharedAccess();
    auto orig = v0;
    bool rv = data_.compare_exchange_weak(v0, v1, success, failure);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".compare_exchange_weak(" << std::hex << orig << ", "
             << std::hex << v1 << ") -> " << rv << "," << std::hex << v0);
    Schedule::afterSharedAccess(rv);
    return rv;
  }

  T exchange(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.exchange(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".exchange(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  /* implicit */ operator T() const noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.operator T();
    FOLLY_TEST_DSCHED_VLOG(this << "() -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T load(std::memory_order mo = std::memory_order_seq_cst) const noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.load(mo);
    FOLLY_TEST_DSCHED_VLOG(this << ".load() -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ = v);
    FOLLY_TEST_DSCHED_VLOG(this << " = " << std::hex << v);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  void store(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    data_.store(v, mo);
    FOLLY_TEST_DSCHED_VLOG(this << ".store(" << std::hex << v << ")");
    Schedule::afterSharedAccess(true);
  }

  T operator++() noexcept {
    Schedule::beforeSharedAccess();
    T rv = ++data_;
    FOLLY_TEST_DSCHED_VLOG(this << " pre++ -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator++(int /* postDummy */) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_++;
    FOLLY_TEST_DSCHED_VLOG(this << " post++ -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator--() noexcept {
    Schedule::beforeSharedAccess();
    T rv = --data_;
    FOLLY_TEST_DSCHED_VLOG(this << " pre-- -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator--(int /* postDummy */) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_--;
    FOLLY_TEST_DSCHED_VLOG(this << " post-- -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator+=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ += v);
    FOLLY_TEST_DSCHED_VLOG(
        this << " += " << std::hex << v << " -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T fetch_add(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.fetch_add(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".fetch_add(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator-=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ -= v);
    FOLLY_TEST_DSCHED_VLOG(
        this << " -= " << std::hex << v << " -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T fetch_sub(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.fetch_sub(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".fetch_sub(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator&=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ &= v);
    FOLLY_TEST_DSCHED_VLOG(
        this << " &= " << std::hex << v << " -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T fetch_and(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.fetch_and(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".fetch_and(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator|=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ |= v);
    FOLLY_TEST_DSCHED_VLOG(
        this << " |= " << std::hex << v << " -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T fetch_or(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.fetch_or(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".fetch_or(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T operator^=(T v) noexcept {
    Schedule::beforeSharedAccess();
    T rv = (data_ ^= v);
    FOLLY_TEST_DSCHED_VLOG(
        this << " ^= " << std::hex << v << " -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  T fetch_xor(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
    Schedule::beforeSharedAccess();
    T rv = data_.fetch_xor(v, mo);
    FOLLY_TEST_DSCHED_VLOG(
        this << ".fetch_xor(" << std::hex << v << ") -> " << std::hex << rv);
    Schedule::afterSharedAccess(true);
    return rv;
  }

  /* Read the value of the atomic variable without context switching */
  T load_direct() const noexcept {
    return data_.load(std::memory_order_relaxed);
  }

 private:
  Atom<T> data_;
};

template <typename T>
using DeterministicAtomic = DeterministicAtomicImpl<T, DeterministicSchedule>;

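// Example (illustrative sketch, not part of the original header): exercising
// a DeterministicAtomic counter from several threads under a seeded
// schedule.  Re-running with the same seed replays the same interleaving.
// The function name exampleDeterministicCounter is hypothetical.
inline void exampleDeterministicCounter() {
  DeterministicSchedule sched(DeterministicSchedule::uniform(0));
  DeterministicAtomic<int> counter{0};
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) {
    threads.push_back(DeterministicSchedule::thread([&] { ++counter; }));
  }
  for (auto& t : threads) {
    DeterministicSchedule::join(t);
  }
  // load_direct() reads the value without taking a turn in the schedule.
  CHECK_EQ(counter.load_direct(), 4);
}
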
/* Futex extensions for DeterministicSchedule based Futexes */
int futexWakeImpl(
    const detail::Futex<test::DeterministicAtomic>* futex,
    int count,
    uint32_t wakeMask);
detail::FutexResult futexWaitImpl(
    const detail::Futex<test::DeterministicAtomic>* futex,
    uint32_t expected,
    std::chrono::system_clock::time_point const* absSystemTime,
    std::chrono::steady_clock::time_point const* absSteadyTime,
    uint32_t waitMask);

/**
 * Implementations of the atomic_wait API for DeterministicAtomic; they are
 * no-ops here.  For a correct implementation this should not matter, because
 * threads are required to have atomic operations around waits and wakes.
 */
template <typename Integer>
void atomic_wait(const DeterministicAtomic<Integer>*, Integer) {}
template <typename Integer, typename Clock, typename Duration>
std::cv_status atomic_wait_until(
    const DeterministicAtomic<Integer>*,
    Integer,
    const std::chrono::time_point<Clock, Duration>&) {
  return std::cv_status::no_timeout;
}
template <typename Integer>
void atomic_notify_one(const DeterministicAtomic<Integer>*) {}
template <typename Integer>
void atomic_notify_all(const DeterministicAtomic<Integer>*) {}

/**
 * DeterministicMutex is a drop-in replacement of std::mutex that
 * cooperates with DeterministicSchedule.
 */
struct DeterministicMutex {
  std::mutex m;
  std::queue<sem_t*> waiters_;

  DeterministicMutex() = default;
  ~DeterministicMutex() = default;
  DeterministicMutex(DeterministicMutex const&) = delete;
  DeterministicMutex& operator=(DeterministicMutex const&) = delete;

  void lock() {
    FOLLY_TEST_DSCHED_VLOG(this << ".lock()");
    DeterministicSchedule::beforeSharedAccess();
    while (!m.try_lock()) {
      sem_t* sem = DeterministicSchedule::descheduleCurrentThread();
      if (sem) {
        waiters_.push(sem);
      }
      DeterministicSchedule::afterSharedAccess();
      // Wait to be scheduled by unlock
      DeterministicSchedule::beforeSharedAccess();
    }
    DeterministicSchedule::afterSharedAccess();
  }

  bool try_lock() {
    DeterministicSchedule::beforeSharedAccess();
    bool rv = m.try_lock();
    FOLLY_TEST_DSCHED_VLOG(this << ".try_lock() -> " << rv);
    DeterministicSchedule::afterSharedAccess();
    return rv;
  }

  void unlock() {
    FOLLY_TEST_DSCHED_VLOG(this << ".unlock()");
    m.unlock();
    DeterministicSchedule::beforeSharedAccess();
    if (!waiters_.empty()) {
      sem_t* sem = waiters_.front();
      DeterministicSchedule::reschedule(sem);
      waiters_.pop();
    }
    DeterministicSchedule::afterSharedAccess();
  }
};
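
// Example (illustrative sketch, not part of the original header):
// DeterministicMutex satisfies the Lockable requirements, so standard RAII
// guards work with it under the schedule.  The function name
// exampleDeterministicMutex is hypothetical.
inline void exampleDeterministicMutex() {
  DeterministicSchedule sched(DeterministicSchedule::uniform(0));
  DeterministicMutex mutex;
  int protectedValue = 0;
  auto child = DeterministicSchedule::thread([&] {
    std::lock_guard<DeterministicMutex> g(mutex);
    ++protectedValue;
  });
  {
    std::lock_guard<DeterministicMutex> g(mutex);
    ++protectedValue;
  }
  DeterministicSchedule::join(child);
  CHECK_EQ(protectedValue, 2);
}
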
} // namespace test

template <>
Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc();

} // namespace folly