27 namespace observer_detail {
30 FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
34 observer_manager_pool_size,
36 "How many internal threads ObserverManager should use");
41 constexpr
size_t kCurrentQueueSize{10 * 1024};
42 constexpr
size_t kNextQueueSize{10 * 1024};
48 if (FLAGS_observer_manager_pool_size < 1) {
49 LOG(ERROR) <<
"--observer_manager_pool_size should be >= 1";
50 FLAGS_observer_manager_pool_size = 1;
52 for (
int32_t i = 0;
i < FLAGS_observer_manager_pool_size; ++
i) {
69 LOG(ERROR) <<
"Exception while running CurrentQueue task: " 79 queue_.blockingWrite(
nullptr);
92 throw std::runtime_error(
"Too many Observers scheduled for update.");
107 : manager_(manager),
queue_(kNextQueueSize) {
108 thread_ = std::thread([&]() {
112 queue_.blockingRead(queueCoreWeak);
117 std::vector<Core::Ptr> cores;
119 auto queueCore = queueCoreWeak.lock();
123 cores.emplace_back(
std::move(queueCore));
127 SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
131 while (cores.size() < kNextQueueSize &&
queue_.read(queueCoreWeak)) {
135 if (
auto queueCore = queueCoreWeak.lock()) {
136 cores.emplace_back(
std::move(queueCore));
143 for (
auto& core : cores) {
144 manager_.scheduleRefresh(
std::move(core), manager_.version_,
true);
165 std::atomic<bool> stop_{
false};
170 nextQueue_ = std::make_unique<NextQueue>(*this);
static bool inManagerThread()
std::string sformat(StringPiece fmt, Args &&...args)
void add(Function< void()> task)
std::vector< std::thread > threads_
static FOLLY_TLS bool inManagerThread_
static ObserverManager * createManager()
fbstring exceptionStr(const std::exception &e)
constexpr detail::Map< Move > move
MPMCQueue< Function< void()> > queue_
MPMCQueue< Core::WeakPtr > queue_
void scheduleCurrent(Function< void()>)
—— Concurrent Priority Queue Implementation ——
std::unique_ptr< CurrentQueue > currentQueue_
static constexpr StringPiece kObserverManagerThreadNamePrefix
void add(Core::WeakPtr core)
NextQueue(ObserverManager &manager)
static FOLLY_TLS Dependencies * currentDependencies_
std::weak_ptr< Core > WeakPtr
bool setThreadName(std::thread::id tid, StringPiece name)
std::unique_ptr< NextQueue > nextQueue_
DEFINE_int32(observer_manager_pool_size, 4,"How many internal threads ObserverManager should use")
static std::shared_ptr< ObserverManager > getInstance()
static folly::Singleton< ObserverManager > instance
void scheduleNext(Core::WeakPtr)
ObserverManager & manager_