20 #include <boost/noncopyable.hpp> 22 #include <glog/logging.h> 28 #include <unordered_set> 42 #define FOLLY_TEST_DSCHED_VLOG(...) \ 45 VLOG(2) << std::hex << std::this_thread::get_id() << ": " \ 51 using AuxAct = std::function<void(bool)>;
52 using AuxChk = std::function<void(uint64_t)>;
83 const std::function<
size_t(
size_t)>&
scheduler);
103 static std::function<size_t(size_t)>
122 template <
typename Func,
typename...
Args>
127 auto child = std::thread(
130 sched->afterThreadCreate(sem);
131 beforeSharedAccess();
132 FOLLY_TEST_DSCHED_VLOG(
"running");
137 sched->beforeThreadExit();
145 sched->active_.insert(
child.get_id());
156 static void post(sem_t* sem);
160 static bool tryWait(sem_t* sem);
163 static void wait(sem_t* sem);
170 static int getcpu(
unsigned* cpu,
unsigned* node,
void* unused);
204 std::unordered_map<std::thread::id, sem_t*>
joins_;
228 template <
typename>
class Atom = std::atomic>
239 return data_.is_lock_free();
245 std::memory_order mo = std::memory_order_seq_cst)
noexcept {
246 return compare_exchange_strong(
252 std::memory_order success,
253 std::memory_order failure)
noexcept {
254 Schedule::beforeSharedAccess();
256 bool rv =
data_.compare_exchange_strong(v0, v1,
success, failure);
258 this <<
".compare_exchange_strong(" << std::hex << orig <<
", " 259 << std::hex << v1 <<
") -> " << rv <<
"," << std::hex << v0);
260 Schedule::afterSharedAccess(rv);
267 std::memory_order mo = std::memory_order_seq_cst)
noexcept {
268 return compare_exchange_weak(
274 std::memory_order success,
275 std::memory_order failure)
noexcept {
276 Schedule::beforeSharedAccess();
278 bool rv =
data_.compare_exchange_weak(v0, v1,
success, failure);
280 this <<
".compare_exchange_weak(" << std::hex << orig <<
", " 281 << std::hex << v1 <<
") -> " << rv <<
"," << std::hex << v0);
282 Schedule::afterSharedAccess(rv);
287 Schedule::beforeSharedAccess();
288 T rv =
data_.exchange(v, mo);
290 this <<
".exchange(" << std::hex << v <<
") -> " << std::hex << rv);
291 Schedule::afterSharedAccess(
true);
296 Schedule::beforeSharedAccess();
299 Schedule::afterSharedAccess(
true);
304 Schedule::beforeSharedAccess();
307 Schedule::afterSharedAccess(
true);
312 Schedule::beforeSharedAccess();
315 Schedule::afterSharedAccess(
true);
320 Schedule::beforeSharedAccess();
323 Schedule::afterSharedAccess(
true);
327 Schedule::beforeSharedAccess();
330 Schedule::afterSharedAccess(
true);
335 Schedule::beforeSharedAccess();
338 Schedule::afterSharedAccess(
true);
343 Schedule::beforeSharedAccess();
346 Schedule::afterSharedAccess(
true);
351 Schedule::beforeSharedAccess();
354 Schedule::afterSharedAccess(
true);
359 Schedule::beforeSharedAccess();
362 this <<
" += " << std::hex <<
v <<
" -> " << std::hex << rv);
363 Schedule::afterSharedAccess(
true);
368 Schedule::beforeSharedAccess();
369 T rv =
data_.fetch_add(v, mo);
371 this <<
".fetch_add(" << std::hex << v <<
") -> " << std::hex << rv);
372 Schedule::afterSharedAccess(
true);
377 Schedule::beforeSharedAccess();
380 this <<
" -= " << std::hex <<
v <<
" -> " << std::hex << rv);
381 Schedule::afterSharedAccess(
true);
386 Schedule::beforeSharedAccess();
387 T rv =
data_.fetch_sub(v, mo);
389 this <<
".fetch_sub(" << std::hex << v <<
") -> " << std::hex << rv);
390 Schedule::afterSharedAccess(
true);
395 Schedule::beforeSharedAccess();
398 this <<
" &= " << std::hex <<
v <<
" -> " << std::hex << rv);
399 Schedule::afterSharedAccess(
true);
404 Schedule::beforeSharedAccess();
405 T rv =
data_.fetch_and(v, mo);
407 this <<
".fetch_and(" << std::hex << v <<
") -> " << std::hex << rv);
408 Schedule::afterSharedAccess(
true);
413 Schedule::beforeSharedAccess();
416 this <<
" |= " << std::hex <<
v <<
" -> " << std::hex << rv);
417 Schedule::afterSharedAccess(
true);
422 Schedule::beforeSharedAccess();
423 T rv =
data_.fetch_or(v, mo);
425 this <<
".fetch_or(" << std::hex << v <<
") -> " << std::hex << rv);
426 Schedule::afterSharedAccess(
true);
431 Schedule::beforeSharedAccess();
434 this <<
" ^= " << std::hex <<
v <<
" -> " << std::hex << rv);
435 Schedule::afterSharedAccess(
true);
440 Schedule::beforeSharedAccess();
441 T rv =
data_.fetch_xor(v, mo);
443 this <<
".fetch_xor(" << std::hex << v <<
") -> " << std::hex << rv);
444 Schedule::afterSharedAccess(
true);
450 return data_.load(std::memory_order_relaxed);
457 template <
typename T>
468 std::chrono::system_clock::time_point
const* absSystemTime,
469 std::chrono::steady_clock::time_point
const* absSteadyTime,
478 template <
typename Integer>
480 template <
typename Integer,
typename Clock,
typename Duration>
484 const std::chrono::time_point<Clock, Duration>&) {
485 return std::cv_status::no_timeout;
487 template <
typename Integer>
489 template <
typename Integer>
508 while (!m.try_lock()) {
522 bool rv = m.try_lock();
532 if (!waiters_.empty()) {
533 sem_t* sem = waiters_.front();
static sem_t * descheduleCurrentThread()
static void setAuxAct(AuxAct &aux)
static FOLLY_TLS unsigned tls_threadId
std::function< size_t(size_t)> scheduler_
T operator&=(T v) noexcept
#define FOLLY_TEST_DSCHED_VLOG(...)
T exchange(T v, std::memory_order mo=std::memory_order_seq_cst) noexcept
bool compare_exchange_strong(T &v0, T v1, std::memory_order success, std::memory_order failure) noexcept
static void beforeSharedAccess()
bool is_lock_free() const noexcept
static bool tryWait(sem_t *sem)
constexpr DeterministicAtomicImpl(T v) noexcept
static FOLLY_TLS DeterministicSchedule * tls_sched
Atom< std::uint32_t > Futex
detail::FutexResult futexWaitImpl(const detail::Futex< ManualAtomic > *, uint32_t, std::chrono::system_clock::time_point const *, std::chrono::steady_clock::time_point const *, uint32_t)
T fetch_xor(T v, std::memory_order mo=std::memory_order_seq_cst) noexcept
sem_t * beforeThreadCreate()
T load(std::memory_order mo=std::memory_order_seq_cst) const noexcept
static void wait(sem_t *sem)
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
static void afterSharedAccess()
static std::thread thread(Func &&func, Args &&...args)
—— Concurrent Priority Queue Implementation ——
requires E e noexcept(noexcept(s.error(std::move(e))))
static void post(sem_t *sem)
std::shared_ptr< folly::FunctionScheduler > scheduler
std::cv_status atomic_wait_until(const ManualAtomic< std::uintptr_t > *, std::uintptr_t, const std::chrono::time_point< Clock, Duration > &)
DeterministicSchedule(const std::function< size_t(size_t)> &scheduler)
T operator-=(T v) noexcept
T fetch_and(T v, std::memory_order mo=std::memory_order_seq_cst) noexcept
void afterThreadCreate(sem_t *)
static std::function< size_t(size_t)> uniform(uint64_t seed)
static void setAuxChk(AuxChk &aux)
T operator=(T v) noexcept
T operator++(int) noexcept
std::unordered_set< std::thread::id > active_
static size_t getRandNumber(size_t n)
std::memory_order default_failure_memory_order(std::memory_order successMode)
std::function< void(uint64_t)> AuxChk
T operator|=(T v) noexcept
void atomic_notify_all(const DeterministicAtomic< Integer > *)
int(* Func)(unsigned *cpu, unsigned *node, void *unused)
Function pointer to a function with the same signature as getcpu(2).
static map< string, int > m
static void reschedule(sem_t *sem)
int futexWakeImpl(const detail::Futex< ManualAtomic > *, int, uint32_t)
std::queue< sem_t * > waiters_
bool compare_exchange_strong(T &v0, T v1, std::memory_order mo=std::memory_order_seq_cst) noexcept
T operator^=(T v) noexcept
static std::function< size_t(size_t)> uniformSubset(uint64_t seed, size_t n=2, size_t m=64)
std::function< void(bool)> AuxAct
T operator+=(T v) noexcept
T fetch_sub(T v, std::memory_order mo=std::memory_order_seq_cst) noexcept
bool compare_exchange_weak(T &v0, T v1, std::memory_order mo=std::memory_order_seq_cst) noexcept
void store(T v, std::memory_order mo=std::memory_order_seq_cst) noexcept
static FOLLY_TLS sem_t * tls_sem
T fetch_add(T v, std::memory_order mo=std::memory_order_seq_cst) noexcept
void atomic_wait(const DeterministicAtomic< Integer > *, Integer)
folly::Function< void()> child
static void join(std::thread &child)
T operator--(int) noexcept
T fetch_or(T v, std::memory_order mo=std::memory_order_seq_cst) noexcept
std::unordered_map< std::thread::id, sem_t * > joins_
static thread_local AuxAct tls_aux_act
void atomic_notify_one(const ManualAtomic< std::uintptr_t > *)
static Getcpu::Func pickGetcpuFunc()
Returns the best getcpu implementation for Atom.
std::vector< sem_t * > sems_
static int getcpu(unsigned *cpu, unsigned *node, void *unused)
T load_direct() const noexcept
static void clearAuxChk()
bool compare_exchange_weak(T &v0, T v1, std::memory_order success, std::memory_order failure) noexcept