29 #include <unordered_map> 30 #include <unordered_set> 33 #include <boost/intrusive/list.hpp> 34 #include <glog/logging.h> 55 template <
typename MessageT>
74 virtual uint32_t getSampleRate()
const = 0;
105 static constexpr
const char* kContextDataName{
"EventBase"};
149 :
public boost::intrusive::list_base_hook<
150 boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
154 virtual void runLoopCallback()
noexcept = 0;
165 typedef boost::intrusive::
166 list<LoopCallback, boost::intrusive::constant_time_size<false>>
178 : function_(
std::
move(function)) {}
196 : function_(
std::
move(function)) {}
221 explicit EventBase(
bool enableTimeMeasurement);
236 explicit EventBase(event_base* evb,
bool enableTimeMeasurement =
true);
259 [[deprecated(
"This should only be used in legacy unit tests")]]
bool 260 loopIgnoreKeepAlive();
272 bool loopOnce(
int flags = 0);
318 void terminateLoopSoon();
345 void runInLoop(
LoopCallback* callback,
bool thisIteration =
false);
361 void runInLoop(
Func c,
bool thisIteration =
false);
408 template <
typename T>
409 bool runInEventBaseThread(
void (*fn)(
T*),
T* arg);
426 bool runInEventBaseThread(
Func fn);
432 template <
typename T>
433 bool runInEventBaseThreadAndWait(
void (*fn)(
T*),
T* arg);
439 bool runInEventBaseThreadAndWait(
Func fn);
445 template <
typename T>
446 bool runImmediatelyOrRunInEventBaseThreadAndWait(
void (*fn)(
T*),
T* arg);
452 bool runImmediatelyOrRunInEventBaseThreadAndWait(
Func fn);
460 assert(enableTimeMeasurement_);
461 maxLatency_ = maxLatency;
462 maxLatencyCob_ =
std::move(maxLatencyCob);
469 void setLoadAvgMsec(std::chrono::milliseconds ms);
474 void resetLoadAvg(
double value = 0.0);
480 assert(enableTimeMeasurement_);
481 return avgLoopTime_.get();
488 return loopThread_.load(std::memory_order_relaxed) != std::thread::id();
494 void waitUntilRunning();
496 size_t getNotificationQueueSize()
const;
498 void setMaxReadAtOnce(
uint32_t maxAtOnce);
505 auto tid = loopThread_.load(std::memory_order_relaxed);
506 return tid == std::thread::id() || tid == std::this_thread::get_id();
510 return loopThread_.load(std::memory_order_relaxed) ==
511 std::this_thread::get_id();
519 void checkIsInEventBaseThread()
const;
522 checkIsInEventBaseThread();
530 return *wheelTimer_.get();
540 static const char* getLibeventVersion();
541 static const char* getLibeventMethod();
551 void bumpHandlingTime()
final;
556 : expCoeff_(-1.0 / timeInterval.
count()), value_(0.0) {
557 VLOG(11) <<
"expCoeff_ " << expCoeff_ <<
" " << __PRETTY_FUNCTION__;
560 void setTimeInterval(std::chrono::microseconds timeInterval);
561 void reset(
double value = 0.0);
564 std::chrono::microseconds total,
565 std::chrono::microseconds busy);
570 auto lcoeff = buffer_time_.count() * -expCoeff_;
571 return value_ * (1.0 - lcoeff) + lcoeff * busy_buffer_.count();
581 std::chrono::microseconds buffer_time_{0};
582 std::chrono::microseconds busy_buffer_{0};
583 std::size_t buffer_cnt_{0};
584 static constexpr std::chrono::milliseconds buffer_interval_{10};
587 void setObserver(
const std::shared_ptr<EventBaseObserver>& observer) {
588 assert(enableTimeMeasurement_);
589 observer_ = observer;
603 executionObserver_ = observer;
610 return executionObserver_;
632 ++loopKeepAliveCount_;
634 --loopKeepAliveCount_;
643 void attachTimeoutManager(
655 return isInEventBaseThread();
672 if (inRunningEventBaseThread()) {
673 loopKeepAliveCount_++;
675 loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
681 if (!inRunningEventBaseThread()) {
682 return add([
this] { loopKeepAliveCount_--; });
684 loopKeepAliveCount_--;
688 void applyLoopKeepAlive();
690 ssize_t loopKeepAliveCount();
696 bool nothingHandledYet()
const noexcept;
701 bool loopBody(
int flags = 0,
bool ignoreKeepAlive =
false);
704 bool runLoopCallbacks();
706 void initNotificationQueue();
734 std::unique_ptr<NotificationQueue<Func>>
queue_;
736 ssize_t loopKeepAliveCount_{0};
737 std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
738 bool loopKeepAliveActive_{
false};
767 bool invokingLoop_{
false};
784 template <
typename T>
793 template <
typename T>
795 return runInEventBaseThread([=] { fn(arg); });
798 template <
typename T>
800 return runInEventBaseThreadAndWait([=] { fn(arg); });
803 template <
typename T>
807 return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); });
bool keepAliveAcquire() override
void setObserver(const std::shared_ptr< EventBaseObserver > &observer)
std::chrono::microseconds maxLatency_
void runLoopCallback() noexcept override
std::unique_ptr< NotificationQueue< Func > > queue_
LoopCallbackList runBeforeLoopCallbacks_
bool isInTimeoutManagerThread() final
LoopCallbackList onDestructionCallbacks_
std::chrono::milliseconds timeout_type
void setExecutionObserver(ExecutionObserver *observer)
virtual void onEventBaseDestruction(EventBase &evb)=0
std::mutex onDestructionCallbacksMutex_
LoopCallbackList * runOnceCallbacks_
std::unordered_map< std::size_t, std::shared_ptr< void > > localStorage_
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
std::chrono::steady_clock::time_point TimePoint
void setContextData(const RequestToken &val, std::unique_ptr< RequestData > data)
std::atomic< std::thread::id > loopThread_
constexpr detail::Map< Move > move
EventBase * getEventBase()
bool runImmediatelyOrRunInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
void add(Cob fn) override
Implements the Executor interface.
double getAvgLoopTime() const
std::unordered_set< detail::EventBaseLocalBaseBase * > localStorageToDtor_
RequestEventBase(EventBase *eb)
—— Concurrent Priority Queue Implementation ——
requires E e noexcept(noexcept(s.error(std::move(e))))
void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob)
bool hasCallback() override
SmoothLoopTime(std::chrono::microseconds timeInterval)
bool inRunningEventBaseThread() const
bool isLoopCallbackScheduled() const
bool isInEventBaseThread() const
std::shared_ptr< EventBaseObserver > observer_
static UniquePtr newTimer(Args &&...args)
void cancelLoopCallback()
std::unique_ptr< VirtualEventBase > virtualEventBase_
constexpr auto data(C &c) -> decltype(c.data())
event_base * getLibeventBase() const
bool runInEventBaseThread(void(*fn)(T *), T *arg)
LoopCallback::List LoopCallbackList
void runLoopCallback() noexcept override
std::chrono::steady_clock::time_point startWork_
std::unique_ptr< HHWheelTimer, Destructor > UniquePtr
void dcheckIsInEventBaseThread() const
FunctionLoopCallback(Func &&function)
SmoothLoopTime maxLatencyLoopTime_
void keepAliveRelease() override
boost::intrusive::list< LoopCallback, boost::intrusive::constant_time_size< false > > List
std::atomic< bool > stop_
ExecutionObserver * executionObserver_
const bool enableTimeMeasurement_
virtual ~EventBaseLocalBaseBase()=default
std::shared_ptr< RequestContext > context_
void dampen(double factor)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
std::unique_ptr< FunctionRunner > fnRunner_
std::size_t latestLoopCnt_
HHWheelTimer::UniquePtr wheelTimer_
LoopCallbackList loopCallbacks_
ExecutionObserver * getExecutionObserver()
RequestData * getContextData(const RequestToken &val)
SmoothLoopTime avgLoopTime_
static RequestContext * get()
void drive() override
Implements the DrivableExecutor interface.
folly::once_flag virtualEventBaseInitFlag_
uint32_t observerSampleCount_
StackFunctionLoopCallback(Func &&function)
const std::shared_ptr< EventBaseObserver > & getObserver()