proxygen
ObserverManager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2016-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
17 
18 #include <folly/ExceptionString.h>
19 #include <folly/Format.h>
20 #include <folly/MPMCQueue.h>
21 #include <folly/Range.h>
22 #include <folly/Singleton.h>
25 
26 namespace folly {
27 namespace observer_detail {
28 
29 FOLLY_TLS bool ObserverManager::inManagerThread_{false};
30 FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
32 
34  observer_manager_pool_size,
35  4,
36  "How many internal threads ObserverManager should use");
37 
38 static constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"};
39 
40 namespace {
41 constexpr size_t kCurrentQueueSize{10 * 1024};
42 constexpr size_t kNextQueueSize{10 * 1024};
43 } // namespace
44 
46  public:
47  CurrentQueue() : queue_(kCurrentQueueSize) {
48  if (FLAGS_observer_manager_pool_size < 1) {
49  LOG(ERROR) << "--observer_manager_pool_size should be >= 1";
50  FLAGS_observer_manager_pool_size = 1;
51  }
52  for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
53  threads_.emplace_back([this, i]() {
57 
58  while (true) {
59  Function<void()> task;
60  queue_.blockingRead(task);
61 
62  if (!task) {
63  return;
64  }
65 
66  try {
67  task();
68  } catch (...) {
69  LOG(ERROR) << "Exception while running CurrentQueue task: "
70  << exceptionStr(std::current_exception());
71  }
72  }
73  });
74  }
75  }
76 
78  for (size_t i = 0; i < threads_.size(); ++i) {
79  queue_.blockingWrite(nullptr);
80  }
81 
82  for (auto& thread : threads_) {
83  thread.join();
84  }
85 
86  CHECK(queue_.isEmpty());
87  }
88 
89  void add(Function<void()> task) {
91  if (!queue_.write(std::move(task))) {
92  throw std::runtime_error("Too many Observers scheduled for update.");
93  }
94  } else {
95  queue_.blockingWrite(std::move(task));
96  }
97  }
98 
99  private:
101  std::vector<std::thread> threads_;
102 };
103 
105  public:
106  explicit NextQueue(ObserverManager& manager)
107  : manager_(manager), queue_(kNextQueueSize) {
108  thread_ = std::thread([&]() {
109  Core::WeakPtr queueCoreWeak;
110 
111  while (true) {
112  queue_.blockingRead(queueCoreWeak);
113  if (stop_) {
114  return;
115  }
116 
117  std::vector<Core::Ptr> cores;
118  {
119  auto queueCore = queueCoreWeak.lock();
120  if (!queueCore) {
121  continue;
122  }
123  cores.emplace_back(std::move(queueCore));
124  }
125 
126  {
127  SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
128 
129  // We can't pick more tasks from the queue after we bumped the
130  // version, so we have to do this while holding the lock.
131  while (cores.size() < kNextQueueSize && queue_.read(queueCoreWeak)) {
132  if (stop_) {
133  return;
134  }
135  if (auto queueCore = queueCoreWeak.lock()) {
136  cores.emplace_back(std::move(queueCore));
137  }
138  }
139 
140  ++manager_.version_;
141  }
142 
143  for (auto& core : cores) {
144  manager_.scheduleRefresh(std::move(core), manager_.version_, true);
145  }
146  }
147  });
148  }
149 
150  void add(Core::WeakPtr core) {
151  queue_.blockingWrite(std::move(core));
152  }
153 
155  stop_ = true;
156  // Write to the queue to notify the thread.
157  queue_.blockingWrite(Core::WeakPtr());
158  thread_.join();
159  }
160 
161  private:
164  std::thread thread_;
165  std::atomic<bool> stop_{false};
166 };
167 
169  currentQueue_ = std::make_unique<CurrentQueue>();
170  nextQueue_ = std::make_unique<NextQueue>(*this);
171 }
172 
174  // Destroy NextQueue, before the rest of this object, since it expects
175  // ObserverManager to be alive.
176  nextQueue_.reset();
177  currentQueue_.reset();
178 }
179 
181  currentQueue_->add(std::move(task));
182 }
183 
185  nextQueue_->add(std::move(core));
186 }
187 
190  // MSVC 2015 doesn't let us access ObserverManager's constructor if we
191  // try to use a lambda to initialize instance, so we have to create
192  // an actual function instead.
194  return new ObserverManager();
195  }
196 };
197 
199  createManager);
200 
201 std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
202  return Singleton::instance.try_get();
203 }
204 } // namespace observer_detail
205 } // namespace folly
std::string sformat(StringPiece fmt, Args &&...args)
Definition: Format.h:280
fbstring exceptionStr(const std::exception &e)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::unique_ptr< CurrentQueue > currentQueue_
static constexpr StringPiece kObserverManagerThreadNamePrefix
std::weak_ptr< Core > WeakPtr
Definition: Core.h:41
bool setThreadName(std::thread::id tid, StringPiece name)
Definition: ThreadName.cpp:109
std::unique_ptr< NextQueue > nextQueue_
DEFINE_int32(observer_manager_pool_size, 4,"How many internal threads ObserverManager should use")
static std::shared_ptr< ObserverManager > getInstance()
static folly::Singleton< ObserverManager > instance