proxygen
Observable-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2016-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 namespace folly {
19 namespace observer {
20 
21 namespace detail {
22 
23 template <typename Observable, typename Traits>
25  using T = typename Traits::element_type;
26 
27  public:
28  template <typename... Args>
30  : observable_(std::forward<Args>(args)...) {
31  updateValue();
32  }
33 
35  if (value_.copy()) {
36  Traits::unsubscribe(observable_);
37  }
38  }
39 
41  coreWeak_ = std::move(coreWeak);
42  }
43 
44  std::shared_ptr<const T> get() {
45  updateRequested_ = false;
46  return value_.copy();
47  }
48 
49  void update() {
50  // This mutex ensures there's no race condition between initial update()
51  // call and update() calls from the subsciption callback.
52  //
53  // Additionally it helps avoid races between two different subscription
54  // callbacks (getting new value from observable and storing it into value_
55  // is not atomic).
56  std::lock_guard<std::mutex> lg(updateMutex_);
57  if (!updateValue()) {
58  // Value didn't change, so we can skip the version update.
59  return;
60  }
61 
62  bool expected = false;
63  if (updateRequested_.compare_exchange_strong(expected, true)) {
65  }
66  }
67 
68  template <typename F>
69  void subscribe(F&& callback) {
70  Traits::subscribe(observable_, std::forward<F>(callback));
71  }
72 
73  private:
74  bool updateValue() {
75  auto newValue = Traits::get(observable_);
76  auto newValuePtr = newValue.get();
77  if (!newValue) {
78  throw std::logic_error("Observable returned nullptr.");
79  }
80  value_.swap(newValue);
81  return newValuePtr != newValue.get();
82  }
83 
85  std::atomic<bool> updateRequested_{false};
86 
88 
89  Observable observable_;
90 
92 };
93 
94 } // namespace detail
95 
96 template <typename Observable, typename Traits>
97 template <typename... Args>
99  : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
100 
101 template <typename Observable, typename Traits>
104  // This master shared_ptr allows grabbing derived weak_ptrs, pointing to the
105  // the same Context object, but using a separate reference count. Master
106  // shared_ptr destructor then blocks until all shared_ptrs obtained from
107  // derived weak_ptrs are released.
108  class ContextMasterPointer {
109  public:
110  explicit ContextMasterPointer(std::shared_ptr<Context> context)
111  : contextMaster_(std::move(context)),
112  context_(
113  contextMaster_.get(),
114  [destroyBaton = destroyBaton_](Context*) {
115  destroyBaton->post();
116  }) {}
117  ~ContextMasterPointer() {
118  if (context_) {
119  context_.reset();
120  destroyBaton_->wait();
121  }
122  }
123  ContextMasterPointer(const ContextMasterPointer&) = delete;
124  ContextMasterPointer(ContextMasterPointer&&) = default;
125  ContextMasterPointer& operator=(const ContextMasterPointer&) = delete;
126  ContextMasterPointer& operator=(ContextMasterPointer&&) = default;
127 
128  Context* operator->() const {
129  return contextMaster_.get();
130  }
131 
132  std::weak_ptr<Context> get_weak() {
133  return context_;
134  }
135 
136  private:
137  std::shared_ptr<folly::Baton<>> destroyBaton_{
138  std::make_shared<folly::Baton<>>()};
139  std::shared_ptr<Context> contextMaster_;
140  std::shared_ptr<Context> context_;
141  };
142  // We want to make sure that Context can only be destroyed when Core is
143  // destroyed. So we have to avoid the situation when subscribe callback is
144  // locking Context shared_ptr and remains the last to release it.
145  // We solve this by having Core hold the master shared_ptr and subscription
146  // callback gets derived weak_ptr.
147  ContextMasterPointer contextMaster(context_);
148  auto contextWeak = contextMaster.get_weak();
149  auto observer = makeObserver(
150  [context = std::move(contextMaster)]() { return context->get(); });
151 
152  context_->setCore(observer.core_);
153  context_->subscribe([contextWeak = std::move(contextWeak)] {
154  if (auto context = contextWeak.lock()) {
155  context->update();
156  }
157  });
158 
159  // Do an extra update in case observable was updated between observer creation
160  // and setting updates callback.
161  context_->update();
162  context_.reset();
163 
164  return observer;
165 }
166 } // namespace observer
167 } // namespace folly
static void scheduleRefreshNewVersion(Core::WeakPtr coreWeak)
void swap(Synchronized &rhs)
Definition: Synchronized.h:684
void subscribe(uint32_t iters, int N)
Definition: RxBenchmark.cpp:54
context
Definition: CMakeCache.txt:563
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
observer_detail::Core::WeakPtr coreWeak_
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
folly::Synchronized< std::shared_ptr< const T > > value_
Observer< observer_detail::ResultOfUnwrapSharedPtr< F > > makeObserver(F &&creator)
Definition: Observer-inl.h:37
void setCore(observer_detail::Core::WeakPtr coreWeak)
std::shared_ptr< Context > context_
Definition: Observable.h:60
std::weak_ptr< Core > WeakPtr
Definition: Core.h:41
std::mutex mutex
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
void copy(T *target) const
Definition: Synchronized.h:721