26 using std::chrono::milliseconds;
27 using std::chrono::steady_clock;
/**
 * Next-run-time policy that keeps a function on a fixed time grid anchored at
 * its previously scheduled run time.
 *
 * Given the run time that was scheduled (curNextRunTime) and the current time
 * (curTime), operator() returns curNextRunTime + k * constInterval for the
 * k chosen by the integer division below. For a positive interval the result
 * is always strictly later than curTime, and it is the first such grid point
 * whenever curTime is not earlier than curNextRunTime.
 */
struct ConsistentDelayFunctor {
  // Grid spacing; validated as non-negative by the constructor.
  const milliseconds constInterval;

  /**
   * @param interval  spacing of the grid; zero is accepted.
   * @throws std::invalid_argument if interval is negative.
   */
  explicit ConsistentDelayFunctor(milliseconds interval)
      : constInterval(interval) {
    if (interval < milliseconds::zero()) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "time interval must be non-negative");
    }
  }

  /**
   * @param curNextRunTime  run time that had been scheduled for this call.
   * @param curTime         current time.
   * @return the next time the function should run.
   */
  steady_clock::time_point operator()(
      steady_clock::time_point curNextRunTime,
      steady_clock::time_point curTime) const {
    // The constructor allows a zero interval, but dividing a duration by a
    // zero duration is an integer division by zero. With no spacing, the
    // grid formula degenerates to "run again right now", so return curTime.
    if (constInterval == milliseconds::zero()) {
      return curTime;
    }
    // Number of whole intervals that have elapsed since the scheduled time
    // (duration / duration yields a plain integer count).
    auto intervalsPassed = (curTime - curNextRunTime) / constInterval;
    return (intervalsPassed + 1) * constInterval + curNextRunTime;
  }
};
// Interval policy constructed from a single `milliseconds` value.
// NOTE(review): this listing omits parts of the struct (any member
// declarations, the constructor's initializer list and the body of
// operator()), so only what is visible is described here.
53 struct ConstIntervalFunctor {
// Validates the requested interval: a negative value is rejected with
// std::invalid_argument, while zero passes the check.
56 explicit ConstIntervalFunctor(milliseconds interval)
58 if (interval < milliseconds::zero()) {
59 throw std::invalid_argument(
61 "time interval must be non-negative");
// Const call operator taking no arguments and returning a `milliseconds`
// value; presumably the stored interval — body not visible, confirm.
65 milliseconds operator()()
const {
// Interval policy parameterised by a mean expressed in milliseconds.
// NOTE(review): member declarations and the body of operator() are not part
// of this listing; only the visible constructor logic is described.
70 struct PoissonDistributionFunctor {
// Takes the mean as a double and rejects negative values with
// std::invalid_argument (zero passes the check).
74 explicit PoissonDistributionFunctor(
double meanPoissonMs)
// NOTE(review): poissonRandom is initialised from meanPoissonMs here, i.e.
// before the range check in the constructor body runs. If poissonRandom is a
// std::poisson_distribution (TODO confirm against its declaration), that type
// requires a strictly positive mean, so an invalid mean reaches the
// distribution's constructor before the exception below can be thrown.
// Consider validating before the member is constructed.
75 : poissonRandom(meanPoissonMs) {
76 if (meanPoissonMs < 0.0) {
77 throw std::invalid_argument(
79 "Poisson mean interval must be non-negative");
// Non-const call operator returning a `milliseconds` value; presumably draws
// from poissonRandom — body not visible, confirm.
83 milliseconds operator()() {
88 struct UniformDistributionFunctor {
90 std::uniform_int_distribution<milliseconds::rep>
dist;
92 UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
93 : generator(
Random::rand32()),
94 dist(minInterval.
count(), maxInterval.
count()) {
95 if (minInterval > maxInterval) {
96 throw std::invalid_argument(
98 "min time interval must be less or equal than max interval");
100 if (minInterval < milliseconds::zero()) {
101 throw std::invalid_argument(
102 "FunctionScheduler: " 103 "time interval must be non-negative");
107 milliseconds operator()() {
108 return milliseconds(
dist(generator));
123 milliseconds interval,
125 milliseconds startDelay) {
128 ConstIntervalFunctor(interval),
130 to<std::string>(interval.count(),
"ms"),
137 milliseconds interval,
140 milliseconds startDelay) {
144 PoissonDistributionFunctor(latencyDistr.
poissonMean),
146 to<std::string>(latencyDistr.
poissonMean,
"ms (Poisson mean)"),
150 addFunction(
std::move(cb), interval, nameID, startDelay);
157 milliseconds startDelay) {
160 ConstIntervalFunctor(milliseconds::zero()),
169 milliseconds minInterval,
170 milliseconds maxInterval,
172 milliseconds startDelay) {
175 UniformDistributionFunctor(minInterval, maxInterval),
178 "[", minInterval.count(),
" , ", maxInterval.count(),
"] ms"),
185 milliseconds interval,
187 milliseconds startDelay) {
190 ConsistentDelayFunctor(interval),
192 to<std::string>(interval.count(),
"ms"),
202 milliseconds startDelay) {
217 milliseconds startDelay) {
// Validating front end for registering a function with the scheduler.
// NOTE(review): this listing omits the function's name/first-parameter lines,
// the conditions guarding the first two throws, and everything after the
// std::make_unique call; the comments below cover only what is visible.
//
// Every rejection is reported as std::invalid_argument.
227 template <
typename RepeatFuncNextRunTimeFunc>
// `fn` is taken as a forwarding reference and forwarded on below.
230 RepeatFuncNextRunTimeFunc&& fn,
233 milliseconds startDelay,
// Rejected input (condition not visible): the scheduled function is not set.
236 throw std::invalid_argument(
237 "FunctionScheduler: Scheduled function must be set");
// Rejected input (condition not visible): no interval distribution /
// next-run-time function was supplied.
240 throw std::invalid_argument(
241 "FunctionScheduler: " 242 "interval distribution or next run time function must be set");
// A negative start delay is rejected; zero is allowed.
244 if (startDelay < milliseconds::zero()) {
245 throw std::invalid_argument(
246 "FunctionScheduler: start delay must be non-negative");
// The name-uniqueness checks below run while holding mutex_.
249 std::unique_lock<std::mutex> l(
mutex_);
250 auto it = functionsMap_.find(nameID);
// A map entry only blocks registration if it is still valid.
252 if (it != functionsMap_.end() && it->second->isValid()) {
253 throw std::invalid_argument(to<std::string>(
254 "FunctionScheduler: a function named \"", nameID,
"\" already exists"));
// currentFunction_ is checked separately from the map lookup above, so a
// matching name is rejected here even if the map check passed.
257 if (currentFunction_ && currentFunction_->name == nameID) {
258 throw std::invalid_argument(to<std::string>(
259 "FunctionScheduler: a function named \"", nameID,
"\" already exists"));
// Builds the RepeatFunc that represents the new entry, forwarding `fn`.
264 std::make_unique<RepeatFunc>(
266 std::forward<RepeatFuncNextRunTimeFunc>(fn),
278 milliseconds startDelay,
280 return addFunctionToHeapChecked(
289 milliseconds startDelay,
291 return addFunctionToHeapChecked(
296 std::unique_lock<std::mutex>&
lock,
298 CHECK_EQ(lock.owns_lock(),
true);
299 if (currentFunction_ && currentFunction_->name == nameID) {
300 functionsMap_.erase(currentFunction_->name);
303 currentFunction_ =
nullptr;
304 cancellingCurrentFunction_ =
true;
311 std::unique_lock<std::mutex> l(
mutex_);
312 if (cancelFunctionWithLock(l, nameID)) {
315 auto it = functionsMap_.find(nameID);
316 if (it != functionsMap_.end() && it->second->isValid()) {
317 cancelFunction(l, it->second);
325 std::unique_lock<std::mutex> l(
mutex_);
327 if (cancelFunctionWithLock(l, nameID)) {
328 runningCondvar_.wait(l, [
this]() {
return !cancellingCurrentFunction_; });
332 auto it = functionsMap_.find(nameID);
333 if (it != functionsMap_.end() && it->second->isValid()) {
334 cancelFunction(l, it->second);
341 const std::unique_lock<std::mutex>& l,
344 DCHECK(l.mutex() == &
mutex_);
345 DCHECK(l.owns_lock());
346 functionsMap_.erase(it->
name);
351 std::unique_lock<std::mutex>&
lock) {
352 CHECK_EQ(lock.owns_lock(),
true);
354 functionsMap_.clear();
355 if (currentFunction_) {
356 cancellingCurrentFunction_ =
true;
358 currentFunction_ =
nullptr;
359 return cancellingCurrentFunction_;
363 std::unique_lock<std::mutex> l(
mutex_);
364 cancelAllFunctionsWithLock(l);
368 std::unique_lock<std::mutex> l(
mutex_);
369 if (cancelAllFunctionsWithLock(l)) {
370 runningCondvar_.wait(l, [
this]() {
return !cancellingCurrentFunction_; });
375 std::unique_lock<std::mutex> l(
mutex_);
376 if (currentFunction_ && currentFunction_->name == nameID) {
377 if (cancellingCurrentFunction_ || currentFunction_->runOnce) {
387 auto it = functionsMap_.find(nameID);
388 if (it != functionsMap_.end() && it->second->isValid()) {
391 std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
392 runningCondvar_.notify_one();
400 std::unique_lock<std::mutex> l(
mutex_);
405 VLOG(1) <<
"Starting FunctionScheduler with " << functions_.size()
410 for (
const auto&
f : functions_) {
411 f->resetNextRunTime(
now);
412 VLOG(1) <<
" - func: " << (
f->name.empty() ?
"(anon)" :
f->name.c_str())
413 <<
", period = " <<
f->intervalDescr
414 <<
", delay = " <<
f->startDelay.count() <<
"ms";
416 std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
418 thread_ = std::thread([&] { this->
run(); });
426 std::lock_guard<std::mutex>
g(
mutex_);
432 runningCondvar_.notify_one();
441 if (!threadName_.empty()) {
448 if (functions_.empty()) {
449 runningCondvar_.wait(lock);
456 std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
460 if (!functions_.back()->isValid()) {
461 functions_.pop_back();
465 auto sleepTime = functions_.back()->getNextRunTime() -
now;
466 if (sleepTime < milliseconds::zero()) {
468 runOneFunction(lock,
now);
469 runningCondvar_.notify_all();
473 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
474 runningCondvar_.wait_for(lock, sleepTime);
480 std::unique_lock<std::mutex>&
lock,
481 steady_clock::time_point
now) {
482 DCHECK(lock.mutex() == &
mutex_);
483 DCHECK(lock.owns_lock());
490 auto func =
std::move(functions_.back());
491 functions_.pop_back();
493 VLOG(5) << func->name <<
"function has been canceled while waiting";
496 currentFunction_ = func.get();
500 func->setNextRunTimeSteady();
507 func->setNextRunTimeStrict(now);
515 VLOG(5) <<
"Now running " << func->name;
517 }
catch (
const std::exception& ex) {
518 LOG(ERROR) <<
"Error running the scheduled function <" << func->name
525 if (!currentFunction_) {
528 cancellingCurrentFunction_ =
false;
531 if (currentFunction_->runOnce) {
533 functionsMap_.erase(currentFunction_->name);
534 currentFunction_ =
nullptr;
544 currentFunction_ =
nullptr;
547 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
552 const std::unique_lock<std::mutex>&
lock,
553 std::unique_ptr<RepeatFunc> func) {
555 DCHECK(lock.mutex() == &
mutex_);
556 DCHECK(lock.owns_lock());
559 functionsMap_[functions_.back()->name] = functions_.back().get();
562 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
565 runningCondvar_.notify_one();
570 std::unique_lock<std::mutex> l(
mutex_);
571 threadName_ = threadName.
str();
void addFunctionOnce(Function< void()> &&cb, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
void addFunction(Function< void()> &&cb, std::chrono::milliseconds interval, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
std::default_random_engine generator
bool cancelFunctionWithLock(std::unique_lock< std::mutex > &lock, StringPiece nameID)
void addFunctionToHeap(const std::unique_lock< std::mutex > &lock, std::unique_ptr< RepeatFunc > func)
void addFunctionGenericNextRunTimeFunctor(Function< void()> &&cb, NextRunTimeFunc &&fn, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay)
fbstring exceptionStr(const std::exception &e)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
void addFunctionToHeapChecked(Function< void()> &&cb, RepeatFuncNextRunTimeFunc &&fn, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay, bool runOnce)
—— FunctionScheduler: referenced declarations ——
void cancelAllFunctions()
void runOneFunction(std::unique_lock< std::mutex > &lock, std::chrono::steady_clock::time_point now)
bool resetFunctionTimer(StringPiece nameID)
bool cancelFunction(StringPiece nameID)
static void run(EventBaseManager *ebm, EventBase *eb, folly::Baton<> *stop, const StringPiece &name)
void cancelAllFunctionsAndWait()
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
std::uniform_int_distribution< milliseconds::rep > dist
const milliseconds constInterval
void addFunctionInternal(Function< void()> &&cb, NextRunTimeFunc &&fn, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay, bool runOnce)
bool setThreadName(std::thread::id tid, StringPiece name)
void addFunctionUniformDistribution(Function< void()> &&cb, std::chrono::milliseconds minInterval, std::chrono::milliseconds maxInterval, StringPiece nameID, std::chrono::milliseconds startDelay)
bool cancelAllFunctionsWithLock(std::unique_lock< std::mutex > &lock)
bool cancelFunctionAndWait(StringPiece nameID)
std::poisson_distribution< int > poissonRandom
void addFunctionConsistentDelay(Function< void()> &&cb, std::chrono::milliseconds interval, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
void setThreadName(StringPiece threadName)
void addFunctionGenericDistribution(Function< void()> &&cb, IntervalDistributionFunc &&intervalFunc, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay)