proxygen
FunctionScheduler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <random>
20 
21 #include <folly/Conv.h>
22 #include <folly/Random.h>
23 #include <folly/String.h>
25 
26 using std::chrono::milliseconds;
27 using std::chrono::steady_clock;
28 
29 namespace folly {
30 
31 namespace {
32 
// Next-run-time generator for "consistent delay" scheduling.
//
// Run times stay on the grid  curNextRunTime + k * constInterval.  Grid points
// that have already elapsed by curTime are skipped instead of being replayed,
// so a slow callback does not cause a burst of catch-up invocations.
struct ConsistentDelayFunctor {
  const milliseconds constInterval;

  // interval: spacing between consecutive run times.
  // Throws std::invalid_argument if interval is negative. Zero is accepted.
  explicit ConsistentDelayFunctor(milliseconds interval)
      : constInterval(interval) {
    if (interval < milliseconds::zero()) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "time interval must be non-negative");
    }
  }

  // curNextRunTime: the run time that was scheduled for the invocation that
  //                 has just happened.
  // curTime:        the current time.
  // Returns the first grid point strictly after the last grid point that is
  // not later than curTime.
  steady_clock::time_point operator()(
      steady_clock::time_point curNextRunTime,
      steady_clock::time_point curTime) const {
    // The constructor allows a zero interval, but the division below would
    // then be an integer division by zero. With a zero interval the grid
    // degenerates and the general formula converges to curTime, so return
    // that directly.
    if (constInterval == milliseconds::zero()) {
      return curTime;
    }
    // Number of whole intervals that have elapsed since the scheduled time.
    auto intervalsPassed = (curTime - curNextRunTime) / constInterval;
    return (intervalsPassed + 1) * constInterval + curNextRunTime;
  }
};
52 
// Interval generator that yields the same fixed delay on every call.
struct ConstIntervalFunctor {
  const milliseconds constInterval;

  // Throws std::invalid_argument if interval is negative.
  explicit ConstIntervalFunctor(milliseconds interval)
      : constInterval(interval) {
    rejectNegative(interval);
  }

  // Returns the fixed interval supplied at construction.
  milliseconds operator()() const {
    return constInterval;
  }

 private:
  // Guard clause shared by the constructor: only non-negative values pass.
  static void rejectNegative(milliseconds candidate) {
    if (candidate >= milliseconds::zero()) {
      return;
    }
    throw std::invalid_argument(
        "FunctionScheduler: "
        "time interval must be non-negative");
  }
};
69 
// Interval generator that draws each delay (in milliseconds) from a Poisson
// distribution with the given mean.
//
// NOTE(review): generator is default-constructed, so every instance produces
// the same sequence; UniformDistributionFunctor seeds its engine explicitly.
// Left unchanged here - confirm whether that determinism is intended.
struct PoissonDistributionFunctor {
  std::default_random_engine generator;
  std::poisson_distribution<int> poissonRandom;
  // True when the requested mean is zero; every draw is then exactly zero and
  // poissonRandom (built with a placeholder mean) is never consulted.
  bool zeroMean;

  // meanPoissonMs: mean delay in milliseconds.
  // Throws std::invalid_argument if the mean is negative (or NaN).
  explicit PoissonDistributionFunctor(double meanPoissonMs)
      : poissonRandom(distributionMean(meanPoissonMs)),
        zeroMean(meanPoissonMs == 0.0) {}

  // Returns the next randomly drawn delay.
  milliseconds operator()() {
    if (zeroMean) {
      return milliseconds::zero();
    }
    return milliseconds(poissonRandom(generator));
  }

 private:
  // Validates the mean BEFORE the distribution is constructed.
  // std::poisson_distribution requires mean > 0; building it from an
  // unchecked value violates that precondition and makes hardened standard
  // libraries abort instead of reporting std::invalid_argument.
  static double distributionMean(double meanPoissonMs) {
    // Written as a negated >= so that NaN is rejected as well.
    if (!(meanPoissonMs >= 0.0)) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "Poisson mean interval must be non-negative");
    }
    // A zero mean is allowed by this class but not by the distribution;
    // hand the distribution a valid placeholder that will never be sampled.
    return meanPoissonMs > 0.0 ? meanPoissonMs : 1.0;
  }
};
87 
88 struct UniformDistributionFunctor {
89  std::default_random_engine generator;
90  std::uniform_int_distribution<milliseconds::rep> dist;
91 
92  UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
93  : generator(Random::rand32()),
94  dist(minInterval.count(), maxInterval.count()) {
95  if (minInterval > maxInterval) {
96  throw std::invalid_argument(
97  "FunctionScheduler: "
98  "min time interval must be less or equal than max interval");
99  }
100  if (minInterval < milliseconds::zero()) {
101  throw std::invalid_argument(
102  "FunctionScheduler: "
103  "time interval must be non-negative");
104  }
105  }
106 
107  milliseconds operator()() {
108  return milliseconds(dist(generator));
109  }
110 };
111 
112 } // namespace
113 
115 
117  // make sure to stop the thread (if running)
118  shutdown();
119 }
120 
122  Function<void()>&& cb,
123  milliseconds interval,
124  StringPiece nameID,
125  milliseconds startDelay) {
126  addFunctionInternal(
127  std::move(cb),
128  ConstIntervalFunctor(interval),
129  nameID.str(),
130  to<std::string>(interval.count(), "ms"),
131  startDelay,
132  false /*runOnce*/);
133 }
134 
136  Function<void()>&& cb,
137  milliseconds interval,
138  const LatencyDistribution& latencyDistr,
139  StringPiece nameID,
140  milliseconds startDelay) {
141  if (latencyDistr.isPoisson) {
142  addFunctionInternal(
143  std::move(cb),
144  PoissonDistributionFunctor(latencyDistr.poissonMean),
145  nameID.str(),
146  to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
147  startDelay,
148  false /*runOnce*/);
149  } else {
150  addFunction(std::move(cb), interval, nameID, startDelay);
151  }
152 }
153 
155  Function<void()>&& cb,
156  StringPiece nameID,
157  milliseconds startDelay) {
158  addFunctionInternal(
159  std::move(cb),
160  ConstIntervalFunctor(milliseconds::zero()),
161  nameID.str(),
162  "once",
163  startDelay,
164  true /*runOnce*/);
165 }
166 
168  Function<void()>&& cb,
169  milliseconds minInterval,
170  milliseconds maxInterval,
171  StringPiece nameID,
172  milliseconds startDelay) {
173  addFunctionInternal(
174  std::move(cb),
175  UniformDistributionFunctor(minInterval, maxInterval),
176  nameID.str(),
177  to<std::string>(
178  "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
179  startDelay,
180  false /*runOnce*/);
181 }
182 
184  Function<void()>&& cb,
185  milliseconds interval,
186  StringPiece nameID,
187  milliseconds startDelay) {
188  addFunctionInternal(
189  std::move(cb),
190  ConsistentDelayFunctor(interval),
191  nameID.str(),
192  to<std::string>(interval.count(), "ms"),
193  startDelay,
194  false /*runOnce*/);
195 }
196 
198  Function<void()>&& cb,
199  IntervalDistributionFunc&& intervalFunc,
200  const std::string& nameID,
201  const std::string& intervalDescr,
202  milliseconds startDelay) {
203  addFunctionInternal(
204  std::move(cb),
205  std::move(intervalFunc),
206  nameID,
207  intervalDescr,
208  startDelay,
209  false /*runOnce*/);
210 }
211 
213  Function<void()>&& cb,
214  NextRunTimeFunc&& fn,
215  const std::string& nameID,
216  const std::string& intervalDescr,
217  milliseconds startDelay) {
218  addFunctionInternal(
219  std::move(cb),
220  std::move(fn),
221  nameID,
222  intervalDescr,
223  startDelay,
224  false /*runOnce*/);
225 }
226 
227 template <typename RepeatFuncNextRunTimeFunc>
229  Function<void()>&& cb,
230  RepeatFuncNextRunTimeFunc&& fn,
231  const std::string& nameID,
232  const std::string& intervalDescr,
233  milliseconds startDelay,
234  bool runOnce) {
235  if (!cb) {
236  throw std::invalid_argument(
237  "FunctionScheduler: Scheduled function must be set");
238  }
239  if (!fn) {
240  throw std::invalid_argument(
241  "FunctionScheduler: "
242  "interval distribution or next run time function must be set");
243  }
244  if (startDelay < milliseconds::zero()) {
245  throw std::invalid_argument(
246  "FunctionScheduler: start delay must be non-negative");
247  }
248 
249  std::unique_lock<std::mutex> l(mutex_);
250  auto it = functionsMap_.find(nameID);
251  // check if the nameID is unique
252  if (it != functionsMap_.end() && it->second->isValid()) {
253  throw std::invalid_argument(to<std::string>(
254  "FunctionScheduler: a function named \"", nameID, "\" already exists"));
255  }
256 
257  if (currentFunction_ && currentFunction_->name == nameID) {
258  throw std::invalid_argument(to<std::string>(
259  "FunctionScheduler: a function named \"", nameID, "\" already exists"));
260  }
261 
262  addFunctionToHeap(
263  l,
264  std::make_unique<RepeatFunc>(
265  std::move(cb),
266  std::forward<RepeatFuncNextRunTimeFunc>(fn),
267  nameID,
268  intervalDescr,
269  startDelay,
270  runOnce));
271 }
272 
274  Function<void()>&& cb,
275  NextRunTimeFunc&& fn,
276  const std::string& nameID,
277  const std::string& intervalDescr,
278  milliseconds startDelay,
279  bool runOnce) {
280  return addFunctionToHeapChecked(
281  std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
282 }
283 
285  Function<void()>&& cb,
287  const std::string& nameID,
288  const std::string& intervalDescr,
289  milliseconds startDelay,
290  bool runOnce) {
291  return addFunctionToHeapChecked(
292  std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
293 }
294 
296  std::unique_lock<std::mutex>& lock,
297  StringPiece nameID) {
298  CHECK_EQ(lock.owns_lock(), true);
299  if (currentFunction_ && currentFunction_->name == nameID) {
300  functionsMap_.erase(currentFunction_->name);
301  // This function is currently being run. Clear currentFunction_
302  // The running thread will see this and won't reschedule the function.
303  currentFunction_ = nullptr;
304  cancellingCurrentFunction_ = true;
305  return true;
306  }
307  return false;
308 }
309 
311  std::unique_lock<std::mutex> l(mutex_);
312  if (cancelFunctionWithLock(l, nameID)) {
313  return true;
314  }
315  auto it = functionsMap_.find(nameID);
316  if (it != functionsMap_.end() && it->second->isValid()) {
317  cancelFunction(l, it->second);
318  return true;
319  }
320 
321  return false;
322 }
323 
325  std::unique_lock<std::mutex> l(mutex_);
326 
327  if (cancelFunctionWithLock(l, nameID)) {
328  runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
329  return true;
330  }
331 
332  auto it = functionsMap_.find(nameID);
333  if (it != functionsMap_.end() && it->second->isValid()) {
334  cancelFunction(l, it->second);
335  return true;
336  }
337  return false;
338 }
339 
341  const std::unique_lock<std::mutex>& l,
342  RepeatFunc* it) {
343  // This function should only be called with mutex_ already locked.
344  DCHECK(l.mutex() == &mutex_);
345  DCHECK(l.owns_lock());
346  functionsMap_.erase(it->name);
347  it->cancel();
348 }
349 
351  std::unique_lock<std::mutex>& lock) {
352  CHECK_EQ(lock.owns_lock(), true);
353  functions_.clear();
354  functionsMap_.clear();
355  if (currentFunction_) {
356  cancellingCurrentFunction_ = true;
357  }
358  currentFunction_ = nullptr;
359  return cancellingCurrentFunction_;
360 }
361 
363  std::unique_lock<std::mutex> l(mutex_);
364  cancelAllFunctionsWithLock(l);
365 }
366 
368  std::unique_lock<std::mutex> l(mutex_);
369  if (cancelAllFunctionsWithLock(l)) {
370  runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
371  }
372 }
373 
375  std::unique_lock<std::mutex> l(mutex_);
376  if (currentFunction_ && currentFunction_->name == nameID) {
377  if (cancellingCurrentFunction_ || currentFunction_->runOnce) {
378  return false;
379  }
380  currentFunction_->resetNextRunTime(steady_clock::now());
381  return true;
382  }
383 
384  // Since __adjust_heap() isn't a part of the standard API, there's no way to
385  // fix the heap ordering in place after adjusting the key (nextRunTime) of an
386  // existing RepeatFunc. Instead, we update the key and rebuild the whole heap.
387  auto it = functionsMap_.find(nameID);
388  if (it != functionsMap_.end() && it->second->isValid()) {
389  if (running_) {
390  it->second->resetNextRunTime(steady_clock::now());
391  std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
392  runningCondvar_.notify_one();
393  }
394  return true;
395  }
396  return false;
397 }
398 
400  std::unique_lock<std::mutex> l(mutex_);
401  if (running_) {
402  return false;
403  }
404 
405  VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
406  << " functions.";
407  auto now = steady_clock::now();
408  // Reset the next run time for all functions.
409  // note: this is needed since one can shutdown() and start() again
410  for (const auto& f : functions_) {
411  f->resetNextRunTime(now);
412  VLOG(1) << " - func: " << (f->name.empty() ? "(anon)" : f->name.c_str())
413  << ", period = " << f->intervalDescr
414  << ", delay = " << f->startDelay.count() << "ms";
415  }
416  std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
417 
418  thread_ = std::thread([&] { this->run(); });
419  running_ = true;
420 
421  return true;
422 }
423 
425  {
426  std::lock_guard<std::mutex> g(mutex_);
427  if (!running_) {
428  return false;
429  }
430 
431  running_ = false;
432  runningCondvar_.notify_one();
433  }
434  thread_.join();
435  return true;
436 }
437 
439  std::unique_lock<std::mutex> lock(mutex_);
440 
441  if (!threadName_.empty()) {
442  folly::setThreadName(threadName_);
443  }
444 
445  while (running_) {
446  // If we have nothing to run, wait until a function is added or until we
447  // are stopped.
448  if (functions_.empty()) {
449  runningCondvar_.wait(lock);
450  continue;
451  }
452 
453  auto now = steady_clock::now();
454 
455  // Move the next function to run to the end of functions_
456  std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
457 
458  // Check to see if the function was cancelled.
459  // If so, just remove it and continue around the loop.
460  if (!functions_.back()->isValid()) {
461  functions_.pop_back();
462  continue;
463  }
464 
465  auto sleepTime = functions_.back()->getNextRunTime() - now;
466  if (sleepTime < milliseconds::zero()) {
467  // We need to run this function now
468  runOneFunction(lock, now);
469  runningCondvar_.notify_all();
470  } else {
471  // Re-add the function to the heap, and wait until we actually
472  // need to run it.
473  std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
474  runningCondvar_.wait_for(lock, sleepTime);
475  }
476  }
477 }
478 
480  std::unique_lock<std::mutex>& lock,
481  steady_clock::time_point now) {
482  DCHECK(lock.mutex() == &mutex_);
483  DCHECK(lock.owns_lock());
484 
485  // The function to run will be at the end of functions_ already.
486  //
487  // Fully remove it from functions_ now.
488  // We need to release mutex_ while we invoke this function, and we need to
489  // maintain the heap property on functions_ while mutex_ is unlocked.
490  auto func = std::move(functions_.back());
491  functions_.pop_back();
492  if (!func->cb) {
493  VLOG(5) << func->name << "function has been canceled while waiting";
494  return;
495  }
496  currentFunction_ = func.get();
497  // Update the function's next run time.
498  if (steady_) {
499  // This allows scheduler to catch up
500  func->setNextRunTimeSteady();
501  } else {
502  // Note that we set nextRunTime based on the current time where we started
503  // the function call, rather than the time when the function finishes.
504  // This ensures that we call the function once every time interval, as
505  // opposed to waiting time interval seconds between calls. (These can be
506  // different if the function takes a significant amount of time to run.)
507  func->setNextRunTimeStrict(now);
508  }
509 
510  // Release the lock while we invoke the user's function
511  lock.unlock();
512 
513  // Invoke the function
514  try {
515  VLOG(5) << "Now running " << func->name;
516  func->cb();
517  } catch (const std::exception& ex) {
518  LOG(ERROR) << "Error running the scheduled function <" << func->name
519  << ">: " << exceptionStr(ex);
520  }
521 
522  // Re-acquire the lock
523  lock.lock();
524 
525  if (!currentFunction_) {
526  // The function was cancelled while we were running it.
527  // We shouldn't reschedule it;
528  cancellingCurrentFunction_ = false;
529  return;
530  }
531  if (currentFunction_->runOnce) {
532  // Don't reschedule if the function only needed to run once.
533  functionsMap_.erase(currentFunction_->name);
534  currentFunction_ = nullptr;
535  return;
536  }
537 
538  // Re-insert the function into our functions_ heap.
539  // We only maintain the heap property while running_ is set. (running_ may
540  // have been cleared while we were invoking the user's function.)
541  functions_.push_back(std::move(func));
542 
543  // Clear currentFunction_
544  currentFunction_ = nullptr;
545 
546  if (running_) {
547  std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
548  }
549 }
550 
552  const std::unique_lock<std::mutex>& lock,
553  std::unique_ptr<RepeatFunc> func) {
554  // This function should only be called with mutex_ already locked.
555  DCHECK(lock.mutex() == &mutex_);
556  DCHECK(lock.owns_lock());
557 
558  functions_.push_back(std::move(func));
559  functionsMap_[functions_.back()->name] = functions_.back().get();
560  if (running_) {
561  functions_.back()->resetNextRunTime(steady_clock::now());
562  std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
563  // Signal the running thread to wake up and see if it needs to change
564  // its current scheduling decision.
565  runningCondvar_.notify_one();
566  }
567 }
568 
570  std::unique_lock<std::mutex> l(mutex_);
571  threadName_ = threadName.str();
572 }
573 
574 } // namespace folly
void addFunctionOnce(Function< void()> &&cb, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
void addFunction(Function< void()> &&cb, std::chrono::milliseconds interval, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
auto f
std::string str() const
Definition: Range.h:591
std::default_random_engine generator
bool cancelFunctionWithLock(std::unique_lock< std::mutex > &lock, StringPiece nameID)
void addFunctionToHeap(const std::unique_lock< std::mutex > &lock, std::unique_ptr< RepeatFunc > func)
void addFunctionGenericNextRunTimeFunctor(Function< void()> &&cb, NextRunTimeFunc &&fn, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay)
fbstring exceptionStr(const std::exception &e)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void addFunctionToHeapChecked(Function< void()> &&cb, RepeatFuncNextRunTimeFunc &&fn, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay, bool runOnce)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void runOneFunction(std::unique_lock< std::mutex > &lock, std::chrono::steady_clock::time_point now)
std::mutex mutex_
bool resetFunctionTimer(StringPiece nameID)
bool cancelFunction(StringPiece nameID)
static void run(EventBaseManager *ebm, EventBase *eb, folly::Baton<> *stop, const StringPiece &name)
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
std::uniform_int_distribution< milliseconds::rep > dist
void shutdown(Counter &)
const milliseconds constInterval
void addFunctionInternal(Function< void()> &&cb, NextRunTimeFunc &&fn, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay, bool runOnce)
int * count
bool setThreadName(std::thread::id tid, StringPiece name)
Definition: ThreadName.cpp:109
void addFunctionUniformDistribution(Function< void()> &&cb, std::chrono::milliseconds minInterval, std::chrono::milliseconds maxInterval, StringPiece nameID, std::chrono::milliseconds startDelay)
bool cancelAllFunctionsWithLock(std::unique_lock< std::mutex > &lock)
bool cancelFunctionAndWait(StringPiece nameID)
const char * string
Definition: Conv.cpp:212
g_t g(f_t)
std::poisson_distribution< int > poissonRandom
void addFunctionConsistentDelay(Function< void()> &&cb, std::chrono::milliseconds interval, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
void setThreadName(StringPiece threadName)
void addFunctionGenericDistribution(Function< void()> &&cb, IntervalDistributionFunc &&intervalFunc, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay)