proxygen
folly::observer_detail::ObserverManager::NextQueue Class Reference

Public Member Functions

 NextQueue (ObserverManager &manager)
 
void add (Core::WeakPtr core)
 
 ~NextQueue ()
 

Private Attributes

ObserverManagermanager_
 
MPMCQueue< Core::WeakPtrqueue_
 
std::thread thread_
 
std::atomic< bool > stop_ {false}
 

Detailed Description

Definition at line 104 of file ObserverManager.cpp.

Constructor & Destructor Documentation

folly::observer_detail::ObserverManager::NextQueue::NextQueue ( ObserverManager manager)
inlineexplicit

Definition at line 106 of file ObserverManager.cpp.

References folly::gen::move, and folly::observer_detail::ObserverManager::CurrentQueue::queue_.

107  : manager_(manager), queue_(kNextQueueSize) {
108  thread_ = std::thread([&]() {
109  Core::WeakPtr queueCoreWeak;
110 
111  while (true) {
112  queue_.blockingRead(queueCoreWeak);
113  if (stop_) {
114  return;
115  }
116 
117  std::vector<Core::Ptr> cores;
118  {
119  auto queueCore = queueCoreWeak.lock();
120  if (!queueCore) {
121  continue;
122  }
123  cores.emplace_back(std::move(queueCore));
124  }
125 
126  {
127  SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
128 
129  // We can't pick more tasks from the queue after we bumped the
130  // version, so we have to do this while holding the lock.
131  while (cores.size() < kNextQueueSize && queue_.read(queueCoreWeak)) {
132  if (stop_) {
133  return;
134  }
135  if (auto queueCore = queueCoreWeak.lock()) {
136  cores.emplace_back(std::move(queueCore));
137  }
138  }
139 
140  ++manager_.version_;
141  }
142 
143  for (auto& core : cores) {
145  }
146  }
147  });
148  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::weak_ptr< Core > WeakPtr
Definition: Core.h:41
static void scheduleRefresh(Core::Ptr core, size_t minVersion, bool force=false)
folly::observer_detail::ObserverManager::NextQueue::~NextQueue ( )
inline

Definition at line 154 of file ObserverManager.cpp.

References folly::observer_detail::ObserverManager::CurrentQueue::queue_.

154  {
155  stop_ = true;
156  // Write to the queue to notify the thread.
157  queue_.blockingWrite(Core::WeakPtr());
158  thread_.join();
159  }
std::weak_ptr< Core > WeakPtr
Definition: Core.h:41

Member Function Documentation

void folly::observer_detail::ObserverManager::NextQueue::add ( Core::WeakPtr  core)
inline

Definition at line 150 of file ObserverManager.cpp.

References folly::gen::move, and folly::observer_detail::ObserverManager::CurrentQueue::queue_.

150  {
151  queue_.blockingWrite(std::move(core));
152  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567

Member Data Documentation

ObserverManager& folly::observer_detail::ObserverManager::NextQueue::manager_
private

Definition at line 162 of file ObserverManager.cpp.

MPMCQueue<Core::WeakPtr> folly::observer_detail::ObserverManager::NextQueue::queue_
private

Definition at line 163 of file ObserverManager.cpp.

std::atomic<bool> folly::observer_detail::ObserverManager::NextQueue::stop_ {false}
private

Definition at line 165 of file ObserverManager.cpp.

std::thread folly::observer_detail::ObserverManager::NextQueue::thread_
private

Definition at line 164 of file ObserverManager.cpp.


The documentation for this class was generated from the following file: