18 #include <glog/logging.h> 26 namespace observer_detail {
61 return instance->version_;
70 if (core->getVersion() >= minVersion) {
80 SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
89 &instance->versionMutex_,
94 instance->scheduleCurrent([core =
std::move(core),
95 instancePtr = instance.get(),
100 &instancePtr->versionMutex_,
105 core->refresh(instancePtr->version_, force);
116 instance->scheduleNext(
std::move(coreWeak));
120 DCHECK(core->getVersion() == 0);
124 throw std::logic_error(
"ObserverManager requested during shutdown");
132 SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
134 core->refresh(instance->version_,
false);
167 instance->cycleDetector_.withLock([&](
CycleDetector& cycleDetector) {
171 throw std::logic_error(
"Observer cycle detected.");
183 instance->cycleDetector_.withLock([&](
CycleDetector& cycleDetector) {
226 static std::shared_ptr<ObserverManager>
getInstance();
SharedMutexReadPriority versionMutex_
static bool inManagerThread()
Dependencies dependencies_
static FOLLY_ALWAYS_INLINE void annotate_rwlock_released(void const volatile *const addr, annotate_rwlock_level const w, char const *const f, int const l)
static void scheduleRefreshNewVersion(Core::WeakPtr coreWeak)
static FOLLY_TLS bool inManagerThread_
DependencySet dependencies
constexpr detail::Map< Move > move
std::unordered_set< Core::Ptr > DependencySet
void scheduleCurrent(Function< void()>)
—— Concurrent Priority Queue Implementation ——
folly::Synchronized< CycleDetector, std::mutex > cycleDetector_
Dependencies(const Core &core_)
std::unique_ptr< CurrentQueue > currentQueue_
DependencyRecorder(const Core &core)
Dependencies * previousDepedencies_
void removeEdge(NodeId from, NodeId to)
static void markRefreshDependency(const Core &core)
std::shared_ptr< Core > Ptr
static void initCore(Core::Ptr core)
static void markDependency(Core::Ptr dependency)
static FOLLY_TLS Dependencies * currentDependencies_
std::weak_ptr< Core > WeakPtr
T exchange(T &obj, U &&new_value)
std::unique_ptr< NextQueue > nextQueue_
static std::shared_ptr< ObserverManager > getInstance()
static void unmarkRefreshDependency(const Core &core)
static FOLLY_ALWAYS_INLINE void annotate_rwlock_acquired(void const volatile *const addr, annotate_rwlock_level const w, char const *const f, int const l)
static size_t getVersion()
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
void scheduleNext(Core::WeakPtr)
static void scheduleRefresh(Core::Ptr core, size_t minVersion, bool force=false)
bool addEdge(NodeId from, NodeId to)
std::atomic< size_t > version_