proxygen
EventBase.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef __STDC_FORMAT_MACROS
17 #define __STDC_FORMAT_MACROS
18 #endif
19 
21 
22 #include <fcntl.h>
23 
24 #include <memory>
25 #include <mutex>
26 #include <thread>
27 
28 #include <folly/Memory.h>
29 #include <folly/String.h>
35 
36 namespace folly {
37 
38 /*
39  * EventBase::FunctionRunner
40  */
41 
43  : public NotificationQueue<EventBase::Func>::Consumer {
44  public:
45  void messageAvailable(Func&& msg) noexcept override {
46  // In libevent2, internal events do not break the loop.
47  // Most users would expect loop(), followed by runInEventBaseThread(),
48  // to break the loop and check if it should exit or not.
49  // To have similar bejaviour to libevent1.4, tell the loop to break here.
50  // Note that loop() may still continue to loop, but it will also check the
51  // stop_ flag as well as runInLoop callbacks, etc.
52  event_base_loopbreak(getEventBase()->evb_);
53 
54  if (!msg) {
55  // terminateLoopSoon() sends a null message just to
56  // wake up the loop. We can ignore these messages.
57  return;
58  }
59  msg();
60  }
61 };
62 
63 // The interface used to libevent is not thread-safe. Calls to
64 // event_init() and event_base_free() directly modify an internal
65 // global 'current_base', so a mutex is required to protect this.
66 //
67 // event_init() should only ever be called once. Subsequent calls
68 // should be made to event_base_new(). We can recognise that
69 // event_init() has already been called by simply inspecting current_base.
71 
72 /*
73  * EventBase methods
74  */
75 
76 EventBase::EventBase(bool enableTimeMeasurement)
78  stop_(false),
79  loopThread_(),
80  queue_(nullptr),
82  maxLatency_(0),
83  avgLoopTime_(std::chrono::seconds(2)),
85  enableTimeMeasurement_(enableTimeMeasurement),
87  std::size_t(-40)) // Early wrap-around so bugs will manifest soon
88  ,
90  startWork_(),
94  struct event ev;
95  {
96  std::lock_guard<std::mutex> lock(libevent_mutex_);
97 
98  // The value 'current_base' (libevent 1) or
99  // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
100  // allowing examination of its value without an explicit reference here.
101  // If ev.ev_base is nullptr, then event_init() must be called, otherwise
102  // call event_base_new().
103  event_set(&ev, 0, 0, nullptr, nullptr);
104  if (!ev.ev_base) {
105  evb_ = event_init();
106  }
107  }
108 
109  if (ev.ev_base) {
110  evb_ = event_base_new();
111  }
112 
113  if (UNLIKELY(evb_ == nullptr)) {
114  LOG(ERROR) << "EventBase(): Failed to init event base.";
115  folly::throwSystemError("error in EventBase::EventBase()");
116  }
117  VLOG(5) << "EventBase(): Created.";
119 }
120 
121 // takes ownership of the event_base
122 EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
124  stop_(false),
125  loopThread_(),
126  evb_(evb),
127  queue_(nullptr),
129  maxLatency_(0),
130  avgLoopTime_(std::chrono::seconds(2)),
132  enableTimeMeasurement_(enableTimeMeasurement),
133  nextLoopCnt_(
134  std::size_t(-40)) // Early wrap-around so bugs will manifest soon
135  ,
137  startWork_(),
141  if (UNLIKELY(evb_ == nullptr)) {
142  LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
143  throw std::invalid_argument("EventBase(): event base cannot be nullptr");
144  }
146 }
147 
149  std::future<void> virtualEventBaseDestroyFuture;
150  if (virtualEventBase_) {
151  virtualEventBaseDestroyFuture = virtualEventBase_->destroy();
152  }
153 
154  // Keep looping until all keep-alive handles are released. Each keep-alive
155  // handle signals that some external code will still schedule some work on
156  // this EventBase (so it's not safe to destroy it).
157  while (loopKeepAliveCount() > 0) {
159  loopOnce();
160  }
161 
162  if (virtualEventBaseDestroyFuture.valid()) {
163  virtualEventBaseDestroyFuture.get();
164  }
165 
166  // Call all destruction callbacks, before we start cleaning up our state.
167  while (!onDestructionCallbacks_.empty()) {
168  LoopCallback* callback = &onDestructionCallbacks_.front();
169  onDestructionCallbacks_.pop_front();
170  callback->runLoopCallback();
171  }
172 
174 
175  DCHECK_EQ(0u, runBeforeLoopCallbacks_.size());
176 
177  (void)runLoopCallbacks();
178 
179  if (!fnRunner_->consumeUntilDrained()) {
180  LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
181  }
182 
183  // Stop consumer before deleting NotificationQueue
184  fnRunner_->stopConsuming();
185  {
186  std::lock_guard<std::mutex> lock(libevent_mutex_);
187  event_base_free(evb_);
188  }
189 
190  for (auto storage : localStorageToDtor_) {
191  storage->onEventBaseDestruction(*this);
192  }
193 
194  VLOG(5) << "EventBase(): Destroyed.";
195 }
196 
198  return queue_->size();
199 }
200 
202  fnRunner_->setMaxReadAtOnce(maxAtOnce);
203 }
204 
206  auto evbTid = loopThread_.load(std::memory_order_relaxed);
207  if (evbTid == std::thread::id()) {
208  return;
209  }
210 
211  // Using getThreadName(evbTid) instead of name_ will work also if
212  // the thread name is set outside of EventBase (and name_ is empty).
213  auto curTid = std::this_thread::get_id();
214  CHECK(evbTid == curTid)
215  << "This logic must be executed in the event base thread. "
216  << "Event base thread name: \""
217  << folly::getThreadName(evbTid).value_or("")
218  << "\", current thread name: \""
219  << folly::getThreadName(curTid).value_or("") << "\"";
220 }
221 
222 // Set smoothing coefficient for loop load average; input is # of milliseconds
223 // for exp(-1) decay.
224 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
225  assert(enableTimeMeasurement_);
226  std::chrono::microseconds us = std::chrono::milliseconds(ms);
227  if (ms > std::chrono::milliseconds::zero()) {
230  } else {
231  LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
232  }
233 }
234 
236  assert(enableTimeMeasurement_);
237  avgLoopTime_.reset(value);
238  maxLatencyLoopTime_.reset(value);
239 }
240 
241 static std::chrono::milliseconds getTimeDelta(
242  std::chrono::steady_clock::time_point* prev) {
243  auto result = std::chrono::steady_clock::now() - *prev;
245 
246  return std::chrono::duration_cast<std::chrono::milliseconds>(result);
247 }
248 
250  while (!isRunning()) {
252  }
253 }
254 
255 // enters the event_base loop -- will only exit when forced to
257  return loopBody();
258 }
259 
261  if (loopKeepAliveActive_) {
262  // Make sure NotificationQueue is not counted as one of the readers
263  // (otherwise loopBody won't return until terminateLoopSoon is called).
264  fnRunner_->stopConsuming();
265  fnRunner_->startConsumingInternal(this, queue_.get());
266  loopKeepAliveActive_ = false;
267  }
268  return loopBody(0, true);
269 }
270 
272  return loopBody(flags | EVLOOP_ONCE);
273 }
274 
275 bool EventBase::loopBody(int flags, bool ignoreKeepAlive) {
276  VLOG(5) << "EventBase(): Starting loop.";
277 
278  DCHECK(!invokingLoop_)
279  << "Your code just tried to loop over an event base from inside another "
280  << "event base loop. Since libevent is not reentrant, this leads to "
281  << "undefined behavior in opt builds. Please fix immediately. For the "
282  << "common case of an inner function that needs to do some synchronous "
283  << "computation on an event-base, replace getEventBase() by a new, "
284  << "stack-allocated EvenBase.";
285  invokingLoop_ = true;
286  SCOPE_EXIT {
287  invokingLoop_ = false;
288  };
289 
290  int res = 0;
291  bool ranLoopCallbacks;
292  bool blocking = !(flags & EVLOOP_NONBLOCK);
293  bool once = (flags & EVLOOP_ONCE);
294 
295  // time-measurement variables.
296  std::chrono::steady_clock::time_point prev;
297  std::chrono::steady_clock::time_point idleStart = {};
298  std::chrono::microseconds busy;
299  std::chrono::microseconds idle;
300 
301  loopThread_.store(std::this_thread::get_id(), std::memory_order_release);
302 
303  if (!name_.empty()) {
305  }
306 
309  idleStart = prev;
310  }
311 
312  while (!stop_.load(std::memory_order_relaxed)) {
313  if (!ignoreKeepAlive) {
315  }
316  ++nextLoopCnt_;
317 
318  // Run the before loop callbacks
319  LoopCallbackList callbacks;
320  callbacks.swap(runBeforeLoopCallbacks_);
321 
322  while (!callbacks.empty()) {
323  auto* item = &callbacks.front();
324  callbacks.pop_front();
325  item->runLoopCallback();
326  }
327 
328  // nobody can add loop callbacks from within this thread if
329  // we don't have to handle anything to start with...
330  if (blocking && loopCallbacks_.empty()) {
331  res = event_base_loop(evb_, EVLOOP_ONCE);
332  } else {
333  res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
334  }
335 
336  ranLoopCallbacks = runLoopCallbacks();
337 
340  busy = std::chrono::duration_cast<std::chrono::microseconds>(
341  now - startWork_);
342  idle = std::chrono::duration_cast<std::chrono::microseconds>(
343  startWork_ - idleStart);
344  auto loop_time = busy + idle;
345 
346  avgLoopTime_.addSample(loop_time, busy);
347  maxLatencyLoopTime_.addSample(loop_time, busy);
348 
349  if (observer_) {
350  if (observerSampleCount_++ == observer_->getSampleRate()) {
352  observer_->loopSample(busy.count(), idle.count());
353  }
354  }
355 
356  VLOG(11) << "EventBase " << this << " did not timeout "
357  << " loop time guess: " << loop_time.count()
358  << " idle time: " << idle.count()
359  << " busy time: " << busy.count()
360  << " avgLoopTime: " << avgLoopTime_.get()
361  << " maxLatencyLoopTime: " << maxLatencyLoopTime_.get()
362  << " maxLatency_: " << maxLatency_.count() << "us"
363  << " notificationQueueSize: " << getNotificationQueueSize()
364  << " nothingHandledYet(): " << nothingHandledYet();
365 
366  // see if our average loop time has exceeded our limit
367  if ((maxLatency_ > std::chrono::microseconds::zero()) &&
368  (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
369  maxLatencyCob_();
370  // back off temporarily -- don't keep spamming maxLatencyCob_
371  // if we're only a bit over the limit
373  }
374 
375  // Our loop run did real work; reset the idle timer
376  idleStart = now;
377  } else {
378  VLOG(11) << "EventBase " << this << " did not timeout";
379  }
380 
381  // If the event loop indicate that there were no more events, and
382  // we also didn't have any loop callbacks to run, there is nothing left to
383  // do.
384  if (res != 0 && !ranLoopCallbacks) {
385  // Since Notification Queue is marked 'internal' some events may not have
386  // run. Run them manually if so, and continue looping.
387  //
388  if (getNotificationQueueSize() > 0) {
389  fnRunner_->handlerReady(0);
390  } else {
391  break;
392  }
393  }
394 
396  VLOG(11) << "EventBase " << this
397  << " loop time: " << getTimeDelta(&prev).count();
398  }
399 
400  if (once) {
401  break;
402  }
403  }
404  // Reset stop_ so loop() can be called again
405  stop_.store(false, std::memory_order_relaxed);
406 
407  if (res < 0) {
408  LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
409  return false;
410  } else if (res == 1) {
411  VLOG(5) << "EventBase: ran out of events (exiting loop)!";
412  } else if (res > 1) {
413  LOG(ERROR) << "EventBase: unknown event loop result = " << res;
414  return false;
415  }
416 
417  loopThread_.store({}, std::memory_order_release);
418 
419  VLOG(5) << "EventBase(): Done with loop.";
420  return true;
421 }
422 
424  if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
426  loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
427  }
428  DCHECK_GE(loopKeepAliveCount_, 0);
429 
430  return loopKeepAliveCount_;
431 }
432 
434  auto keepAliveCount = loopKeepAliveCount();
435  // Make sure default VirtualEventBase won't hold EventBase::loop() forever.
436  if (virtualEventBase_ && virtualEventBase_->keepAliveCount() == 1) {
437  --keepAliveCount;
438  }
439 
440  if (loopKeepAliveActive_ && keepAliveCount == 0) {
441  // Restore the notification queue internal flag
442  fnRunner_->stopConsuming();
443  fnRunner_->startConsumingInternal(this, queue_.get());
444  loopKeepAliveActive_ = false;
445  } else if (!loopKeepAliveActive_ && keepAliveCount > 0) {
446  // Update the notification queue event to treat it as a normal
447  // (non-internal) event. The notification queue event always remains
448  // installed, and the main loop won't exit with it installed.
449  fnRunner_->stopConsuming();
450  fnRunner_->startConsuming(this, queue_.get());
451  loopKeepAliveActive_ = true;
452  }
453 }
454 
456  bool ret;
457  {
458  SCOPE_EXIT {
460  };
461  // Make sure notification queue events are treated as normal events.
462  // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
463  // released inside a loop.
465  SCOPE_EXIT {
467  };
468  ret = loop();
469  }
470 
471  if (!ret) {
472  folly::throwSystemError("error in EventBase::loopForever()");
473  }
474 }
475 
477  if (!enableTimeMeasurement_) {
478  return;
479  }
480 
481  VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
482  << " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
483  if (nothingHandledYet()) {
485  // set the time
487 
488  VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
489  << " (loop) startWork_ " << startWork_.time_since_epoch().count();
490  }
491 }
492 
494  VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
495 
496  // Set stop to true, so the event loop will know to exit.
497  stop_.store(true, std::memory_order_relaxed);
498 
499  // Call event_base_loopbreak() so that libevent will exit the next time
500  // around the loop.
501  event_base_loopbreak(evb_);
502 
503  // If terminateLoopSoon() is called from another thread,
504  // the EventBase thread might be stuck waiting for events.
505  // In this case, it won't wake up and notice that stop_ is set until it
506  // receives another event. Send an empty frame to the notification queue
507  // so that the event loop will wake up even if there are no other events.
508  //
509  // We don't care about the return value of trySendFrame(). If it fails
510  // this likely means the EventBase already has lots of events waiting
511  // anyway.
512  try {
513  queue_->putMessage(nullptr);
514  } catch (...) {
515  // We don't care if putMessage() fails. This likely means
516  // the EventBase already has lots of events waiting anyway.
517  }
518 }
519 
520 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
522  callback->cancelLoopCallback();
524  if (runOnceCallbacks_ != nullptr && thisIteration) {
525  runOnceCallbacks_->push_back(*callback);
526  } else {
527  loopCallbacks_.push_back(*callback);
528  }
529 }
530 
531 void EventBase::runInLoop(Func cob, bool thisIteration) {
533  auto wrapper = new FunctionLoopCallback(std::move(cob));
534  wrapper->context_ = RequestContext::saveContext();
535  if (runOnceCallbacks_ != nullptr && thisIteration) {
536  runOnceCallbacks_->push_back(*wrapper);
537  } else {
538  loopCallbacks_.push_back(*wrapper);
539  }
540 }
541 
543  std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
544  callback->cancelLoopCallback();
545  onDestructionCallbacks_.push_back(*callback);
546 }
547 
550  callback->cancelLoopCallback();
551  runBeforeLoopCallbacks_.push_back(*callback);
552 }
553 
555  // Send the message.
556  // It will be received by the FunctionRunner in the EventBase's thread.
557 
558  // We try not to schedule nullptr callbacks
559  if (!fn) {
560  LOG(ERROR) << "EventBase " << this
561  << ": Scheduling nullptr callbacks is not allowed";
562  return false;
563  }
564 
565  // Short-circuit if we are already in our event base
566  if (inRunningEventBaseThread()) {
567  runInLoop(std::move(fn));
568  return true;
569  }
570 
571  try {
572  queue_->putMessage(std::move(fn));
573  } catch (const std::exception& ex) {
574  LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
575  << "for EventBase thread: " << ex.what();
576  return false;
577  }
578 
579  return true;
580 }
581 
583  if (inRunningEventBaseThread()) {
584  LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
585  << "allowed";
586  return false;
587  }
588 
589  Baton<> ready;
590  runInEventBaseThread([&ready, fn = std::move(fn)]() mutable {
591  SCOPE_EXIT {
592  ready.post();
593  };
594  // A trick to force the stored functor to be executed and then destructed
595  // before posting the baton and waking the waiting thread.
596  copy(std::move(fn))();
597  });
598  ready.wait();
599 
600  return true;
601 }
602 
604  if (isInEventBaseThread()) {
605  fn();
606  return true;
607  } else {
609  }
610 }
611 
614  if (!loopCallbacks_.empty()) {
615  // Swap the loopCallbacks_ list with a temporary list on our stack.
616  // This way we will only run callbacks scheduled at the time
617  // runLoopCallbacks() was invoked.
618  //
619  // If any of these callbacks in turn call runInLoop() to schedule more
620  // callbacks, those new callbacks won't be run until the next iteration
621  // around the event loop. This prevents runInLoop() callbacks from being
622  // able to start file descriptor and timeout based events.
623  LoopCallbackList currentCallbacks;
624  currentCallbacks.swap(loopCallbacks_);
625  runOnceCallbacks_ = &currentCallbacks;
626 
627  while (!currentCallbacks.empty()) {
628  LoopCallback* callback = &currentCallbacks.front();
629  currentCallbacks.pop_front();
631  callback->runLoopCallback();
632  }
633 
634  runOnceCallbacks_ = nullptr;
635  return true;
636  }
637  return false;
638 }
639 
641  // Infinite size queue
642  queue_ = std::make_unique<NotificationQueue<Func>>();
643 
644  // We allocate fnRunner_ separately, rather than declaring it directly
645  // as a member of EventBase solely so that we don't need to include
646  // NotificationQueue.h from EventBase.h
647  fnRunner_ = std::make_unique<FunctionRunner>();
648 
649  // Mark this as an internal event, so event_base_loop() will return if
650  // there are no other events besides this one installed.
651  //
652  // Most callers don't care about the internal notification queue used by
653  // EventBase. The queue is always installed, so if we did count the queue as
654  // an active event, loop() would never exit with no more events to process.
655  // Users can use loopForever() if they do care about the notification queue.
656  // (This is useful for EventBase threads that do nothing but process
657  // runInEventBaseThread() notifications.)
658  fnRunner_->startConsumingInternal(this, queue_.get());
659 }
660 
662  std::chrono::microseconds timeInterval) {
663  expCoeff_ = -1.0 / timeInterval.count();
664  VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
665 }
666 
668  value_ = value;
669 }
670 
672  std::chrono::microseconds total,
673  std::chrono::microseconds busy) {
674  if ((buffer_time_ + total) > buffer_interval_ && buffer_cnt_ > 0) {
675  // See https://en.wikipedia.org/wiki/Exponential_smoothing for
676  // more info on this calculation.
677  double coeff = exp(buffer_time_.count() * expCoeff_);
678  value_ =
679  value_ * coeff + (1.0 - coeff) * (busy_buffer_.count() / buffer_cnt_);
680  buffer_time_ = std::chrono::microseconds{0};
681  busy_buffer_ = std::chrono::microseconds{0};
682  buffer_cnt_ = 0;
683  }
684  buffer_time_ += total;
685  busy_buffer_ += busy;
686  buffer_cnt_++;
687 }
688 
690  VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
691  return (nextLoopCnt_ != latestLoopCnt_);
692 }
693 
695  struct event* ev = obj->getEvent();
696  assert(ev->ev_base == nullptr);
697 
698  event_base_set(getLibeventBase(), ev);
699  if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
700  // Set the EVLIST_INTERNAL flag
701  event_ref_flags(ev) |= EVLIST_INTERNAL;
702  }
703 }
704 
706  cancelTimeout(obj);
707  struct event* ev = obj->getEvent();
708  ev->ev_base = nullptr;
709 }
710 
712  AsyncTimeout* obj,
715  // Set up the timeval and add the event
716  struct timeval tv;
717  tv.tv_sec = long(timeout.count() / 1000LL);
718  tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
719 
720  struct event* ev = obj->getEvent();
721 
722  DCHECK(ev->ev_base);
723 
724  if (event_add(ev, &tv) < 0) {
725  LOG(ERROR) << "EventBase: failed to schedule timeout: " << errnoStr(errno);
726  return false;
727  }
728 
729  return true;
730 }
731 
734  struct event* ev = obj->getEvent();
736  event_del(ev);
737  }
738 }
739 
742  name_ = name;
743 
744  if (isRunning()) {
745  setThreadName(loopThread_.load(std::memory_order_relaxed), name_);
746  }
747 }
748 
751  return name_;
752 }
753 
754 void EventBase::scheduleAt(Func&& fn, TimePoint const& timeout) {
755  auto duration = timeout - now();
757  std::move(fn),
758  std::chrono::duration_cast<std::chrono::milliseconds>(duration));
759 }
760 
762  return event_get_version();
763 }
765  return event_get_method();
766 }
767 
770  virtualEventBase_ = std::make_unique<VirtualEventBase>(*this);
771  });
772 
773  return *virtualEventBase_;
774 }
775 
777  return this;
778 }
779 
780 constexpr std::chrono::milliseconds EventBase::SmoothLoopTime::buffer_interval_;
781 } // namespace folly
chrono
Definition: CMakeCache.txt:563
const std::string & getName()
Definition: EventBase.cpp:749
std::chrono::microseconds maxLatency_
Definition: EventBase.h:741
void messageAvailable(Func &&msg) noexceptoverride
Definition: EventBase.cpp:45
flags
Definition: http_parser.h:127
std::unique_ptr< NotificationQueue< Func > > queue_
Definition: EventBase.h:734
LoopCallbackList runBeforeLoopCallbacks_
Definition: EventBase.h:712
LoopCallbackList onDestructionCallbacks_
Definition: EventBase.h:713
static constexpr std::chrono::milliseconds buffer_interval_
Definition: EventBase.h:584
std::chrono::milliseconds timeout_type
Optional< std::string > getThreadName(std::thread::id id)
Definition: ThreadName.cpp:90
size_t getNotificationQueueSize() const
Definition: EventBase.cpp:197
std::mutex onDestructionCallbacksMutex_
Definition: EventBase.h:780
LoopCallbackList * runOnceCallbacks_
Definition: EventBase.h:719
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
Definition: EventBase.h:799
std::chrono::steady_clock::time_point TimePoint
~EventBase() override
Definition: EventBase.cpp:148
static std::mutex libevent_mutex_
Definition: EventBase.cpp:70
bool loopBody(int flags=0, bool ignoreKeepAlive=false)
Definition: EventBase.cpp:275
std::atomic< std::thread::id > loopThread_
Definition: EventBase.h:727
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::chrono::steady_clock::time_point now()
bool runImmediatelyOrRunInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
Definition: EventBase.h:804
bool runLoopCallbacks()
Definition: EventBase.cpp:612
STL namespace.
void applyLoopKeepAlive()
Definition: EventBase.cpp:433
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
std::unordered_set< detail::EventBaseLocalBaseBase * > localStorageToDtor_
Definition: EventBase.h:787
void setTimeInterval(std::chrono::microseconds timeInterval)
Definition: EventBase.cpp:661
virtual void runLoopCallback() noexcept=0
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::string name_
Definition: EventBase.h:777
requires E e noexcept(noexcept(s.error(std::move(e))))
struct event * getEvent()
Definition: AsyncTimeout.h:150
#define nullptr
Definition: http_parser.c:41
void detachTimeoutManager(AsyncTimeout *obj) final
Definition: EventBase.cpp:705
bool scheduleTimeout(AsyncTimeout *obj, TimeoutManager::timeout_type timeout) final
Definition: EventBase.cpp:711
void scheduleAt(Func &&fn, TimePoint const &timeout) override
Definition: EventBase.cpp:754
auto event_ref_flags(struct event *ev) -> decltype(std::ref(ev->ev_flags))
Definition: EventUtil.h:41
static const char * getLibeventVersion()
Definition: EventBase.cpp:761
FOLLY_ALWAYS_INLINE void call_once(basic_once_flag< Mutex, Atom > &flag, F &&f, Args &&...args)
Definition: CallOnce.h:56
const char * name
Definition: http_parser.c:437
virtual TimePoint now()
Get this executor&#39;s notion of time. Must be threadsafe.
bool inRunningEventBaseThread() const
Definition: EventBase.h:509
constexpr std::decay< T >::type copy(T &&value) noexcept(noexcept(typename std::decay< T >::type(std::forward< T >(value))))
Definition: Utility.h:72
void bumpHandlingTime() final
Definition: EventBase.cpp:476
FOLLY_ALWAYS_INLINE void wait(const WaitOptions &opt=wait_options()) noexcept
Definition: Baton.h:170
ssize_t loopKeepAliveCount()
Definition: EventBase.cpp:423
void addSample(std::chrono::microseconds total, std::chrono::microseconds busy)
Definition: EventBase.cpp:671
bool isInEventBaseThread() const
Definition: EventBase.h:504
static std::shared_ptr< RequestContext > saveContext()
Definition: Request.h:196
std::shared_ptr< EventBaseObserver > observer_
Definition: EventBase.h:770
void runInLoop(LoopCallback *callback, bool thisIteration=false)
Definition: EventBase.cpp:520
static const char * getLibeventMethod()
Definition: EventBase.cpp:764
void checkIsInEventBaseThread() const
Definition: EventBase.cpp:205
void reset(double value=0.0)
Definition: EventBase.cpp:667
bool loopOnce(int flags=0)
Definition: EventBase.cpp:271
std::unique_ptr< VirtualEventBase > virtualEventBase_
Definition: EventBase.h:790
void terminateLoopSoon()
Definition: EventBase.cpp:493
EventBase * getEventBase() override
Implements the IOExecutor interface.
Definition: EventBase.cpp:776
event_base * getLibeventBase() const
Definition: EventBase.h:537
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
static bool isEventRegistered(const struct event *ev)
Definition: EventUtil.h:51
LoopCallback::List LoopCallbackList
Definition: EventBase.h:698
event_base * evb_
Definition: EventBase.h:730
void cancelTimeout(AsyncTimeout *obj) final
Definition: EventBase.cpp:732
void scheduleTimeoutFn(F fn, std::chrono::milliseconds timeout)
Definition: HHWheelTimer.h:229
std::chrono::steady_clock::time_point startWork_
Definition: EventBase.h:762
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
void post() noexcept
Definition: Baton.h:123
FOLLY_CPP14_CONSTEXPR Value value_or(U &&dflt) const &
Definition: Optional.h:330
fbstring errnoStr(int err)
Definition: String.cpp:463
SmoothLoopTime maxLatencyLoopTime_
Definition: EventBase.h:749
void attachTimeoutManager(AsyncTimeout *obj, TimeoutManager::InternalEnum internal) final
Definition: EventBase.cpp:694
std::atomic< bool > stop_
Definition: EventBase.h:723
ExecutionObserver * executionObserver_
Definition: EventBase.h:774
bool setThreadName(std::thread::id tid, StringPiece name)
Definition: ThreadName.cpp:109
void setLoadAvgMsec(std::chrono::milliseconds ms)
Definition: EventBase.cpp:224
bool isRunning() const
Definition: EventBase.h:487
ssize_t loopKeepAliveCount_
Definition: EventBase.h:736
void setMaxReadAtOnce(uint32_t maxAtOnce)
Definition: EventBase.cpp:201
const bool enableTimeMeasurement_
Definition: EventBase.h:757
std::mutex mutex
std::size_t nextLoopCnt_
Definition: EventBase.h:760
const char * string
Definition: Conv.cpp:212
std::shared_ptr< RequestContext > context_
Definition: EventBase.h:172
const
Definition: upload.py:398
void dampen(double factor)
Definition: EventBase.h:574
void setName(const std::string &name)
Definition: EventBase.cpp:740
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
std::unique_ptr< FunctionRunner > fnRunner_
Definition: EventBase.h:735
std::size_t latestLoopCnt_
Definition: EventBase.h:761
std::atomic< ssize_t > loopKeepAliveCountAtomic_
Definition: EventBase.h:737
void waitUntilRunning()
Definition: EventBase.cpp:249
LoopCallbackList loopCallbacks_
Definition: EventBase.h:711
void runOnDestruction(LoopCallback *callback)
Definition: EventBase.cpp:542
void throwSystemError(Args &&...args)
Definition: Exception.h:76
bool loopKeepAliveActive_
Definition: EventBase.h:738
static std::chrono::milliseconds getTimeDelta(std::chrono::steady_clock::time_point *prev)
Definition: EventBase.cpp:241
#define UNLIKELY(x)
Definition: Likely.h:48
void runBeforeLoop(LoopCallback *callback)
Definition: EventBase.cpp:548
SmoothLoopTime avgLoopTime_
Definition: EventBase.h:744
folly::VirtualEventBase & getVirtualEventBase()
Definition: EventBase.cpp:768
bool nothingHandledYet() const noexcept
Definition: EventBase.cpp:689
HHWheelTimer & timer()
Definition: EventBase.h:526
folly::once_flag virtualEventBaseInitFlag_
Definition: EventBase.h:789
uint32_t observerSampleCount_
Definition: EventBase.h:771
void resetLoadAvg(double value=0.0)
Definition: EventBase.cpp:235
void initNotificationQueue()
Definition: EventBase.cpp:640
static unordered_set< string > us
bool loopIgnoreKeepAlive()
Definition: EventBase.cpp:260