proxygen
folly::observer_detail::ObserverManager Class Reference

#include <ObserverManager.h>

Classes

class  CurrentQueue
 
class  DependencyRecorder
 
class  NextQueue
 
struct  Singleton
 

Public Member Functions

 ~ObserverManager ()
 

Static Public Member Functions

static size_t getVersion ()
 
static bool inManagerThread ()
 
static void scheduleRefresh (Core::Ptr core, size_t minVersion, bool force=false)
 
static void scheduleRefreshNewVersion (Core::WeakPtr coreWeak)
 
static void initCore (Core::Ptr core)
 

Private Types

using CycleDetector = GraphCycleDetector< const Core * >
 

Private Member Functions

 ObserverManager ()
 
void scheduleCurrent (Function< void()>)
 
void scheduleNext (Core::WeakPtr)
 

Static Private Member Functions

static std::shared_ptr< ObserverManagergetInstance ()
 

Private Attributes

std::unique_ptr< CurrentQueuecurrentQueue_
 
std::unique_ptr< NextQueuenextQueue_
 
SharedMutexReadPriority versionMutex_
 
std::atomic< size_t > version_ {1}
 
folly::Synchronized< CycleDetector, std::mutexcycleDetector_
 

Static Private Attributes

static FOLLY_TLS bool inManagerThread_ {false}
 

Detailed Description

ObserverManager is a singleton which controls the re-computation of all Observers. Such re-computation always happens on the thread pool owned by ObserverManager.

ObserverManager has global current version. All existing Observers may have their version be less (yet to be updated) or equal (up to date) to the global current version.

ObserverManager::CurrentQueue contains all of the Observers which need to be updated to the global current version. Those updates are peformed on the ObserverManager's thread pool, until the queue is empty. If some Observer is updated, all of its dependents are added to ObserverManager::CurrentQueue to be updated.

If some leaf Observer (i.e. created from Observable) is updated, then current version of the ObserverManager should be bumped. All such updated leaf Observers are added to the ObserverManager::NextQueue.

Only when ObserverManager::CurrentQueue is empty, the global current version is bumped and all updates from the ObserverManager::NextQueue are performed. If leaf Observer gets updated more then once before being picked from the ObserverManager::NextQueue, then only the last update is processed.

Definition at line 52 of file ObserverManager.h.

Member Typedef Documentation

Constructor & Destructor Documentation

folly::observer_detail::ObserverManager::~ObserverManager ( )

Definition at line 173 of file ObserverManager.cpp.

References currentQueue_, and nextQueue_.

173  {
174  // Destroy NextQueue, before the rest of this object, since it expects
175  // ObserverManager to be alive.
176  nextQueue_.reset();
177  currentQueue_.reset();
178 }
std::unique_ptr< CurrentQueue > currentQueue_
std::unique_ptr< NextQueue > nextQueue_
folly::observer_detail::ObserverManager::ObserverManager ( )
private

Definition at line 168 of file ObserverManager.cpp.

References currentQueue_, and nextQueue_.

Referenced by folly::observer_detail::ObserverManager::Singleton::createManager().

168  {
169  currentQueue_ = std::make_unique<CurrentQueue>();
170  nextQueue_ = std::make_unique<NextQueue>(*this);
171 }
std::unique_ptr< CurrentQueue > currentQueue_
std::unique_ptr< NextQueue > nextQueue_

Member Function Documentation

static size_t folly::observer_detail::ObserverManager::getVersion ( )
inlinestatic

Definition at line 54 of file ObserverManager.h.

References getInstance().

Referenced by folly::observer_detail::Core::getData().

54  {
55  auto instance = getInstance();
56 
57  if (!instance) {
58  return 1;
59  }
60 
61  return instance->version_;
62  }
static std::shared_ptr< ObserverManager > getInstance()
static void folly::observer_detail::ObserverManager::initCore ( Core::Ptr  core)
inlinestatic

Definition at line 119 of file ObserverManager.h.

References folly::exchange(), getInstance(), inManagerThread(), inManagerThread_, and SCOPE_EXIT.

Referenced by folly::observer::makeObserver().

119  {
120  DCHECK(core->getVersion() == 0);
121 
122  auto instance = getInstance();
123  if (!instance) {
124  throw std::logic_error("ObserverManager requested during shutdown");
125  }
126 
128  SCOPE_EXIT {
130  };
131 
132  SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
133 
134  core->refresh(instance->version_, false);
135  }
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
T exchange(T &obj, U &&new_value)
Definition: Utility.h:120
static std::shared_ptr< ObserverManager > getInstance()
void folly::observer_detail::ObserverManager::scheduleCurrent ( Function< void()>  task)
private

Definition at line 180 of file ObserverManager.cpp.

References currentQueue_, and folly::gen::move.

180  {
181  currentQueue_->add(std::move(task));
182 }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::unique_ptr< CurrentQueue > currentQueue_
void folly::observer_detail::ObserverManager::scheduleNext ( Core::WeakPtr  core)
private

Definition at line 184 of file ObserverManager.cpp.

References folly::gen::move, and nextQueue_.

184  {
185  nextQueue_->add(std::move(core));
186 }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::unique_ptr< NextQueue > nextQueue_
static void folly::observer_detail::ObserverManager::scheduleRefresh ( Core::Ptr  core,
size_t  minVersion,
bool  force = false 
)
inlinestatic

Definition at line 69 of file ObserverManager.h.

References folly::annotate_rwlock_acquired(), folly::annotate_rwlock_released(), getInstance(), folly::gen::move, and folly::rdlock.

Referenced by folly::observer_detail::Core::refresh().

69  {
70  if (core->getVersion() >= minVersion) {
71  return;
72  }
73 
74  auto instance = getInstance();
75 
76  if (!instance) {
77  return;
78  }
79 
80  SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
81 
82  // TSAN assumes that the thread that locks the mutex must
83  // be the one that unlocks it. However, we are passing ownership of
84  // the read lock into the lambda, and the thread that performs the async
85  // work will be the one that unlocks it. To avoid noise with TSAN,
86  // annotate that the thread has released the mutex, and then annotate
87  // the async thread as acquiring the mutex.
89  &instance->versionMutex_,
91  __FILE__,
92  __LINE__);
93 
94  instance->scheduleCurrent([core = std::move(core),
95  instancePtr = instance.get(),
96  rh = std::move(rh),
97  force]() {
98  // Make TSAN know that the current thread owns the read lock now.
100  &instancePtr->versionMutex_,
102  __FILE__,
103  __LINE__);
104 
105  core->refresh(instancePtr->version_, force);
106  });
107  }
static FOLLY_ALWAYS_INLINE void annotate_rwlock_released(void const volatile *const addr, annotate_rwlock_level const w, char const *const f, int const l)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
static std::shared_ptr< ObserverManager > getInstance()
static FOLLY_ALWAYS_INLINE void annotate_rwlock_acquired(void const volatile *const addr, annotate_rwlock_level const w, char const *const f, int const l)
static void folly::observer_detail::ObserverManager::scheduleRefreshNewVersion ( Core::WeakPtr  coreWeak)
inlinestatic

Definition at line 109 of file ObserverManager.h.

References getInstance(), and folly::gen::move.

Referenced by folly::observer::detail::ObserverCreatorContext< Observable, Traits >::update().

109  {
110  auto instance = getInstance();
111 
112  if (!instance) {
113  return;
114  }
115 
116  instance->scheduleNext(std::move(coreWeak));
117  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
static std::shared_ptr< ObserverManager > getInstance()

Member Data Documentation

std::unique_ptr<CurrentQueue> folly::observer_detail::ObserverManager::currentQueue_
private

Definition at line 221 of file ObserverManager.h.

Referenced by ObserverManager(), scheduleCurrent(), and ~ObserverManager().

folly::Synchronized<CycleDetector, std::mutex> folly::observer_detail::ObserverManager::cycleDetector_
private

Definition at line 242 of file ObserverManager.h.

FOLLY_TLS bool folly::observer_detail::ObserverManager::inManagerThread_ {false}
staticprivate
std::unique_ptr<NextQueue> folly::observer_detail::ObserverManager::nextQueue_
private

Definition at line 224 of file ObserverManager.h.

Referenced by ObserverManager(), scheduleNext(), and ~ObserverManager().

std::atomic<size_t> folly::observer_detail::ObserverManager::version_ {1}
private

Definition at line 239 of file ObserverManager.h.

SharedMutexReadPriority folly::observer_detail::ObserverManager::versionMutex_
private

Version mutex is used to make sure all updates are processed from the CurrentQueue, before bumping the version and moving to the NextQueue.

To achieve this every task added to CurrentQueue holds a reader lock. NextQueue grabs a writer lock before bumping the version, so it can only happen if CurrentQueue is empty (notice that we use read-priority shared mutex).

Definition at line 238 of file ObserverManager.h.


The documentation for this class was generated from the following files: