proxygen
Observable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <wangle/deprecated/rx/types.h> // must come first
22 
23 #include <folly/SharedMutex.h>
25 #include <folly/ThreadLocal.h>
26 #include <folly/small_vector.h>
27 #include <folly/Executor.h>
28 #include <folly/Memory.h>
29 #include <map>
30 #include <memory>
31 
32 namespace wangle {
33 
34 template <class T, size_t InlineObservers>
35 class Observable {
36  public:
38 
39  // TODO perhaps we want to provide this #5283229
40  Observable(Observable&& other) = delete;
41 
42  virtual ~Observable() {
43  if (unsubscriber_) {
44  unsubscriber_->disable();
45  }
46  }
47 
48  // The next three methods subscribe the given Observer to this Observable.
49  //
50  // If these are called within an Observer callback, the new observer will not
51  // get the current update but will get subsequent updates.
52  //
53  // subscribe() returns a Subscription object. The observer will continue to
54  // get updates until the Subscription is destroyed.
55  //
56  // observe(ObserverPtr<T>) creates an indefinite subscription
57  //
58  // observe(Observer<T>*) also creates an indefinite subscription, but the
59  // caller is responsible for ensuring that the given Observer outlives this
60  // Observable. This might be useful in high performance environments where
61  // allocations must be kept to a minimum. Template parameter InlineObservers
62  // specifies how many observers can been subscribed inline without any
63  // allocations (it's just the size of a folly::small_vector).
65  return subscribeImpl(observer, false);
66  }
67 
68  virtual void observe(ObserverPtr<T> observer) {
69  subscribeImpl(observer, true);
70  }
71 
72  virtual void observe(Observer<T>* observer) {
73  if (inCallback_ && *inCallback_) {
74  if (!newObservers_) {
75  newObservers_.reset(new ObserverList());
76  }
77  newObservers_->push_back(observer);
78  } else {
80  observers_.push_back(observer);
81  }
82  }
83 
84  // TODO unobserve(ObserverPtr<T>), unobserve(Observer<T>*)
85 
88 
89  // This and subscribeOn should maybe just be a first-class feature of an
90  // Observable, rather than making new ones whose lifetimes are tied to their
91  // parents. In that case it'd return a reference to this object for
92  // chaining.
94  // you're right Hannes, if we have Observable::create we don't need this
95  // helper class.
96  struct ViaSubject : public Observable<T>
97  {
98  ViaSubject(SchedulerPtr sched,
99  Observable* obs)
100  : scheduler_(sched), observable_(obs)
101  {}
102 
104  return observable_->subscribe(
106  [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
107  [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
108  [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
109  }
110 
111  protected:
112  SchedulerPtr scheduler_;
113  Observable* observable_;
114  };
115 
116  return std::make_shared<ViaSubject>(scheduler, this);
117  }
118 
122  std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
123  struct Subject_ : public Subject<T> {
124  public:
125  Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
126  }
127 
129  scheduler_->add([=] {
130  observable_->subscribe(o);
131  });
132  return Subscription<T>(nullptr, 0); // TODO
133  }
134 
135  protected:
136  SchedulerPtr scheduler_;
137  Observable* observable_;
138  };
139 
140  return std::make_unique<Subject_>(scheduler, this);
141  }
142 
143  protected:
144  // Safely execute an operation on each observer. F must take a single
145  // Observer<T>* as its argument.
146  template <class F>
147  void forEachObserver(F f) {
148  if (UNLIKELY(!inCallback_)) {
149  inCallback_.reset(new bool{false});
150  }
151  CHECK(!(*inCallback_));
152  *inCallback_ = true;
153 
154  {
156  for (auto o : observers_) {
157  f(o);
158  }
159 
160  for (auto& kv : subscribers_) {
161  f(kv.second.get());
162  }
163  }
164 
165  if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
166  (newSubscribers_ && !newSubscribers_->empty()) ||
167  (oldSubscribers_ && !oldSubscribers_->empty()))) {
168  {
170  if (newObservers_) {
171  for (auto observer : *(newObservers_)) {
172  observers_.push_back(observer);
173  }
174  newObservers_->clear();
175  }
176  if (newSubscribers_) {
177  for (auto& kv : *(newSubscribers_)) {
178  subscribers_.insert(std::move(kv));
179  }
180  newSubscribers_->clear();
181  }
182  if (oldSubscribers_) {
183  for (auto id : *(oldSubscribers_)) {
184  subscribers_.erase(id);
185  }
186  oldSubscribers_->clear();
187  }
188  }
189  }
190  *inCallback_ = false;
191  }
192 
193  private:
194  Subscription<T> subscribeImpl(ObserverPtr<T> observer, bool indefinite) {
195  auto subscription = makeSubscription(indefinite);
196  typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
197  if (inCallback_ && *inCallback_) {
198  if (!newSubscribers_) {
200  }
201  newSubscribers_->insert(std::move(kv));
202  } else {
204  subscribers_.insert(std::move(kv));
205  }
206  return subscription;
207  }
208 
209  class Unsubscriber {
210  public:
211  explicit Unsubscriber(Observable* observable) : observable_(observable) {
212  CHECK(observable_);
213  }
214 
216  CHECK(id > 0);
218  if (observable_) {
219  observable_->unsubscribe(id);
220  }
221  }
222 
223  void disable() {
225  observable_ = nullptr;
226  }
227 
228  private:
231  };
232 
233  std::shared_ptr<Unsubscriber> unsubscriber_{nullptr};
235 
236  friend class Subscription<T>;
237 
239  if (inCallback_ && *inCallback_) {
240  if (!oldSubscribers_) {
241  oldSubscribers_.reset(new std::vector<uint64_t>());
242  }
243  if (newSubscribers_) {
244  auto it = newSubscribers_->find(id);
245  if (it != newSubscribers_->end()) {
246  newSubscribers_->erase(it);
247  return;
248  }
249  }
250  oldSubscribers_->push_back(id);
251  } else {
253  subscribers_.erase(id);
254  }
255  }
256 
258  if (indefinite) {
259  return Subscription<T>(nullptr, nextSubscriptionId_++);
260  } else {
261  if (!unsubscriber_) {
262  std::lock_guard<folly::MicroSpinLock> guard(unsubscriberLock_);
263  if (!unsubscriber_) {
264  unsubscriber_ = std::make_shared<Unsubscriber>(this);
265  }
266  }
268  }
269  }
270 
271  std::atomic<uint64_t> nextSubscriptionId_;
274 
275  typedef folly::small_vector<Observer<T>*, InlineObservers> ObserverList;
276  ObserverList observers_;
278 
279  typedef std::map<uint64_t, ObserverPtr<T>> SubscriberMap;
280  SubscriberMap subscribers_;
283 };
284 
285 } // namespace wangle
#define T(v)
Definition: http_parser.c:233
virtual void observe(ObserverPtr< T > observer)
Definition: Observable.h:68
std::shared_ptr< Observer< T >> ObserverPtr
Definition: types.h:34
auto f
void reset(T *newPtr=nullptr)
Definition: ThreadLocal.h:176
virtual Subscription< T > subscribe(ObserverPtr< T > observer)
Definition: Observable.h:64
folly::ThreadLocalPtr< std::vector< uint64_t > > oldSubscribers_
Definition: Observable.h:282
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
double val
Definition: String.cpp:273
folly::ThreadLocalPtr< bool > inCallback_
Definition: Observable.h:273
std::shared_ptr< Observable< T >> ObservablePtr
Definition: types.h:33
std::shared_ptr< Unsubscriber > unsubscriber_
Definition: Observable.h:233
std::shared_ptr< folly::FunctionScheduler > scheduler
Definition: FilePoller.cpp:50
void unsubscribe(uint64_t id)
Definition: Observable.h:215
ObserverList observers_
Definition: Observable.h:276
void forEachObserver(F f)
Definition: Observable.h:147
SubscriberMap subscribers_
Definition: Observable.h:280
folly::SharedMutex observersLock_
Definition: Observable.h:272
folly::small_vector< Observer< T > *, InlineObservers > ObserverList
Definition: Observable.h:275
void unsubscribe(uint64_t id)
Definition: Observable.h:238
void push_back(value_type &&t)
Definition: small_vector.h:757
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
Subscription< T > makeSubscription(bool indefinite)
Definition: Observable.h:257
std::map< uint64_t, ObserverPtr< T > > SubscriberMap
Definition: Observable.h:279
Subscription< T > subscribeImpl(ObserverPtr< T > observer, bool indefinite)
Definition: Observable.h:194
folly::ThreadLocalPtr< SubscriberMap > newSubscribers_
Definition: Observable.h:281
folly::MicroSpinLock unsubscriberLock_
Definition: Observable.h:234
std::shared_ptr< folly::Executor > SchedulerPtr
Definition: types.h:27
ObservablePtr< T > observeOn(SchedulerPtr scheduler)
Definition: Observable.h:93
static set< string > s
virtual void observe(Observer< T > *observer)
Definition: Observable.h:72
virtual ~Observable()
Definition: Observable.h:42
#define UNLIKELY(x)
Definition: Likely.h:48
folly::ThreadLocalPtr< ObserverList > newObservers_
Definition: Observable.h:277
std::atomic< uint64_t > nextSubscriptionId_
Definition: Observable.h:271
Unsubscriber(Observable *observable)
Definition: Observable.h:211
std::unique_ptr< Observable > subscribeOn(SchedulerPtr scheduler)
Definition: Observable.h:122