proxygen
Core.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2016-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <folly/ExceptionString.h>
21 
22 namespace folly {
23 namespace observer_detail {
24 
// NOTE(review): this is the tail of Core::getData() (the index places its
// definition at Core.cpp:25). Listing lines 25, 26, 30, 32 and 38 are absent
// from this extract, so the signature, the condition guarding the first
// return, and the statement that defines `version` are not visible here.
// Every visible return path hands back data_.copy().
27  return data_.copy();
28  }
29 
31 
33 
    // The stored version_ already satisfies `version`: return a copy of the
    // current data without doing anything else.
34  if (version_ >= version) {
35  return data_.copy();
36  }
37 
39 
    // Whatever ran on the missing line above is expected to have advanced
    // version_ to at least `version`; DCHECK_GE asserts that before the copy.
40  DCHECK_GE(version_, version);
41  return data_.copy();
42 }
43 
// Brings this Core up to `version` and returns versionLastChange_.
//
// Parameters:
//   version - target version. If version_ is already >= version, nothing is
//             recomputed.
//   force   - when true, creator_ is re-run even if no dependency reports a
//             change newer than version_.
//
// Returns: versionLastChange_ (on every return path visible here).
//
// Throws: if creating the new value throws (including the std::logic_error
// raised when creator_ returns nullptr) while version_ == 0, the exception is
// rethrown; otherwise it is logged and swallowed.
//
// NOTE(review): listing lines 45, 47, 49 and 101 are absent from this extract
// (statements before SCOPE_EXIT, the SCOPE_EXIT body, and the line right
// after the data_.swap block). Comments below describe only visible code.
44 size_t Core::refresh(size_t version, bool force) {
46 
48  SCOPE_EXIT {
50  };
51 
    // First check, made before refreshMutex_ is taken.
52  if (version_ >= version) {
53  return versionLastChange_;
54  }
55 
56  {
57  std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
58 
59  // Recheck in case this code was already refreshed
60  if (version_ >= version) {
61  return versionLastChange_;
62  }
63 
    // creator_ is always run when forced or when version_ is still 0.
64  bool needRefresh = force || version_ == 0;
65 
    // Constructed before creator_ runs; release() further down yields the
    // dependency set that replaces dependencies_.
    // NOTE(review): presumably it records the dependencies touched while
    // creator_ runs - confirm in ObserverManager.
66  ObserverManager::DependencyRecorder dependencyRecorder(*this);
67 
68  // This can be run in parallel, but we expect most updates to propagate
69  // bottom to top.
    // Refresh each current dependency to `version`. A result greater than
    // our version_ means that dependency changed after our last refresh, so
    // we must recompute; stop at the first such dependency.
70  dependencies_.withRLock([&](const Dependencies& dependencies) {
71  for (const auto& dependency : dependencies) {
72  try {
73  if (dependency->refresh(version) > version_) {
74  needRefresh = true;
75  break;
76  }
77  } catch (...) {
    // A dependency whose refresh throws is treated as changed.
78  LOG(ERROR) << "Exception while checking dependencies for updates: "
79  << exceptionStr(std::current_exception());
80 
81  needRefresh = true;
82  break;
83  }
84  }
85  });
86 
    // Nothing changed: only advance version_ and report the previous
    // change version.
87  if (!needRefresh) {
88  version_ = version;
89  return versionLastChange_;
90  }
91 
92  try {
    // Inner scope: after swap(), newData holds the previous value, which is
    // destroyed at the closing brace.
93  {
94  VersionedData newData{creator_(), version};
95  if (!newData.data) {
96  throw std::logic_error("Observer creator returned nullptr.");
97  }
98  data_.swap(newData);
99  }
100 
102  } catch (...) {
103  LOG(ERROR) << "Exception while refreshing Observer: "
104  << exceptionStr(std::current_exception());
105 
106  if (version_ == 0) {
107  // Re-throw exception if this is the first time we run creator
108  throw;
109  }
110  }
111 
112  version_ = version;
113 
    // If versionLastChange_ was not set to `version`, return before touching
    // dependencies_ or scheduling dependents.
    // NOTE(review): the assignment to versionLastChange_ is not visible in
    // this extract (presumably the missing listing line 101) - confirm.
114  if (versionLastChange_ != version) {
115  return versionLastChange_;
116  }
117 
118  auto newDependencies = dependencyRecorder.release();
119  dependencies_.withWLock([&](Dependencies& dependencies) {
    // Register this Core as a dependent of each newly acquired dependency.
120  for (const auto& dependency : newDependencies) {
121  if (!dependencies.count(dependency)) {
122  dependency->addDependent(this->shared_from_this());
123  }
124  }
125 
    // Ask each dependency that is no longer used to run
    // removeStaleDependents().
126  for (const auto& dependency : dependencies) {
127  if (!newDependencies.count(dependency)) {
128  dependency->removeStaleDependents();
129  }
130  }
131 
132  dependencies = std::move(newDependencies);
133  });
134  }
135 
    // refreshMutex_ has been released by the closing brace above. Iterate
    // over a copy of dependents_ and schedule a refresh for each dependent
    // whose weak pointer can still be locked.
136  auto dependents = dependents_.copy();
137 
138  for (const auto& dependentWeak : dependents) {
139  if (auto dependent = dependentWeak.lock()) {
140  ObserverManager::scheduleRefresh(std::move(dependent), version);
141  }
142  }
143 
144  return versionLastChange_;
145 }
146 
147 Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
148  : creator_(std::move(creator)) {}
149 
// NOTE(review): the signature line (listing line 150) is absent from this
// extract; presumably this is the body of Core::~Core() - confirm.
// Calls removeStaleDependents() on every entry of dependencies_. The lambda
// only reads the set (it takes const Dependencies&) although withWLock is
// used.
151  dependencies_.withWLock([](const Dependencies& dependencies) {
152  for (const auto& dependecy : dependencies) {
153  dependecy->removeStaleDependents();
154  }
155  });
156 }
157 
158 Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
159  auto core = Core::Ptr(new Core(std::move(creator)));
160  return core;
161 }
162 
// Body of Core::addDependent(Core::WeakPtr dependent). The signature line
// (listing line 163) is absent from this extract; the index places the
// definition there.
// Appends `dependent` to dependents_ while holding its write lock. The weak
// pointer is moved in, and no duplicate check is performed here.
164  dependents_.withWLock([&](Dependents& dependents) {
165  dependents.push_back(std::move(dependent));
166  });
167 }
168 
// NOTE(review): the signature line (listing line 169) is absent from this
// extract; presumably this is the body of Core::removeStaleDependents() -
// confirm.
// Drops every expired weak pointer from dependents_ under its write lock.
// Element order is not preserved.
170  // This is inefficient, the assumption is that we won't have many dependents
171  dependents_.withWLock([](Dependents& dependents) {
172  for (size_t i = 0; i < dependents.size(); ++i) {
173  if (dependents[i].expired()) {
    // Swap-and-pop: overwrite the expired slot with the last element, then
    // shrink. --i makes the loop re-examine the element just moved into slot
    // i; when i is 0 the unsigned decrement wraps and the loop's ++i brings
    // it back to 0.
174  std::swap(dependents[i], dependents.back());
175  dependents.pop_back();
176  --i;
177  }
178  }
179  });
180 }
181 } // namespace observer_detail
182 } // namespace folly
std::mutex refreshMutex_
Definition: Core.h:112
Core(folly::Function< std::shared_ptr< const void >()> creator)
Definition: Core.cpp:147
VersionedData getData()
Definition: Core.cpp:25
fbstring exceptionStr(const std::exception &e)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::atomic< size_t > versionLastChange_
Definition: Core.h:106
STL namespace.
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
folly::Synchronized< Dependents > dependents_
Definition: Core.h:102
ProtocolVersion version
folly::Synchronized< Dependencies > dependencies_
Definition: Core.h:103
std::shared_ptr< Core > Ptr
Definition: Core.h:40
std::unordered_set< Ptr > Dependencies
Definition: Core.h:100
size_t refresh(size_t version, bool force=false)
Definition: Core.cpp:44
void addDependent(Core::WeakPtr dependent)
Definition: Core.cpp:163
folly::Function< std::shared_ptr< const void >)> creator_
Definition: Core.h:110
folly::Synchronized< VersionedData > data_
Definition: Core.h:108
std::weak_ptr< Core > WeakPtr
Definition: Core.h:41
std::vector< WeakPtr > Dependents
Definition: Core.h:99
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
Definition: F14TestUtil.h:414
static void scheduleRefresh(Core::Ptr core, size_t minVersion, bool force=false)
std::atomic< size_t > version_
Definition: Core.h:105
static Ptr create(folly::Function< std::shared_ptr< const void >()> creator)
Definition: Core.cpp:158
void copy(T *target) const
Definition: Synchronized.h:721