ObserverManager.h
/*
 * Copyright 2016-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include <atomic>
#include <memory>
#include <stdexcept>
#include <unordered_set>
#include <utility>

#include <glog/logging.h>

#include <folly/Function.h>
#include <folly/Portability.h>
#include <folly/ScopeGuard.h>
#include <folly/SharedMutex.h>
#include <folly/Synchronized.h>
#include <folly/experimental/observer/detail/Core.h>
#include <folly/experimental/observer/detail/GraphCycleDetector.h>
#include <folly/futures/Future.h>

namespace folly {
namespace observer_detail {

/**
 * ObserverManager is a singleton which manages re-computation of all
 * Observers. Re-computation happens on a thread pool owned by the manager,
 * which maintains a global version so that every refresh cycle observes a
 * consistent snapshot of the dependency graph.
 */
class ObserverManager {
 public:
  static size_t getVersion() {
    auto instance = getInstance();

    if (!instance) {
      return 1;
    }

    return instance->version_;
  }

  static bool inManagerThread() {
    return inManagerThread_;
  }

  static void
  scheduleRefresh(Core::Ptr core, size_t minVersion, bool force = false) {
    if (core->getVersion() >= minVersion) {
      return;
    }

    auto instance = getInstance();

    if (!instance) {
      return;
    }

    SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);

    // TSAN assumes that the thread that locks the mutex must be the one
    // that unlocks it. However, we are passing ownership of the read lock
    // into the lambda, and the thread that performs the async work will be
    // the one that unlocks it. To avoid noise with TSAN, annotate that this
    // thread has released the mutex, and then annotate the async thread as
    // acquiring the mutex.
    annotate_rwlock_released(
        &instance->versionMutex_,
        annotate_rwlock_level::rdlock,
        __FILE__,
        __LINE__);

    instance->scheduleCurrent([core = std::move(core),
                               instancePtr = instance.get(),
                               rh = std::move(rh),
                               force]() {
      // Let TSAN know that the current thread owns the read lock now.
      annotate_rwlock_acquired(
          &instancePtr->versionMutex_,
          annotate_rwlock_level::rdlock,
          __FILE__,
          __LINE__);

      core->refresh(instancePtr->version_, force);
    });
  }

  static void scheduleRefreshNewVersion(Core::WeakPtr coreWeak) {
    auto instance = getInstance();

    if (!instance) {
      return;
    }

    instance->scheduleNext(std::move(coreWeak));
  }

  static void initCore(Core::Ptr core) {
    DCHECK(core->getVersion() == 0);

    auto instance = getInstance();
    if (!instance) {
      throw std::logic_error("ObserverManager requested during shutdown");
    }

    // Run the initial refresh inline, temporarily marking this thread as a
    // manager thread and restoring the previous state on scope exit.
    bool wasInManagerThread = std::exchange(inManagerThread_, true);
    SCOPE_EXIT {
      inManagerThread_ = wasInManagerThread;
    };

    SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);

    core->refresh(instance->version_, false);
  }

  class DependencyRecorder {
   public:
    using DependencySet = std::unordered_set<Core::Ptr>;
    struct Dependencies {
      explicit Dependencies(const Core& core_) : core(core_) {}

      DependencySet dependencies;
      const Core& core;
    };

    explicit DependencyRecorder(const Core& core) : dependencies_(core) {
      DCHECK(inManagerThread());

      // Save the enclosing recorder (if any) and make this one current.
      previousDepedencies_ = currentDependencies_;
      currentDependencies_ = &dependencies_;
    }

    static void markDependency(Core::Ptr dependency) {
      DCHECK(inManagerThread());
      DCHECK(currentDependencies_);

      currentDependencies_->dependencies.insert(std::move(dependency));
    }

    static void markRefreshDependency(const Core& core) {
      if (!currentDependencies_) {
        return;
      }

      if (auto instance = getInstance()) {
        instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
          bool hasCycle =
              !cycleDetector.addEdge(&currentDependencies_->core, &core);
          if (hasCycle) {
            throw std::logic_error("Observer cycle detected.");
          }
        });
      }
    }

    static void unmarkRefreshDependency(const Core& core) {
      if (!currentDependencies_) {
        return;
      }

      if (auto instance = getInstance()) {
        instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
          cycleDetector.removeEdge(&currentDependencies_->core, &core);
        });
      }
    }

    DependencySet release() {
      DCHECK(currentDependencies_ == &dependencies_);
      std::swap(currentDependencies_, previousDepedencies_);
      previousDepedencies_ = nullptr;

      return std::move(dependencies_.dependencies);
    }

    ~DependencyRecorder() {
      if (currentDependencies_ == &dependencies_) {
        release();
      }
    }

   private:
    Dependencies dependencies_;
    Dependencies* previousDepedencies_;

    static FOLLY_TLS Dependencies* currentDependencies_;
  };

  ~ObserverManager();

 private:
  ObserverManager();

  struct Singleton;

  void scheduleCurrent(Function<void()>);
  void scheduleNext(Core::WeakPtr);

  class CurrentQueue;
  class NextQueue;

  std::unique_ptr<CurrentQueue> currentQueue_;
  std::unique_ptr<NextQueue> nextQueue_;

  static std::shared_ptr<ObserverManager> getInstance();
  static FOLLY_TLS bool inManagerThread_;

  /**
   * versionMutex_ ensures that all updates are processed from the current
   * queue before the version is bumped and the next queue is drained: every
   * task added to the current queue holds a reader lock, and the version
   * bump acquires the writer lock, thereby waiting for all current-queue
   * tasks to finish.
   */
  SharedMutexReadPriority versionMutex_;
  std::atomic<size_t> version_{1};

  folly::Synchronized<CycleDetector, std::mutex> cycleDetector_;
};
} // namespace observer_detail
} // namespace folly
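To see what ObserverManager coordinates, here is a minimal usage sketch of folly's public observer API (SimpleObservable, getObserver(), and makeObserver() come from <folly/experimental/observer/...>, not from this header; treat this as an illustration rather than a verified build target). makeObserver() runs its lambda on the manager's thread pool, where DependencyRecorder records every observer dereferenced inside it, so the derived observer is refreshed whenever its inputs change.

#include <folly/experimental/observer/Observer.h>
#include <folly/experimental/observer/SimpleObservable.h>

int main() {
  folly::observer::SimpleObservable<int> observable{21};
  auto input = observable.getObserver();

  // The lambda runs in a manager thread; dereferencing `input` there is
  // recorded as a dependency, so `doubled` is refreshed when `observable`
  // changes (via scheduleRefreshNewVersion and the next queue).
  auto doubled =
      folly::observer::makeObserver([input] { return **input * 2; });

  int initial = **doubled; // 42: initCore() computed the first version

  observable.setValue(50); // re-computation is scheduled asynchronously
  return initial == 42 ? 0 : 1;
}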
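The versionMutex_ discipline, where every current-queue task holds a reader lock and the version bump takes the writer lock, can be modeled with standard C++. The following is a simplified sketch of the idea using plain std::shared_mutex, not folly's implementation:

#include <atomic>
#include <shared_mutex>
#include <thread>
#include <vector>

int main() {
  std::shared_mutex versionMutex;  // stands in for versionMutex_
  std::atomic<size_t> version{1};  // stands in for version_
  std::atomic<int> refreshed{0};

  // Each "current queue" task holds a shared (reader) lock while it runs.
  std::vector<std::thread> tasks;
  for (int i = 0; i < 4; ++i) {
    tasks.emplace_back([&] {
      std::shared_lock<std::shared_mutex> rh(versionMutex);
      ++refreshed; // refresh work observes the pre-bump version
    });
  }
  for (auto& t : tasks) {
    t.join();
  }

  // The exclusive (writer) lock cannot be acquired while any task still
  // holds its reader lock, so a bump never races an in-flight refresh.
  {
    std::unique_lock<std::shared_mutex> wh(versionMutex);
    ++version;
  }
  return refreshed.load() == 4 ? 0 : 1;
}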
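DependencyRecorder's constructor and release() implement a thread-local stack of recorders, so nested refreshes each collect their own dependency set. A stripped-down model of that save/restore mechanism, with hypothetical names:

#include <cassert>
#include <set>

struct Recorder {
  std::set<int> deps;
  Recorder* previous{nullptr};

  static thread_local Recorder* current;

  // Constructor pushes this recorder; destructor restores the previous one.
  Recorder() : previous(current) { current = this; }
  ~Recorder() { current = previous; }

  static void mark(int dep) {
    assert(current != nullptr);
    current->deps.insert(dep);
  }
};

thread_local Recorder* Recorder::current = nullptr;

int main() {
  Recorder outer;
  Recorder::mark(1);
  {
    Recorder inner; // a nested refresh gets its own dependency set
    Recorder::mark(2);
    assert(inner.deps.count(2) == 1);
  }
  Recorder::mark(3); // recorded on `outer` again
  assert(outer.deps.count(1) && outer.deps.count(3) && !outer.deps.count(2));
  return 0;
}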
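markRefreshDependency() relies on CycleDetector::addEdge() returning false when a new edge would close a cycle, which the caller converts into a logic_error. A toy stand-in (not folly's GraphCycleDetector) illustrating that contract with a DFS reachability check:

#include <unordered_map>
#include <unordered_set>

class ToyCycleDetector {
 public:
  // Returns false if adding from -> to would create a cycle.
  bool addEdge(int from, int to) {
    if (reachable(to, from)) {
      return false; // `from` is reachable from `to`: the edge closes a cycle
    }
    edges_[from].insert(to);
    return true;
  }

  void removeEdge(int from, int to) { edges_[from].erase(to); }

 private:
  // DFS; terminates because addEdge() keeps the graph acyclic.
  bool reachable(int src, int dst) const {
    if (src == dst) {
      return true;
    }
    auto it = edges_.find(src);
    if (it == edges_.end()) {
      return false;
    }
    for (int next : it->second) {
      if (reachable(next, dst)) {
        return true;
      }
    }
    return false;
  }

  std::unordered_map<int, std::unordered_set<int>> edges_;
};

int main() {
  ToyCycleDetector d;
  if (!d.addEdge(1, 2) || !d.addEdge(2, 3)) {
    return 1;
  }
  if (d.addEdge(3, 1)) { // would create 1 -> 2 -> 3 -> 1
    return 1;
  }
  return 0;
}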