proxygen
ThreadWheelTimekeeper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
17 
18 #include <folly/Singleton.h>
19 #include <folly/futures/Future.h>
20 #include <future>
21 
22 namespace folly {
23 
24 namespace {
25 Singleton<ThreadWheelTimekeeper> timekeeperSingleton_;
26 
27 // Our Callback object for HHWheelTimer
28 struct WTCallback : public std::enable_shared_from_this<WTCallback>,
30  struct PrivateConstructorTag {};
31 
32  public:
33  WTCallback(PrivateConstructorTag, EventBase* base) : base_(base) {}
34 
35  // Only allow creation by this factory, to ensure heap allocation.
36  static std::shared_ptr<WTCallback> create(EventBase* base) {
37  // optimization opportunity: memory pool
38  auto cob = std::make_shared<WTCallback>(PrivateConstructorTag{}, base);
39  // Capture shared_ptr of cob in lambda so that Core inside Promise will
40  // hold a ref count to it. The ref count will be released when Core goes
41  // away which happens when both Promise and Future go away
42  cob->promise_.setInterruptHandler(
43  [cob](exception_wrapper ew) { cob->interruptHandler(std::move(ew)); });
44  return cob;
45  }
46 
47  Future<Unit> getFuture() {
48  return promise_.getFuture();
49  }
50 
51  FOLLY_NODISCARD Promise<Unit> stealPromise() {
52  // Don't need promise anymore. Break the circular reference as promise_
53  // is holding a ref count to us via Core. Core won't go away until both
54  // Promise and Future go away.
55  return std::move(promise_);
56  }
57 
58  protected:
60  Promise<Unit> promise_;
61 
62  void timeoutExpired() noexcept override {
63  base_ = nullptr;
64  // Don't need Promise anymore, break the circular reference
65  auto promise = stealPromise();
66  if (!promise.isFulfilled()) {
67  promise.setValue();
68  }
69  }
70 
71  void callbackCanceled() noexcept override {
72  base_ = nullptr;
73  // Don't need Promise anymore, break the circular reference
74  auto promise = stealPromise();
75  if (!promise.isFulfilled()) {
76  promise.setException(FutureNoTimekeeper{});
77  }
78  }
79 
80  void interruptHandler(exception_wrapper ew) {
81  auto rBase = base_.rlock();
82  if (!*rBase) {
83  return;
84  }
85  // Capture shared_ptr of self in lambda, if we don't do this, object
86  // may go away before the lambda is executed from event base thread.
87  // This is not racing with timeoutExpired anymore because this is called
88  // through Future, which means Core is still alive and keeping a ref count
89  // on us, so what timeouExpired is doing won't make the object go away
90  (*rBase)->runInEventBaseThread(
91  [me = shared_from_this(), ew = std::move(ew)]() mutable {
92  me->cancelTimeout();
93  // Don't need Promise anymore, break the circular reference
94  auto promise = me->stealPromise();
95  if (!promise.isFulfilled()) {
96  promise.setException(std::move(ew));
97  }
98  });
99  }
100 };
101 
102 } // namespace
103 
105  : thread_([this] { eventBase_.loopForever(); }),
106  wheelTimer_(
107  HHWheelTimer::newTimer(&eventBase_, std::chrono::milliseconds(1))) {
110  // 15 characters max
111  eventBase_.setName("FutureTimekeepr");
112  });
113 }
114 
117  wheelTimer_->cancelAll();
119  });
120  thread_.join();
121 }
122 
124  auto cob = WTCallback::create(&eventBase_);
125  auto f = cob->getFuture();
126  //
127  // Even shared_ptr of cob is captured in lambda this is still somewhat *racy*
128  // because it will be released once timeout is scheduled. So technically there
129  // is no gurantee that EventBase thread can safely call timeout callback.
130  // However due to fact that we are having circular reference here:
131  // WTCallback->Promise->Core->WTCallbak, so three of them won't go away until
132  // we break the circular reference. The break happens either in
133  // WTCallback::timeoutExpired or WTCallback::interruptHandler. Former means
134  // timeout callback is being safely executed. Latter captures shared_ptr of
135  // WTCallback again in another lambda for canceling timeout. The moment
136  // canceling timeout is executed in EventBase thread, the actual timeout
137  // callback has either been executed, or will never be executed. So we are
138  // fine here.
139  //
141  [this, cob, dur] { wheelTimer_->scheduleTimeout(cob.get(), dur); })) {
142  // Release promise to break the circular reference. Because if
143  // scheduleTimeout fails, there is nothing to *promise*. Internally
144  // Core would automatically set an exception result when Promise is
145  // destructed before fulfilling.
146  // This is either called from EventBase thread, or here.
147  // They are somewhat racy but given the rare chance this could fail,
148  // I don't see it is introducing any problem yet.
149  auto promise = cob->stealPromise();
150  if (!promise.isFulfilled()) {
151  promise.setException(FutureNoTimekeeper{});
152  }
153  }
154  return f;
155 }
156 
157 namespace detail {
158 
159 std::shared_ptr<Timekeeper> getTimekeeperSingleton() {
160  return timekeeperSingleton_.try_get();
161 }
162 
163 } // namespace detail
164 
165 } // namespace folly
auto f
std::shared_ptr< Timekeeper > getTimekeeperSingleton()
#define FOLLY_NODISCARD
Definition: Portability.h:64
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
Definition: EventBase.h:799
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
ThreadWheelTimekeeper()
But it doesn&#39;t have to be a singleton.
std::chrono::milliseconds Duration
Definition: Types.h:36
static UniquePtr newTimer(Args &&...args)
Definition: HHWheelTimer.h:61
folly::Synchronized< EventBase * > base_
void terminateLoopSoon()
Definition: EventBase.cpp:493
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
Promise< Unit > promise_
void setName(const std::string &name)
Definition: EventBase.cpp:740
void waitUntilRunning()
Definition: EventBase.cpp:249
Future< Unit > after(Duration) override
HHWheelTimer::UniquePtr wheelTimer_