proxygen
EventBase.h
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <atomic>
19 #include <cerrno>
20 #include <cmath>
21 #include <cstdlib>
22 #include <functional>
23 #include <list>
24 #include <memory>
25 #include <mutex>
26 #include <queue>
27 #include <set>
28 #include <stack>
29 #include <unordered_map>
30 #include <unordered_set>
31 #include <utility>
32 
33 #include <boost/intrusive/list.hpp>
34 #include <glog/logging.h>
35 
36 #include <folly/Executor.h>
37 #include <folly/Function.h>
38 #include <folly/Portability.h>
39 #include <folly/ScopeGuard.h>
47 #include <folly/io/async/Request.h>
51 
52 namespace folly {
53 
54 using Cob = Func; // defined in folly/Executor.h
55 template <typename MessageT>
57 
58 namespace detail {
59 class EventBaseLocalBase;
60 
62  public:
63  virtual void onEventBaseDestruction(EventBase& evb) = 0;
64  virtual ~EventBaseLocalBaseBase() = default;
65 };
66 } // namespace detail
67 template <typename T>
69 
71  public:
72  virtual ~EventBaseObserver() = default;
73 
74  virtual uint32_t getSampleRate() const = 0;
75 
76  virtual void loopSample(int64_t busyTime, int64_t idleTime) = 0;
77 };
78 
79 // Helper class that sets and retrieves the EventBase associated with a given
80 // request via RequestContext. See Request.h for that mechanism.
81 class RequestEventBase : public RequestData {
82  public:
83  static EventBase* get() {
84  auto data = dynamic_cast<RequestEventBase*>(
85  RequestContext::get()->getContextData(kContextDataName));
86  if (!data) {
87  return nullptr;
88  }
89  return data->eb_;
90  }
91 
92  static void set(EventBase* eb) {
94  kContextDataName,
95  std::unique_ptr<RequestEventBase>(new RequestEventBase(eb)));
96  }
97 
98  bool hasCallback() override {
99  return false;
100  }
101 
102  private:
103  explicit RequestEventBase(EventBase* eb) : eb_(eb) {}
105  static constexpr const char* kContextDataName{"EventBase"};
106 };
107 
108 class VirtualEventBase;
109 
128 class EventBase : private boost::noncopyable,
129  public TimeoutManager,
130  public DrivableExecutor,
131  public IOExecutor,
132  public SequencedExecutor,
133  public ScheduledExecutor {
134  public:
136 
149  : public boost::intrusive::list_base_hook<
150  boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
151  public:
152  virtual ~LoopCallback() = default;
153 
154  virtual void runLoopCallback() noexcept = 0;
156  context_.reset();
157  unlink();
158  }
159 
160  bool isLoopCallbackScheduled() const {
161  return is_linked();
162  }
163 
164  private:
165  typedef boost::intrusive::
166  list<LoopCallback, boost::intrusive::constant_time_size<false>>
168 
169  // EventBase needs access to LoopCallbackList (and therefore to hook_)
170  friend class EventBase;
171  friend class VirtualEventBase;
172  std::shared_ptr<RequestContext> context_;
173  };
174 
176  public:
177  explicit FunctionLoopCallback(Func&& function)
178  : function_(std::move(function)) {}
179 
180  void runLoopCallback() noexcept override {
181  function_();
182  delete this;
183  }
184 
185  private:
187  };
188 
189  // Like FunctionLoopCallback, but saves one allocation. Use with caution.
190  //
191  // The caller is responsible for maintaining the lifetime of this callback
192  // until after the point at which the contained function is called.
194  public:
195  explicit StackFunctionLoopCallback(Func&& function)
196  : function_(std::move(function)) {}
197  void runLoopCallback() noexcept override {
198  Func(std::move(function_))();
199  }
200 
201  private:
203  };
204 
210  EventBase() : EventBase(true) {}
211 
221  explicit EventBase(bool enableTimeMeasurement);
222 
236  explicit EventBase(event_base* evb, bool enableTimeMeasurement = true);
237  ~EventBase() override;
238 
254  bool loop();
255 
259  [[deprecated("This should only be used in legacy unit tests")]] bool
260  loopIgnoreKeepAlive();
261 
272  bool loopOnce(int flags = 0);
273 
290  void loopForever();
291 
318  void terminateLoopSoon();
319 
345  void runInLoop(LoopCallback* callback, bool thisIteration = false);
346 
361  void runInLoop(Func c, bool thisIteration = false);
362 
373  void runOnDestruction(LoopCallback* callback);
374 
380  void runBeforeLoop(LoopCallback* callback);
381 
408  template <typename T>
409  bool runInEventBaseThread(void (*fn)(T*), T* arg);
410 
426  bool runInEventBaseThread(Func fn);
427 
428  /*
429  * Like runInEventBaseThread, but the caller waits for the callback to be
430  * executed.
431  */
432  template <typename T>
433  bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg);
434 
435  /*
436  * Like runInEventBaseThread, but the caller waits for the callback to be
437  * executed.
438  */
439  bool runInEventBaseThreadAndWait(Func fn);
440 
441  /*
442  * Like runInEventBaseThreadAndWait, except if the caller is already in the
443  * event base thread, the functor is simply run inline.
444  */
445  template <typename T>
446  bool runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*), T* arg);
447 
448  /*
449  * Like runInEventBaseThreadAndWait, except if the caller is already in the
450  * event base thread, the functor is simply run inline.
451  */
452  bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn);
453 
459  void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob) {
460  assert(enableTimeMeasurement_);
461  maxLatency_ = maxLatency;
462  maxLatencyCob_ = std::move(maxLatencyCob);
463  }
464 
469  void setLoadAvgMsec(std::chrono::milliseconds ms);
470 
474  void resetLoadAvg(double value = 0.0);
475 
479  double getAvgLoopTime() const {
480  assert(enableTimeMeasurement_);
481  return avgLoopTime_.get();
482  }
483 
487  bool isRunning() const {
488  return loopThread_.load(std::memory_order_relaxed) != std::thread::id();
489  }
490 
494  void waitUntilRunning();
495 
496  size_t getNotificationQueueSize() const;
497 
498  void setMaxReadAtOnce(uint32_t maxAtOnce);
499 
504  bool isInEventBaseThread() const {
505  auto tid = loopThread_.load(std::memory_order_relaxed);
506  return tid == std::thread::id() || tid == std::this_thread::get_id();
507  }
508 
510  return loopThread_.load(std::memory_order_relaxed) ==
511  std::this_thread::get_id();
512  }
513 
519  void checkIsInEventBaseThread() const;
521  if (kIsDebug) {
522  checkIsInEventBaseThread();
523  }
524  }
525 
527  if (!wheelTimer_) {
528  wheelTimer_ = HHWheelTimer::newTimer(this);
529  }
530  return *wheelTimer_.get();
531  }
532 
533  // --------- interface to underlying libevent base ------------
534  // Avoid using these functions if possible. These functions are not
535  // guaranteed to always be present if we ever provide alternative EventBase
536  // implementations that do not use libevent internally.
537  event_base* getLibeventBase() const {
538  return evb_;
539  }
540  static const char* getLibeventVersion();
541  static const char* getLibeventMethod();
542 
551  void bumpHandlingTime() final;
552 
554  public:
555  explicit SmoothLoopTime(std::chrono::microseconds timeInterval)
556  : expCoeff_(-1.0 / timeInterval.count()), value_(0.0) {
557  VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
558  }
559 
560  void setTimeInterval(std::chrono::microseconds timeInterval);
561  void reset(double value = 0.0);
562 
563  void addSample(
564  std::chrono::microseconds total,
565  std::chrono::microseconds busy);
566 
567  double get() const {
568  // Add the outstanding buffered times linearly, to avoid
569  // expensive exponentiation
570  auto lcoeff = buffer_time_.count() * -expCoeff_;
571  return value_ * (1.0 - lcoeff) + lcoeff * busy_buffer_.count();
572  }
573 
574  void dampen(double factor) {
575  value_ *= factor;
576  }
577 
578  private:
579  double expCoeff_;
580  double value_;
581  std::chrono::microseconds buffer_time_{0};
582  std::chrono::microseconds busy_buffer_{0};
583  std::size_t buffer_cnt_{0};
584  static constexpr std::chrono::milliseconds buffer_interval_{10};
585  };
586 
587  void setObserver(const std::shared_ptr<EventBaseObserver>& observer) {
588  assert(enableTimeMeasurement_);
589  observer_ = observer;
590  }
591 
592  const std::shared_ptr<EventBaseObserver>& getObserver() {
593  return observer_;
594  }
595 
603  executionObserver_ = observer;
604  }
605 
610  return executionObserver_;
611  }
612 
616  void setName(const std::string& name);
617 
621  const std::string& getName();
622 
624  void add(Cob fn) override {
625  // runInEventBaseThread() takes a const&,
626  // so no point in doing std::move here.
627  runInEventBaseThread(std::move(fn));
628  }
629 
631  void drive() override {
632  ++loopKeepAliveCount_;
633  SCOPE_EXIT {
634  --loopKeepAliveCount_;
635  };
636  loopOnce();
637  }
638 
639  // Implements the ScheduledExecutor interface
640  void scheduleAt(Func&& fn, TimePoint const& timeout) override;
641 
642  // TimeoutManager
643  void attachTimeoutManager(
644  AsyncTimeout* obj,
645  TimeoutManager::InternalEnum internal) final;
646 
647  void detachTimeoutManager(AsyncTimeout* obj) final;
648 
649  bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout)
650  final;
651 
652  void cancelTimeout(AsyncTimeout* obj) final;
653 
655  return isInEventBaseThread();
656  }
657 
658  // Returns a VirtualEventBase attached to this EventBase. Can be used to
659  // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be
660  // destroyed together with the EventBase.
661  //
662  // Any number of VirtualEventBases instances may be independently constructed,
663  // which are backed by this EventBase. This method should be only used if you
664  // don't need to manage the life time of the VirtualEventBase used.
665  folly::VirtualEventBase& getVirtualEventBase();
666 
668  EventBase* getEventBase() override;
669 
670  protected:
671  bool keepAliveAcquire() override {
672  if (inRunningEventBaseThread()) {
673  loopKeepAliveCount_++;
674  } else {
675  loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
676  }
677  return true;
678  }
679 
680  void keepAliveRelease() override {
681  if (!inRunningEventBaseThread()) {
682  return add([this] { loopKeepAliveCount_--; });
683  }
684  loopKeepAliveCount_--;
685  }
686 
687  private:
688  void applyLoopKeepAlive();
689 
690  ssize_t loopKeepAliveCount();
691 
692  /*
693  * Helper function that tells us whether we have already handled
694  * some event/timeout/callback in this loop iteration.
695  */
696  bool nothingHandledYet() const noexcept;
697 
699  class FunctionRunner;
700 
701  bool loopBody(int flags = 0, bool ignoreKeepAlive = false);
702 
703  // executes any callbacks queued by runInLoop(); returns false if none found
704  bool runLoopCallbacks();
705 
706  void initNotificationQueue();
707 
708  // should only be accessed through public getter
710 
711  LoopCallbackList loopCallbacks_;
712  LoopCallbackList runBeforeLoopCallbacks_;
713  LoopCallbackList onDestructionCallbacks_;
714 
715  // This will be null most of the time, but point to currentCallbacks
716  // if we are in the middle of running loop callbacks, such that
717  // runInLoop(..., true) will always run in the current loop
718  // iteration.
719  LoopCallbackList* runOnceCallbacks_;
720 
721  // stop_ is set by terminateLoopSoon() and is used by the main loop
722  // to determine if it should exit
723  std::atomic<bool> stop_;
724 
725  // The ID of the thread running the main loop.
726  // std::thread::id{} if loop is not running.
727  std::atomic<std::thread::id> loopThread_;
728 
729  // pointer to underlying event_base class doing the heavy lifting
730  event_base* evb_;
731 
732  // A notification queue for runInEventBaseThread() to use
733  // to send function requests to the EventBase thread.
734  std::unique_ptr<NotificationQueue<Func>> queue_;
735  std::unique_ptr<FunctionRunner> fnRunner_;
736  ssize_t loopKeepAliveCount_{0};
737  std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
738  bool loopKeepAliveActive_{false};
739 
740  // limit for latency in microseconds (0 disables)
741  std::chrono::microseconds maxLatency_;
742 
743  // exponentially-smoothed average loop time for latency-limiting
745 
746  // smoothed loop time used to invoke latency callbacks; differs from
747  // avgLoopTime_ in that it's scaled down after triggering a callback
748  // to reduce spamminess
750 
751  // callback called when latency limit is exceeded
753 
754  // Enables/disables time measurements in loopBody(). if disabled, the
755  // following functionality that relies on time-measurement, will not
756  // be supported: avg loop time, observer and max latency.
758 
759  // Wrap-around loop counter to detect beginning of each loop
760  std::size_t nextLoopCnt_;
761  std::size_t latestLoopCnt_;
762  std::chrono::steady_clock::time_point startWork_;
763  // Prevent undefined behavior from invoking event_base_loop() reentrantly.
764  // This is needed since many projects use libevent-1.4, which lacks commit
765  // b557b175c00dc462c1fce25f6e7dd67121d2c001 from
766  // https://github.com/libevent/libevent/.
767  bool invokingLoop_{false};
768 
769  // Observer to export counters
770  std::shared_ptr<EventBaseObserver> observer_;
772 
773  // EventHandler's execution observer.
775 
776  // Name of the thread running this EventBase
778 
779  // allow runOnDestruction() to be called from any threads
781 
782  // see EventBaseLocal
784  template <typename T>
785  friend class EventBaseLocal;
786  std::unordered_map<std::size_t, std::shared_ptr<void>> localStorage_;
787  std::unordered_set<detail::EventBaseLocalBaseBase*> localStorageToDtor_;
788 
790  std::unique_ptr<VirtualEventBase> virtualEventBase_;
791 };
792 
793 template <typename T>
794 bool EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) {
795  return runInEventBaseThread([=] { fn(arg); });
796 }
797 
798 template <typename T>
799 bool EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) {
800  return runInEventBaseThreadAndWait([=] { fn(arg); });
801 }
802 
803 template <typename T>
805  void (*fn)(T*),
806  T* arg) {
807  return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); });
808 }
809 
810 } // namespace folly
bool keepAliveAcquire() override
Definition: EventBase.h:671
void setObserver(const std::shared_ptr< EventBaseObserver > &observer)
Definition: EventBase.h:587
std::chrono::microseconds maxLatency_
Definition: EventBase.h:741
void runLoopCallback() noexceptoverride
Definition: EventBase.h:180
flags
Definition: http_parser.h:127
std::unique_ptr< NotificationQueue< Func > > queue_
Definition: EventBase.h:734
LoopCallbackList runBeforeLoopCallbacks_
Definition: EventBase.h:712
bool isInTimeoutManagerThread() final
Definition: EventBase.h:654
LoopCallbackList onDestructionCallbacks_
Definition: EventBase.h:713
std::chrono::milliseconds timeout_type
void setExecutionObserver(ExecutionObserver *observer)
Definition: EventBase.h:602
virtual void onEventBaseDestruction(EventBase &evb)=0
std::mutex onDestructionCallbacksMutex_
Definition: EventBase.h:780
constexpr auto kIsDebug
Definition: Portability.h:264
LoopCallbackList * runOnceCallbacks_
Definition: EventBase.h:719
auto add
Definition: BaseTest.cpp:70
std::unordered_map< std::size_t, std::shared_ptr< void > > localStorage_
Definition: EventBase.h:786
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
Definition: EventBase.h:799
std::chrono::steady_clock::time_point TimePoint
void setContextData(const RequestToken &val, std::unique_ptr< RequestData > data)
Definition: Request.cpp:129
std::atomic< std::thread::id > loopThread_
Definition: EventBase.h:727
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
EventBase * getEventBase()
bool runImmediatelyOrRunInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
Definition: EventBase.h:804
void add(Cob fn) override
Implements the Executor interface.
Definition: EventBase.h:624
STL namespace.
double getAvgLoopTime() const
Definition: EventBase.h:479
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
std::unordered_set< detail::EventBaseLocalBaseBase * > localStorageToDtor_
Definition: EventBase.h:787
folly::std T
RequestEventBase(EventBase *eb)
Definition: EventBase.h:103
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::string name_
Definition: EventBase.h:777
requires E e noexcept(noexcept(s.error(std::move(e))))
void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob)
Definition: EventBase.h:459
bool hasCallback() override
Definition: EventBase.h:98
SmoothLoopTime(std::chrono::microseconds timeInterval)
Definition: EventBase.h:555
const char * name
Definition: http_parser.c:437
bool inRunningEventBaseThread() const
Definition: EventBase.h:509
bool isLoopCallbackScheduled() const
Definition: EventBase.h:160
bool isInEventBaseThread() const
Definition: EventBase.h:504
Function< void()> Func
Definition: Executor.h:27
std::shared_ptr< EventBaseObserver > observer_
Definition: EventBase.h:770
static UniquePtr newTimer(Args &&...args)
Definition: HHWheelTimer.h:61
EventBase * evb_
std::unique_ptr< VirtualEventBase > virtualEventBase_
Definition: EventBase.h:790
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
event_base * getLibeventBase() const
Definition: EventBase.h:537
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
LoopCallback::List LoopCallbackList
Definition: EventBase.h:698
event_base * evb_
Definition: EventBase.h:730
void runLoopCallback() noexceptoverride
Definition: EventBase.h:197
std::chrono::steady_clock::time_point startWork_
Definition: EventBase.h:762
std::unique_ptr< HHWheelTimer, Destructor > UniquePtr
Definition: HHWheelTimer.h:57
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
SmoothLoopTime maxLatencyLoopTime_
Definition: EventBase.h:749
void keepAliveRelease() override
Definition: EventBase.h:680
boost::intrusive::list< LoopCallback, boost::intrusive::constant_time_size< false > > List
Definition: EventBase.h:167
int * count
std::atomic< bool > stop_
Definition: EventBase.h:723
ExecutionObserver * executionObserver_
Definition: EventBase.h:774
void loop(int iters)
bool isRunning() const
Definition: EventBase.h:487
const bool enableTimeMeasurement_
Definition: EventBase.h:757
std::mutex mutex
std::size_t nextLoopCnt_
Definition: EventBase.h:760
const char * string
Definition: Conv.cpp:212
std::shared_ptr< RequestContext > context_
Definition: EventBase.h:172
void dampen(double factor)
Definition: EventBase.h:574
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
std::unique_ptr< FunctionRunner > fnRunner_
Definition: EventBase.h:735
std::size_t latestLoopCnt_
Definition: EventBase.h:761
HHWheelTimer::UniquePtr wheelTimer_
Definition: EventBase.h:709
LoopCallbackList loopCallbacks_
Definition: EventBase.h:711
ExecutionObserver * getExecutionObserver()
Definition: EventBase.h:609
RequestData * getContextData(const RequestToken &val)
Definition: Request.cpp:151
char c
SmoothLoopTime avgLoopTime_
Definition: EventBase.h:744
static RequestContext * get()
Definition: Request.cpp:290
void drive() override
Implements the DrivableExecutor interface.
Definition: EventBase.h:631
HHWheelTimer & timer()
Definition: EventBase.h:526
folly::once_flag virtualEventBaseInitFlag_
Definition: EventBase.h:789
uint32_t observerSampleCount_
Definition: EventBase.h:771
const std::shared_ptr< EventBaseObserver > & getObserver()
Definition: EventBase.h:592