proxygen
LRUPersistentCache-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <atomic>
19 #include <cerrno>
20 #include <folly/DynamicConverter.h>
21 #include <folly/FileUtil.h>
22 #include <folly/json.h>
23 #include <folly/ScopeGuard.h>
26 #include <functional>
27 
28 namespace wangle {
29 
30 template <typename K, typename V, typename MutexT>
32  std::size_t cacheCapacity,
33  std::chrono::milliseconds syncInterval,
34  int nSyncRetries,
35  std::unique_ptr<CachePersistence<K, V>> persistence)
37  nullptr,
38  cacheCapacity,
39  syncInterval,
40  nSyncRetries,
41  std::move(persistence)) {
42 }
43 
44 template <typename K, typename V, typename MutexT>
46  std::shared_ptr<folly::Executor> executor,
47  std::size_t cacheCapacity,
48  std::chrono::milliseconds syncInterval,
49  int nSyncRetries,
50  std::unique_ptr<CachePersistence<K, V>> persistence)
51  : cache_(cacheCapacity),
52  syncInterval_(syncInterval),
53  nSyncRetries_(nSyncRetries),
54  executor_(std::move(executor)) {
55  // load the cache. be silent if load fails, we just drop the cache
56  // and start from scratch.
57  if (persistence) {
58  setPersistenceHelper(std::move(persistence), true);
59  }
60  if (!executor_) {
61  // start the syncer thread. done at the end of construction so that the
62  // cache is fully initialized before being passed to the syncer thread.
63  syncer_ =
65  }
66 }
67 
68 template<typename K, typename V, typename MutexT>
70  if (executor_) {
71  // In executor mode, each task holds a weak_ptr to the cache itself. No need
72  // to notify them the cache is dying. We are done here. Alternatively we may
73  // want to do a final sync upon destruction.
74  return;
75  }
76  {
77  // tell syncer to wake up and quit
78  std::lock_guard<std::mutex> lock(stopSyncerMutex_);
79  stopSyncer_ = true;
80  if (syncOnDestroy_) {
81  // Sync on the thread of the destroyer.
82  oneShotSync();
83  }
84  stopSyncerCV_.notify_all();
85  }
86  syncer_.join();
87 }
88 
89 template <typename K, typename V, typename MutexT>
90 void LRUPersistentCache<K, V, MutexT>::put(const K& key, const V& val) {
91  cache_.put(key, val);
92  if (executor_) {
93  if (!executorScheduled_.test_and_set()) {
95  syncInterval_) {
96  // Do not schedule more than once during a syncInterval_ period
97  return;
98  }
99  std::weak_ptr<LRUPersistentCache<K, V, MutexT>> weakSelf =
100  this->shared_from_this();
102  executor_->add([self = std::move(weakSelf)]() {
103  if (auto sharedSelf = self.lock()) {
104  sharedSelf->oneShotSync();
105  }
106  });
107  }
108  }
109 }
110 
111 template<typename K, typename V, typename MutexT>
114  if (!persistence_) {
115  return false;
116  }
117  return cache_.hasChangedSince(persistence_->getLastPersistedVersion());
118 }
119 
120 template<typename K, typename V, typename MutexT>
122  folly::setThreadName("lru-sync-thread");
123 
124  auto self = static_cast<LRUPersistentCache<K, V, MutexT>*>(arg);
125  self->sync();
126  return nullptr;
127 }
128 
129 template <typename K, typename V, typename MutexT>
131  executorScheduled_.clear();
132  auto persistence = getPersistence();
133  if (persistence && !syncNow(*persistence)) {
134  // track failures and give up if we tried too many times
135  ++nSyncTries_;
136  if (nSyncTries_ == nSyncRetries_) {
137  persistence->setPersistedVersion(cache_.getVersion());
138  nSyncTries_ = 0;
139  }
140  } else {
141  nSyncTries_ = 0;
142  }
143 }
144 
145 template<typename K, typename V, typename MutexT>
147  // keep running as long the destructor signals to stop or
148  // there are pending updates that are not synced yet
149  std::unique_lock<std::mutex> stopSyncerLock(stopSyncerMutex_);
150  int nSyncFailures = 0;
151  while (true) {
152  auto persistence = getPersistence();
153  if (stopSyncer_) {
154  if (!persistence ||
155  !cache_.hasChangedSince(persistence->getLastPersistedVersion())) {
156  break;
157  }
158  }
159 
160  if (persistence && !syncNow(*persistence)) {
161  // track failures and give up if we tried too many times
162  ++nSyncFailures;
163  if (nSyncFailures == nSyncRetries_) {
164  persistence->setPersistedVersion(cache_.getVersion());
165  nSyncFailures = 0;
166  }
167  } else {
168  nSyncFailures = 0;
169  }
170 
171  if (!stopSyncer_) {
172  stopSyncerCV_.wait_for(stopSyncerLock, syncInterval_);
173  }
174  }
175 }
176 
177 template<typename K, typename V, typename MutexT>
179  CachePersistence<K, V>& persistence ) {
180  // check if we need to sync. There is a chance that someone can
181  // update cache_ between this check and the convert below, but that
182  // is ok. The persistence layer would have needed to update anyway
183  // and will just get the latest version.
184  if (!cache_.hasChangedSince(persistence.getLastPersistedVersion())) {
185  // nothing to do
186  return true;
187  }
188 
189  // serialize the current contents of cache under lock
190  auto serializedCacheAndVersion = cache_.convertToKeyValuePairs();
191  if (!serializedCacheAndVersion) {
192  LOG(ERROR) << "Failed to convert cache for serialization.";
193  return false;
194  }
195 
196  auto& kvPairs = std::get<0>(serializedCacheAndVersion.value());
197  auto& version = std::get<1>(serializedCacheAndVersion.value());
198  auto persisted =
199  persistence.persistVersionedData(std::move(kvPairs), version);
200 
201  return persisted;
202 }
203 
204 template<typename K, typename V, typename MutexT>
205 std::shared_ptr<CachePersistence<K, V>>
208  return persistence_;
209 }
210 
211 template<typename K, typename V, typename MutexT>
213  std::unique_ptr<CachePersistence<K, V>> persistence,
214  bool syncVersion) noexcept {
216  persistence_ = std::move(persistence);
217  // load the persistence data into memory
218  if (persistence_) {
219  auto version = load(*persistence_);
220  if (syncVersion) {
221  persistence_->setPersistedVersion(version);
222  }
223  }
224 }
225 
226 template<typename K ,typename V, typename MutexT>
228  std::unique_ptr<CachePersistence<K, V>> persistence) {
229  // note that we don't set the persisted version on the persistence like we
230  // do in the constructor since we want any deltas that were in memory but
231  // not in the persistence layer to sync back.
232  setPersistenceHelper(std::move(persistence), false);
233 }
234 
235 template<typename K, typename V, typename MutexT>
237  CachePersistence<K, V>& persistence) noexcept {
238  auto kvPairs = persistence.load();
239  if (!kvPairs) {
240  return false;
241  }
242  return cache_.loadData(kvPairs.value());
243 }
244 
245 }
std::shared_ptr< CachePersistence< K, V > > persistence_
void put(const K &key, const V &val) override
std::condition_variable stopSyncerCV_
uint64_t CacheDataVersion
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
double val
Definition: String.cpp:273
std::shared_ptr< folly::Executor > executor_
requires E e noexcept(noexcept(s.error(std::move(e))))
std::atomic_flag executorScheduled_
#define nullptr
Definition: http_parser.c:41
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
std::chrono::steady_clock::time_point lastExecutorScheduleTime_
ProtocolVersion version
void setPersistenceHelper(std::unique_ptr< CachePersistence< K, V >> persistence, bool syncVersion) noexcept
void setPersistence(std::unique_ptr< CachePersistence< K, V >> persistence)
bool persistVersionedData(const folly::dynamic &kvPairs, const CacheDataVersion &version)
LRUInMemoryCache< K, V, MutexT > cache_
auto lock(SynchronizedLocker...lockersIn) -> std::tuple< typename SynchronizedLocker::LockedPtr... >
Definition: Synchronized.h:871
const std::chrono::milliseconds syncInterval_
bool syncNow(CachePersistence< K, V > &persistence)
bool setThreadName(std::thread::id tid, StringPiece name)
Definition: ThreadName.cpp:109
std::shared_ptr< CachePersistence< K, V > > getPersistence()
LRUPersistentCache(std::size_t cacheCapacity, std::chrono::milliseconds syncInterval=client::persistence::DEFAULT_CACHE_SYNC_INTERVAL, int nSyncRetries=client::persistence::DEFAULT_CACHE_SYNC_RETRIES, std::unique_ptr< CachePersistence< K, V >> persistence=nullptr)
static void * syncThreadMain(void *arg)
CacheDataVersion load(CachePersistence< K, V > &persistence) noexcept
virtual CacheDataVersion getLastPersistedVersion() const