proxygen
FiberManagerMap.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
17 
18 #include <memory>
19 #include <unordered_map>
20 
21 #include <folly/Synchronized.h>
22 #include <folly/ThreadLocal.h>
23 
24 namespace folly {
25 namespace fibers {
26 
27 namespace {
28 
29 template <typename EventBaseT>
30 class EventBaseOnDestructionCallback : public EventBase::LoopCallback {
31  public:
32  explicit EventBaseOnDestructionCallback(EventBaseT& evb) : evb_(evb) {}
33  void runLoopCallback() noexcept override;
34 
35  private:
36  EventBaseT& evb_;
37 };
38 
39 template <typename EventBaseT>
40 class GlobalCache {
41  public:
42  static FiberManager& get(EventBaseT& evb, const FiberManager::Options& opts) {
43  return instance().getImpl(evb, opts);
44  }
45 
46  static std::unique_ptr<FiberManager> erase(EventBaseT& evb) {
47  return instance().eraseImpl(evb);
48  }
49 
50  private:
51  GlobalCache() {}
52 
53  // Leak this intentionally. During shutdown, we may call getFiberManager,
54  // and want access to the fiber managers during that time.
55  static GlobalCache& instance() {
56  static auto ret = new GlobalCache();
57  return *ret;
58  }
59 
60  FiberManager& getImpl(EventBaseT& evb, const FiberManager::Options& opts) {
61  std::lock_guard<std::mutex> lg(mutex_);
62 
63  auto& fmPtrRef = map_[&evb];
64 
65  if (!fmPtrRef) {
66  auto loopController = std::make_unique<EventBaseLoopController>();
67  loopController->attachEventBase(evb);
68  evb.runOnDestruction(new EventBaseOnDestructionCallback<EventBaseT>(evb));
69 
70  fmPtrRef =
71  std::make_unique<FiberManager>(std::move(loopController), opts);
72  }
73 
74  return *fmPtrRef;
75  }
76 
77  std::unique_ptr<FiberManager> eraseImpl(EventBaseT& evb) {
78  std::lock_guard<std::mutex> lg(mutex_);
79 
80  DCHECK_EQ(map_.count(&evb), 1u);
81 
82  auto ret = std::move(map_[&evb]);
83  map_.erase(&evb);
84  return ret;
85  }
86 
88  std::unordered_map<EventBaseT*, std::unique_ptr<FiberManager>> map_;
89 };
90 
91 constexpr size_t kEraseListMaxSize = 64;
92 
93 template <typename EventBaseT>
94 class ThreadLocalCache {
95  public:
96  static FiberManager& get(EventBaseT& evb, const FiberManager::Options& opts) {
97  return instance()->getImpl(evb, opts);
98  }
99 
100  static void erase(EventBaseT& evb) {
101  for (auto& localInstance : instance().accessAllThreads()) {
102  localInstance.eraseInfo_.withWLock([&](auto& info) {
103  if (info.eraseList.size() >= kEraseListMaxSize) {
104  info.eraseAll = true;
105  } else {
106  info.eraseList.push_back(&evb);
107  }
108  localInstance.eraseRequested_ = true;
109  });
110  }
111  }
112 
113  private:
114  ThreadLocalCache() {}
115 
116  struct ThreadLocalCacheTag {};
117  using ThreadThreadLocalCache =
118  ThreadLocal<ThreadLocalCache, ThreadLocalCacheTag>;
119 
120  // Leak this intentionally. During shutdown, we may call getFiberManager,
121  // and want access to the fiber managers during that time.
122  static ThreadThreadLocalCache& instance() {
123  static auto ret =
124  new ThreadThreadLocalCache([]() { return new ThreadLocalCache(); });
125  return *ret;
126  }
127 
128  FiberManager& getImpl(EventBaseT& evb, const FiberManager::Options& opts) {
129  eraseImpl();
130 
131  auto& fmPtrRef = map_[&evb];
132  if (!fmPtrRef) {
133  fmPtrRef = &GlobalCache<EventBaseT>::get(evb, opts);
134  }
135 
136  DCHECK(fmPtrRef != nullptr);
137 
138  return *fmPtrRef;
139  }
140 
141  void eraseImpl() {
142  if (!eraseRequested_.load()) {
143  return;
144  }
145 
146  eraseInfo_.withWLock([&](auto& info) {
147  if (info.eraseAll) {
148  map_.clear();
149  } else {
150  for (auto evbPtr : info.eraseList) {
151  map_.erase(evbPtr);
152  }
153  }
154 
155  info.eraseList.clear();
156  info.eraseAll = false;
157  eraseRequested_ = false;
158  });
159  }
160 
161  std::unordered_map<EventBaseT*, FiberManager*> map_;
162  std::atomic<bool> eraseRequested_{false};
163 
164  struct EraseInfo {
165  bool eraseAll{false};
166  std::vector<EventBaseT*> eraseList;
167  };
168 
170 };
171 
172 template <typename EventBaseT>
173 void EventBaseOnDestructionCallback<EventBaseT>::runLoopCallback() noexcept {
174  auto fm = GlobalCache<EventBaseT>::erase(evb_);
175  DCHECK(fm.get() != nullptr);
176  ThreadLocalCache<EventBaseT>::erase(evb_);
177 
178  delete this;
179 }
180 
181 } // namespace
182 
184  EventBase& evb,
185  const FiberManager::Options& opts) {
186  return ThreadLocalCache<EventBase>::get(evb, opts);
187 }
188 
190  VirtualEventBase& evb,
191  const FiberManager::Options& opts) {
192  return ThreadLocalCache<VirtualEventBase>::get(evb, opts);
193 }
194 } // namespace fibers
195 } // namespace folly
def info()
Definition: deadlock.py:447
std::mutex mutex_
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::Synchronized< EraseInfo > eraseInfo_
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
Single-threaded task execution engine.
EventBaseT & evb_
std::vector< EventBaseT * > eraseList
bool eraseAll
std::atomic< bool > eraseRequested_
std::mutex mutex
std::unordered_map< EventBaseT *, std::unique_ptr< FiberManager > > map_
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
FiberManager & getFiberManager(EventBase &evb, const FiberManager::Options &opts)