proxygen
FunctionScheduler.h
Go to the documentation of this file.
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/Function.h>
20 #include <folly/Range.h>
21 #include <folly/hash/Hash.h>
22 #include <chrono>
23 #include <condition_variable>
24 #include <mutex>
25 #include <thread>
26 #include <unordered_map>
27 #include <vector>
28 
29 namespace folly {
30 
55  public:
58 
68  void setSteady(bool steady) {
69  steady_ = steady;
70  }
71 
72  /*
73  * Parameters to control the function interval.
74  *
75  * If isPoisson is true, then use std::poisson_distribution to pick the
76  * interval between each invocation of the function.
77  *
78  * If isPoisson os false, then always use fixed the interval specified to
79  * addFunction().
80  */
82  bool isPoisson;
83  double poissonMean;
84 
85  LatencyDistribution(bool poisson, double mean)
86  : isPoisson(poisson), poissonMean(mean) {}
87  };
88 
100  void addFunction(
101  Function<void()>&& cb,
102  std::chrono::milliseconds interval,
103  StringPiece nameID = StringPiece(),
104  std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
105 
106  /*
107  * Add a new function to the FunctionScheduler with a specified
108  * LatencyDistribution
109  */
110  void addFunction(
111  Function<void()>&& cb,
112  std::chrono::milliseconds interval,
113  const LatencyDistribution& latencyDistr,
114  StringPiece nameID = StringPiece(),
115  std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
116 
120  void addFunctionOnce(
121  Function<void()>&& cb,
122  StringPiece nameID = StringPiece(),
123  std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
124 
131  Function<void()>&& cb,
132  std::chrono::milliseconds minInterval,
133  std::chrono::milliseconds maxInterval,
134  StringPiece nameID,
135  std::chrono::milliseconds startDelay);
136 
145  Function<void()>&& cb,
146  std::chrono::milliseconds interval,
147  StringPiece nameID = StringPiece(),
148  std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
149 
159  using NextRunTimeFunc = Function<std::chrono::steady_clock::time_point(
160  std::chrono::steady_clock::time_point,
161  std::chrono::steady_clock::time_point)>;
162 
173  Function<void()>&& cb,
174  IntervalDistributionFunc&& intervalFunc,
175  const std::string& nameID,
176  const std::string& intervalDescr,
177  std::chrono::milliseconds startDelay);
178 
185  Function<void()>&& cb,
186  NextRunTimeFunc&& fn,
187  const std::string& nameID,
188  const std::string& intervalDescr,
189  std::chrono::milliseconds startDelay);
190 
196  bool cancelFunction(StringPiece nameID);
197  bool cancelFunctionAndWait(StringPiece nameID);
198 
202  void cancelAllFunctions();
204 
214  bool resetFunctionTimer(StringPiece nameID);
215 
221  bool start();
222 
229  bool shutdown();
230 
234  void setThreadName(StringPiece threadName);
235 
236  private:
237  struct RepeatFunc {
240  std::chrono::steady_clock::time_point nextRunTime;
242  std::chrono::milliseconds startDelay;
244  bool runOnce;
245 
247  Function<void()>&& cback,
248  IntervalDistributionFunc&& intervalFn,
249  const std::string& nameID,
250  const std::string& intervalDistDescription,
251  std::chrono::milliseconds delay,
252  bool once)
253  : RepeatFunc(
254  std::move(cback),
255  getNextRunTimeFunc(std::move(intervalFn)),
256  nameID,
257  intervalDistDescription,
258  delay,
259  once) {}
260 
262  Function<void()>&& cback,
263  NextRunTimeFunc&& nextRunTimeFn,
264  const std::string& nameID,
265  const std::string& intervalDistDescription,
266  std::chrono::milliseconds delay,
267  bool once)
268  : cb(std::move(cback)),
269  nextRunTimeFunc(std::move(nextRunTimeFn)),
270  nextRunTime(),
271  name(nameID),
272  startDelay(delay),
273  intervalDescr(intervalDistDescription),
274  runOnce(once) {}
275 
277  IntervalDistributionFunc&& intervalFn) {
278  return [intervalFn = std::move(intervalFn)](
279  std::chrono::steady_clock::time_point /* curNextRunTime */,
280  std::chrono::steady_clock::time_point curTime) mutable {
281  return curTime + intervalFn();
282  };
283  }
284 
285  std::chrono::steady_clock::time_point getNextRunTime() const {
286  return nextRunTime;
287  }
288  void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) {
289  nextRunTime = nextRunTimeFunc(nextRunTime, curTime);
290  }
292  nextRunTime = nextRunTimeFunc(nextRunTime, nextRunTime);
293  }
294  void resetNextRunTime(std::chrono::steady_clock::time_point curTime) {
295  nextRunTime = curTime + startDelay;
296  }
297  void cancel() {
298  // Simply reset cb to an empty function.
299  cb = {};
300  }
301  bool isValid() const {
302  return bool(cb);
303  }
304  };
305 
306  struct RunTimeOrder {
308  const std::unique_ptr<RepeatFunc>& f1,
309  const std::unique_ptr<RepeatFunc>& f2) const {
310  return f1->getNextRunTime() > f2->getNextRunTime();
311  }
312  };
313 
314  typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap;
315  typedef std::unordered_map<StringPiece, RepeatFunc*, Hash> FunctionMap;
316 
317  void run();
318  void runOneFunction(
319  std::unique_lock<std::mutex>& lock,
320  std::chrono::steady_clock::time_point now);
321  void cancelFunction(const std::unique_lock<std::mutex>& lock, RepeatFunc* it);
322  void addFunctionToHeap(
323  const std::unique_lock<std::mutex>& lock,
324  std::unique_ptr<RepeatFunc> func);
325 
326  template <typename RepeatFuncNextRunTimeFunc>
328  Function<void()>&& cb,
329  RepeatFuncNextRunTimeFunc&& fn,
330  const std::string& nameID,
331  const std::string& intervalDescr,
332  std::chrono::milliseconds startDelay,
333  bool runOnce);
334 
335  void addFunctionInternal(
336  Function<void()>&& cb,
337  NextRunTimeFunc&& fn,
338  const std::string& nameID,
339  const std::string& intervalDescr,
340  std::chrono::milliseconds startDelay,
341  bool runOnce);
342  void addFunctionInternal(
343  Function<void()>&& cb,
345  const std::string& nameID,
346  const std::string& intervalDescr,
347  std::chrono::milliseconds startDelay,
348  bool runOnce);
349 
350  // Return true if the current function is being canceled
351  bool cancelAllFunctionsWithLock(std::unique_lock<std::mutex>& lock);
353  std::unique_lock<std::mutex>& lock,
354  StringPiece nameID);
355 
356  std::thread thread_;
357 
358  // Mutex to protect our member variables.
360  bool running_{false};
361 
362  // The functions to run.
363  // This is a heap, ordered by next run time.
364  FunctionHeap functions_;
365  FunctionMap functionsMap_;
367 
368  // The function currently being invoked by the running thread.
369  // This is null when the running thread is idle
371 
372  // Condition variable that is signalled whenever a new function is added
373  // or when the FunctionScheduler is stopped.
374  std::condition_variable runningCondvar_;
375 
377  bool steady_{false};
379 };
380 
381 } // namespace folly
void addFunctionOnce(Function< void()> &&cb, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
void addFunction(Function< void()> &&cb, std::chrono::milliseconds interval, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
std::chrono::steady_clock::time_point getNextRunTime() const
bool cancelFunctionWithLock(std::unique_lock< std::mutex > &lock, StringPiece nameID)
std::chrono::milliseconds startDelay
void addFunctionToHeap(const std::unique_lock< std::mutex > &lock, std::unique_ptr< RepeatFunc > func)
void addFunctionGenericNextRunTimeFunctor(Function< void()> &&cb, NextRunTimeFunc &&fn, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
void addFunctionToHeapChecked(Function< void()> &&cb, RepeatFuncNextRunTimeFunc &&fn, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay, bool runOnce)
RepeatFunc(Function< void()> &&cback, IntervalDistributionFunc &&intervalFn, const std::string &nameID, const std::string &intervalDistDescription, std::chrono::milliseconds delay, bool once)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::condition_variable runningCondvar_
void runOneFunction(std::unique_lock< std::mutex > &lock, std::chrono::steady_clock::time_point now)
bool operator()(const std::unique_ptr< RepeatFunc > &f1, const std::unique_ptr< RepeatFunc > &f2) const
bool resetFunctionTimer(StringPiece nameID)
std::chrono::steady_clock::time_point nextRunTime
bool cancelFunction(StringPiece nameID)
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
void resetNextRunTime(std::chrono::steady_clock::time_point curTime)
std::vector< std::unique_ptr< RepeatFunc > > FunctionHeap
void addFunctionInternal(Function< void()> &&cb, NextRunTimeFunc &&fn, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay, bool runOnce)
void addFunctionUniformDistribution(Function< void()> &&cb, std::chrono::milliseconds minInterval, std::chrono::milliseconds maxInterval, StringPiece nameID, std::chrono::milliseconds startDelay)
bool cancelAllFunctionsWithLock(std::unique_lock< std::mutex > &lock)
std::mutex mutex
bool cancelFunctionAndWait(StringPiece nameID)
const char * string
Definition: Conv.cpp:212
void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime)
void addFunctionConsistentDelay(Function< void()> &&cb, std::chrono::milliseconds interval, StringPiece nameID=StringPiece(), std::chrono::milliseconds startDelay=std::chrono::milliseconds(0))
void setThreadName(StringPiece threadName)
Range< const char * > StringPiece
void addFunctionGenericDistribution(Function< void()> &&cb, IntervalDistributionFunc &&intervalFunc, const std::string &nameID, const std::string &intervalDescr, std::chrono::milliseconds startDelay)
static NextRunTimeFunc getNextRunTimeFunc(IntervalDistributionFunc &&intervalFn)
RepeatFunc(Function< void()> &&cback, NextRunTimeFunc &&nextRunTimeFn, const std::string &nameID, const std::string &intervalDistDescription, std::chrono::milliseconds delay, bool once)
void setSteady(bool steady)
std::unordered_map< StringPiece, RepeatFunc *, Hash > FunctionMap