16 #ifndef __STDC_FORMAT_MACROS 17 #define __STDC_FORMAT_MACROS 96 std::lock_guard<std::mutex>
lock(libevent_mutex_);
103 event_set(&ev, 0, 0,
nullptr,
nullptr);
110 evb_ = event_base_new();
114 LOG(ERROR) <<
"EventBase(): Failed to init event base.";
117 VLOG(5) <<
"EventBase(): Created.";
142 LOG(ERROR) <<
"EventBase(): Pass nullptr as event base.";
143 throw std::invalid_argument(
"EventBase(): event base cannot be nullptr");
149 std::future<void> virtualEventBaseDestroyFuture;
162 if (virtualEventBaseDestroyFuture.valid()) {
163 virtualEventBaseDestroyFuture.get();
180 LOG(ERROR) <<
"~EventBase(): Unable to drain notification queue";
186 std::lock_guard<std::mutex>
lock(libevent_mutex_);
187 event_base_free(
evb_);
191 storage->onEventBaseDestruction(*
this);
194 VLOG(5) <<
"EventBase(): Destroyed.";
206 auto evbTid =
loopThread_.load(std::memory_order_relaxed);
207 if (evbTid == std::thread::id()) {
213 auto curTid = std::this_thread::get_id();
214 CHECK(evbTid == curTid)
215 <<
"This logic must be executed in the event base thread. " 216 <<
"Event base thread name: \"" 218 <<
"\", current thread name: \"" 226 std::chrono::microseconds
us = std::chrono::milliseconds(ms);
227 if (ms > std::chrono::milliseconds::zero()) {
231 LOG(ERROR) <<
"non-positive arg to setLoadAvgMsec()";
242 std::chrono::steady_clock::time_point* prev) {
246 return std::chrono::duration_cast<std::chrono::milliseconds>(result);
272 return loopBody(flags | EVLOOP_ONCE);
276 VLOG(5) <<
"EventBase(): Starting loop.";
279 <<
"Your code just tried to loop over an event base from inside another " 280 <<
"event base loop. Since libevent is not reentrant, this leads to " 281 <<
"undefined behavior in opt builds. Please fix immediately. For the " 282 <<
"common case of an inner function that needs to do some synchronous " 283 <<
"computation on an event-base, replace getEventBase() by a new, " 284 <<
"stack-allocated EvenBase.";
291 bool ranLoopCallbacks;
292 bool blocking = !(flags & EVLOOP_NONBLOCK);
293 bool once = (flags & EVLOOP_ONCE);
296 std::chrono::steady_clock::time_point prev;
297 std::chrono::steady_clock::time_point idleStart = {};
298 std::chrono::microseconds busy;
299 std::chrono::microseconds idle;
301 loopThread_.store(std::this_thread::get_id(), std::memory_order_release);
303 if (!
name_.empty()) {
312 while (!
stop_.load(std::memory_order_relaxed)) {
313 if (!ignoreKeepAlive) {
322 while (!callbacks.empty()) {
323 auto* item = &callbacks.front();
324 callbacks.pop_front();
325 item->runLoopCallback();
331 res = event_base_loop(
evb_, EVLOOP_ONCE);
333 res = event_base_loop(
evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
340 busy = std::chrono::duration_cast<std::chrono::microseconds>(
342 idle = std::chrono::duration_cast<std::chrono::microseconds>(
344 auto loop_time = busy + idle;
352 observer_->loopSample(busy.count(), idle.count());
356 VLOG(11) <<
"EventBase " <<
this <<
" did not timeout " 357 <<
" loop time guess: " << loop_time.count()
358 <<
" idle time: " << idle.count()
359 <<
" busy time: " << busy.count()
367 if ((
maxLatency_ > std::chrono::microseconds::zero()) &&
378 VLOG(11) <<
"EventBase " <<
this <<
" did not timeout";
384 if (res != 0 && !ranLoopCallbacks) {
396 VLOG(11) <<
"EventBase " <<
this 405 stop_.store(
false, std::memory_order_relaxed);
408 LOG(ERROR) <<
"EventBase: -- error in event loop, res = " << res;
410 }
else if (res == 1) {
411 VLOG(5) <<
"EventBase: ran out of events (exiting loop)!";
412 }
else if (res > 1) {
413 LOG(ERROR) <<
"EventBase: unknown event loop result = " << res;
419 VLOG(5) <<
"EventBase(): Done with loop.";
481 VLOG(11) <<
"EventBase " <<
this <<
" " << __PRETTY_FUNCTION__
488 VLOG(11) <<
"EventBase " <<
this <<
" " << __PRETTY_FUNCTION__
489 <<
" (loop) startWork_ " <<
startWork_.time_since_epoch().count();
494 VLOG(5) <<
"EventBase(): Received terminateLoopSoon() command.";
497 stop_.store(
true, std::memory_order_relaxed);
501 event_base_loopbreak(
evb_);
513 queue_->putMessage(
nullptr);
560 LOG(ERROR) <<
"EventBase " <<
this 561 <<
": Scheduling nullptr callbacks is not allowed";
573 }
catch (
const std::exception& ex) {
574 LOG(ERROR) <<
"EventBase " <<
this <<
": failed to schedule function " 575 <<
"for EventBase thread: " << ex.what();
584 LOG(ERROR) <<
"EventBase " <<
this <<
": Waiting in the event loop is not " 627 while (!currentCallbacks.empty()) {
629 currentCallbacks.pop_front();
642 queue_ = std::make_unique<NotificationQueue<Func>>();
647 fnRunner_ = std::make_unique<FunctionRunner>();
658 fnRunner_->startConsumingInternal(
this,
queue_.get());
662 std::chrono::microseconds timeInterval) {
663 expCoeff_ = -1.0 / timeInterval.count();
664 VLOG(11) <<
"expCoeff_ " << expCoeff_ <<
" " << __PRETTY_FUNCTION__;
672 std::chrono::microseconds total,
673 std::chrono::microseconds busy) {
674 if ((buffer_time_ + total) > buffer_interval_ && buffer_cnt_ > 0) {
677 double coeff = exp(buffer_time_.count() * expCoeff_);
679 value_ * coeff + (1.0 - coeff) * (busy_buffer_.count() / buffer_cnt_);
680 buffer_time_ = std::chrono::microseconds{0};
681 busy_buffer_ = std::chrono::microseconds{0};
684 buffer_time_ += total;
685 busy_buffer_ += busy;
696 assert(ev->ev_base ==
nullptr);
708 ev->ev_base =
nullptr;
717 tv.tv_sec = long(timeout.count() / 1000LL);
718 tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
724 if (event_add(ev, &tv) < 0) {
725 LOG(ERROR) <<
"EventBase: failed to schedule timeout: " <<
errnoStr(errno);
755 auto duration = timeout -
now();
758 std::chrono::duration_cast<std::chrono::milliseconds>(duration));
762 return event_get_version();
765 return event_get_method();
const std::string & getName()
std::chrono::microseconds maxLatency_
void messageAvailable(Func &&msg) noexcept override
std::unique_ptr< NotificationQueue< Func > > queue_
LoopCallbackList runBeforeLoopCallbacks_
LoopCallbackList onDestructionCallbacks_
static constexpr std::chrono::milliseconds buffer_interval_
std::chrono::milliseconds timeout_type
Optional< std::string > getThreadName(std::thread::id id)
size_t getNotificationQueueSize() const
std::mutex onDestructionCallbacksMutex_
LoopCallbackList * runOnceCallbacks_
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
std::chrono::steady_clock::time_point TimePoint
static std::mutex libevent_mutex_
bool loopBody(int flags=0, bool ignoreKeepAlive=false)
std::atomic< std::thread::id > loopThread_
constexpr detail::Map< Move > move
std::chrono::steady_clock::time_point now()
bool runImmediatelyOrRunInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
void applyLoopKeepAlive()
EventBase * getEventBase()
std::unordered_set< detail::EventBaseLocalBaseBase * > localStorageToDtor_
void setTimeInterval(std::chrono::microseconds timeInterval)
virtual void runLoopCallback() noexcept=0
—— Concurrent Priority Queue Implementation ——
requires E e noexcept(noexcept(s.error(std::move(e))))
struct event * getEvent()
void detachTimeoutManager(AsyncTimeout *obj) final
bool scheduleTimeout(AsyncTimeout *obj, TimeoutManager::timeout_type timeout) final
void scheduleAt(Func &&fn, TimePoint const &timeout) override
auto event_ref_flags(struct event *ev) -> decltype(std::ref(ev->ev_flags))
static const char * getLibeventVersion()
FOLLY_ALWAYS_INLINE void call_once(basic_once_flag< Mutex, Atom > &flag, F &&f, Args &&...args)
virtual TimePoint now()
Get this executor's notion of time. Must be threadsafe.
bool inRunningEventBaseThread() const
constexpr std::decay< T >::type copy(T &&value) noexcept(noexcept(typename std::decay< T >::type(std::forward< T >(value))))
void bumpHandlingTime() final
FOLLY_ALWAYS_INLINE void wait(const WaitOptions &opt=wait_options()) noexcept
ssize_t loopKeepAliveCount()
void addSample(std::chrono::microseconds total, std::chrono::microseconds busy)
bool isInEventBaseThread() const
static std::shared_ptr< RequestContext > saveContext()
std::shared_ptr< EventBaseObserver > observer_
void runInLoop(LoopCallback *callback, bool thisIteration=false)
static const char * getLibeventMethod()
void checkIsInEventBaseThread() const
void cancelLoopCallback()
void reset(double value=0.0)
bool loopOnce(int flags=0)
std::unique_ptr< VirtualEventBase > virtualEventBase_
EventBase * getEventBase() override
Implements the IOExecutor interface.
event_base * getLibeventBase() const
bool runInEventBaseThread(void(*fn)(T *), T *arg)
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
static bool isEventRegistered(const struct event *ev)
LoopCallback::List LoopCallbackList
void cancelTimeout(AsyncTimeout *obj) final
void scheduleTimeoutFn(F fn, std::chrono::milliseconds timeout)
std::chrono::steady_clock::time_point startWork_
void dcheckIsInEventBaseThread() const
FOLLY_CPP14_CONSTEXPR Value value_or(U &&dflt) const &
fbstring errnoStr(int err)
SmoothLoopTime maxLatencyLoopTime_
void attachTimeoutManager(AsyncTimeout *obj, TimeoutManager::InternalEnum internal) final
std::atomic< bool > stop_
ExecutionObserver * executionObserver_
bool setThreadName(std::thread::id tid, StringPiece name)
void setLoadAvgMsec(std::chrono::milliseconds ms)
ssize_t loopKeepAliveCount_
void setMaxReadAtOnce(uint32_t maxAtOnce)
const bool enableTimeMeasurement_
std::shared_ptr< RequestContext > context_
void dampen(double factor)
void setName(const std::string &name)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
std::unique_ptr< FunctionRunner > fnRunner_
std::size_t latestLoopCnt_
std::atomic< ssize_t > loopKeepAliveCountAtomic_
NotificationQueue * queue_
LoopCallbackList loopCallbacks_
void runOnDestruction(LoopCallback *callback)
void throwSystemError(Args &&...args)
bool loopKeepAliveActive_
static std::chrono::milliseconds getTimeDelta(std::chrono::steady_clock::time_point *prev)
void runBeforeLoop(LoopCallback *callback)
SmoothLoopTime avgLoopTime_
folly::VirtualEventBase & getVirtualEventBase()
bool nothingHandledYet() const noexcept
folly::once_flag virtualEventBaseInitFlag_
uint32_t observerSampleCount_
void resetLoadAvg(double value=0.0)
void initNotificationQueue()
static unordered_set< string > us
bool loopIgnoreKeepAlive()