proxygen
wangle::Observable< T, InlineObservers > Class Template Reference

#include <Observable.h>

Classes

class  Unsubscriber
 

Public Member Functions

 Observable ()
 
 Observable (Observable &&other)=delete
 
virtual ~Observable ()
 
virtual Subscription< T > subscribe (ObserverPtr< T > observer)
 
virtual void observe (ObserverPtr< T > observer)
 
virtual void observe (Observer< T > *observer)
 
ObservablePtr< T > observeOn (SchedulerPtr scheduler)
 
std::unique_ptr< Observable > subscribeOn (SchedulerPtr scheduler)
 

Protected Member Functions

template<class F >
void forEachObserver (F f)
 

Private Types

typedef folly::small_vector< Observer< T > *, InlineObservers > ObserverList
 
typedef std::map< uint64_t, ObserverPtr< T > > SubscriberMap
 

Private Member Functions

Subscription< T > subscribeImpl (ObserverPtr< T > observer, bool indefinite)
 
void unsubscribe (uint64_t id)
 
Subscription< T > makeSubscription (bool indefinite)
 

Private Attributes

std::shared_ptr< Unsubscriber > unsubscriber_ {nullptr}
 
folly::MicroSpinLock unsubscriberLock_ {0}
 
std::atomic< uint64_t > nextSubscriptionId_
 
folly::SharedMutex observersLock_
 
folly::ThreadLocalPtr< bool > inCallback_
 
ObserverList observers_
 
folly::ThreadLocalPtr< ObserverList > newObservers_
 
SubscriberMap subscribers_
 
folly::ThreadLocalPtr< SubscriberMap > newSubscribers_
 
folly::ThreadLocalPtr< std::vector< uint64_t > > oldSubscribers_
 

Friends

class Subscription< T >
 

Detailed Description

template<class T, size_t InlineObservers>
class wangle::Observable< T, InlineObservers >

Definition at line 35 of file Observable.h.

Member Typedef Documentation

template<class T, size_t InlineObservers>
typedef folly::small_vector<Observer<T>*, InlineObservers> wangle::Observable< T, InlineObservers >::ObserverList
private

Definition at line 275 of file Observable.h.

template<class T, size_t InlineObservers>
typedef std::map<uint64_t, ObserverPtr<T> > wangle::Observable< T, InlineObservers >::SubscriberMap
private

Definition at line 279 of file Observable.h.

Constructor & Destructor Documentation

template<class T, size_t InlineObservers>
wangle::Observable< T, InlineObservers >::Observable ( )
inline

Definition at line 37 of file Observable.h.

Referenced by wangle::Observable< T >::Observable().

37 : nextSubscriptionId_{1} {}
std::atomic< uint64_t > nextSubscriptionId_
Definition: Observable.h:271
template<class T, size_t InlineObservers>
wangle::Observable< T, InlineObservers >::Observable ( Observable< T, InlineObservers > &&  other)
delete
template<class T, size_t InlineObservers>
virtual wangle::Observable< T, InlineObservers >::~Observable ( )
inline virtual

Definition at line 42 of file Observable.h.

42  {
43  if (unsubscriber_) {
44  unsubscriber_->disable();
45  }
46  }
std::shared_ptr< Unsubscriber > unsubscriber_
Definition: Observable.h:233

Member Function Documentation

template<class T, size_t InlineObservers>
template<class F >
void wangle::Observable< T, InlineObservers >::forEachObserver ( F  f)
inline protected

Definition at line 147 of file Observable.h.

147  {
148  if (UNLIKELY(!inCallback_)) {
149  inCallback_.reset(new bool{false});
150  }
151  CHECK(!(*inCallback_));
152  *inCallback_ = true;
153 
154  {
156  for (auto o : observers_) {
157  f(o);
158  }
159 
160  for (auto& kv : subscribers_) {
161  f(kv.second.get());
162  }
163  }
164 
165  if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
166  (newSubscribers_ && !newSubscribers_->empty()) ||
167  (oldSubscribers_ && !oldSubscribers_->empty()))) {
168  {
170  if (newObservers_) {
171  for (auto observer : *(newObservers_)) {
172  observers_.push_back(observer);
173  }
174  newObservers_->clear();
175  }
176  if (newSubscribers_) {
177  for (auto& kv : *(newSubscribers_)) {
178  subscribers_.insert(std::move(kv));
179  }
180  newSubscribers_->clear();
181  }
182  if (oldSubscribers_) {
183  for (auto id : *(oldSubscribers_)) {
184  subscribers_.erase(id);
185  }
186  oldSubscribers_->clear();
187  }
188  }
189  }
190  *inCallback_ = false;
191  }
auto f
void reset(T *newPtr=nullptr)
Definition: ThreadLocal.h:176
folly::ThreadLocalPtr< std::vector< uint64_t > > oldSubscribers_
Definition: Observable.h:282
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::ThreadLocalPtr< bool > inCallback_
Definition: Observable.h:273
ObserverList observers_
Definition: Observable.h:276
SubscriberMap subscribers_
Definition: Observable.h:280
folly::SharedMutex observersLock_
Definition: Observable.h:272
folly::ThreadLocalPtr< SubscriberMap > newSubscribers_
Definition: Observable.h:281
#define UNLIKELY(x)
Definition: Likely.h:48
folly::ThreadLocalPtr< ObserverList > newObservers_
Definition: Observable.h:277
template<class T, size_t InlineObservers>
Subscription<T> wangle::Observable< T, InlineObservers >::makeSubscription ( bool  indefinite)
inline private

Definition at line 257 of file Observable.h.

Referenced by wangle::Observable< T >::subscribeImpl().

257  {
258  if (indefinite) {
259  return Subscription<T>(nullptr, nextSubscriptionId_++);
260  } else {
261  if (!unsubscriber_) {
262  std::lock_guard<folly::MicroSpinLock> guard(unsubscriberLock_);
263  if (!unsubscriber_) {
264  unsubscriber_ = std::make_shared<Unsubscriber>(this);
265  }
266  }
267  return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
268  }
269  }
std::shared_ptr< Unsubscriber > unsubscriber_
Definition: Observable.h:233
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
folly::MicroSpinLock unsubscriberLock_
Definition: Observable.h:234
friend class Subscription< T >
Definition: Observable.h:236
std::atomic< uint64_t > nextSubscriptionId_
Definition: Observable.h:271
template<class T, size_t InlineObservers>
virtual void wangle::Observable< T, InlineObservers >::observe ( ObserverPtr< T >  observer)
inline virtual

Definition at line 68 of file Observable.h.

68  {
69  subscribeImpl(observer, true);
70  }
Subscription< T > subscribeImpl(ObserverPtr< T > observer, bool indefinite)
Definition: Observable.h:194
template<class T, size_t InlineObservers>
virtual void wangle::Observable< T, InlineObservers >::observe ( Observer< T > *  observer)
inline virtual

Definition at line 72 of file Observable.h.

72  {
73  if (inCallback_ && *inCallback_) {
74  if (!newObservers_) {
75  newObservers_.reset(new ObserverList());
76  }
77  newObservers_->push_back(observer);
78  } else {
80  observers_.push_back(observer);
81  }
82  }
folly::ThreadLocalPtr< bool > inCallback_
Definition: Observable.h:273
ObserverList observers_
Definition: Observable.h:276
folly::SharedMutex observersLock_
Definition: Observable.h:272
folly::small_vector< Observer< T > *, InlineObservers > ObserverList
Definition: Observable.h:275
void push_back(value_type &&t)
Definition: small_vector.h:757
folly::ThreadLocalPtr< ObserverList > newObservers_
Definition: Observable.h:277
template<class T, size_t InlineObservers>
ObservablePtr<T> wangle::Observable< T, InlineObservers >::observeOn ( SchedulerPtr  scheduler)
inline

Returns a new Observable that will call back on the given Scheduler. The returned Observable must outlive the parent Observable.

Definition at line 93 of file Observable.h.

93  {
94  // you're right Hannes, if we have Observable::create we don't need this
95  // helper class.
96  struct ViaSubject : public Observable<T>
97  {
98  ViaSubject(SchedulerPtr sched,
99  Observable* obs)
100  : scheduler_(sched), observable_(obs)
101  {}
102 
103  Subscription<T> subscribe(ObserverPtr<T> o) override {
104  return observable_->subscribe(
105  Observer<T>::create(
106  [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
107  [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
108  [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
109  }
110 
111  protected:
112  SchedulerPtr scheduler_;
113  Observable* observable_;
114  };
115 
116  return std::make_shared<ViaSubject>(scheduler, this);
117  }
#define T(v)
Definition: http_parser.c:233
folly::exception_wrapper Error
Definition: types.h:24
virtual Subscription< T > subscribe(ObserverPtr< T > observer)
Definition: Observable.h:64
double val
Definition: String.cpp:273
std::shared_ptr< folly::FunctionScheduler > scheduler
Definition: FilePoller.cpp:50
std::shared_ptr< folly::Executor > SchedulerPtr
Definition: types.h:27
friend class Subscription< T >
Definition: Observable.h:236
static std::unique_ptr< Observer > create(N &&onNextFn, E &&onErrorFn, C &&onCompletedFn)
Definition: Observer.h:48
template<class T, size_t InlineObservers>
virtual Subscription<T> wangle::Observable< T, InlineObservers >::subscribe ( ObserverPtr< T >  observer)
inline virtual

Definition at line 64 of file Observable.h.

Referenced by wangle::Observable< T >::observeOn(), and wangle::Observable< T >::subscribeOn().

64  {
65  return subscribeImpl(observer, false);
66  }
Subscription< T > subscribeImpl(ObserverPtr< T > observer, bool indefinite)
Definition: Observable.h:194
template<class T, size_t InlineObservers>
Subscription<T> wangle::Observable< T, InlineObservers >::subscribeImpl ( ObserverPtr< T >  observer,
bool  indefinite 
)
inline private

Definition at line 194 of file Observable.h.

Referenced by wangle::Observable< T >::observe(), and wangle::Observable< T >::subscribe().

194  {
195  auto subscription = makeSubscription(indefinite);
196  typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
197  if (inCallback_ && *inCallback_) {
198  if (!newSubscribers_) {
199  newSubscribers_.reset(new SubscriberMap());
200  }
201  newSubscribers_->insert(std::move(kv));
202  } else {
204  subscribers_.insert(std::move(kv));
205  }
206  return subscription;
207  }
void reset(T *newPtr=nullptr)
Definition: ThreadLocal.h:176
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::ThreadLocalPtr< bool > inCallback_
Definition: Observable.h:273
SubscriberMap subscribers_
Definition: Observable.h:280
folly::SharedMutex observersLock_
Definition: Observable.h:272
Subscription< T > makeSubscription(bool indefinite)
Definition: Observable.h:257
std::map< uint64_t, ObserverPtr< T > > SubscriberMap
Definition: Observable.h:279
folly::ThreadLocalPtr< SubscriberMap > newSubscribers_
Definition: Observable.h:281
template<class T, size_t InlineObservers>
std::unique_ptr<Observable> wangle::Observable< T, InlineObservers >::subscribeOn ( SchedulerPtr  scheduler)
inline

Returns a new Observable that will subscribe to this parent Observable via the given Scheduler. This can be subtle and confusing at first, see http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn

Definition at line 122 of file Observable.h.

122  {
123  struct Subject_ : public Subject<T> {
124  public:
125  Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
126  }
127 
128  Subscription<T> subscribe(ObserverPtr<T> o) {
129  scheduler_->add([=] {
130  observable_->subscribe(o);
131  });
132  return Subscription<T>(nullptr, 0); // TODO
133  }
134 
135  protected:
136  SchedulerPtr scheduler_;
137  Observable* observable_;
138  };
139 
140  return std::make_unique<Subject_>(scheduler, this);
141  }
virtual Subscription< T > subscribe(ObserverPtr< T > observer)
Definition: Observable.h:64
std::shared_ptr< folly::FunctionScheduler > scheduler
Definition: FilePoller.cpp:50
std::shared_ptr< folly::Executor > SchedulerPtr
Definition: types.h:27
static set< string > s
friend class Subscription< T >
Definition: Observable.h:236
template<class T, size_t InlineObservers>
void wangle::Observable< T, InlineObservers >::unsubscribe ( uint64_t  id)
inline private

Definition at line 238 of file Observable.h.

238  {
239  if (inCallback_ && *inCallback_) {
240  if (!oldSubscribers_) {
241  oldSubscribers_.reset(new std::vector<uint64_t>());
242  }
243  if (newSubscribers_) {
244  auto it = newSubscribers_->find(id);
245  if (it != newSubscribers_->end()) {
246  newSubscribers_->erase(it);
247  return;
248  }
249  }
250  oldSubscribers_->push_back(id);
251  } else {
253  subscribers_.erase(id);
254  }
255  }
void reset(T *newPtr=nullptr)
Definition: ThreadLocal.h:176
folly::ThreadLocalPtr< std::vector< uint64_t > > oldSubscribers_
Definition: Observable.h:282
folly::ThreadLocalPtr< bool > inCallback_
Definition: Observable.h:273
SubscriberMap subscribers_
Definition: Observable.h:280
folly::SharedMutex observersLock_
Definition: Observable.h:272
folly::ThreadLocalPtr< SubscriberMap > newSubscribers_
Definition: Observable.h:281

Friends And Related Function Documentation

template<class T, size_t InlineObservers>
friend class Subscription< T >
friend

Definition at line 236 of file Observable.h.

Member Data Documentation

template<class T, size_t InlineObservers>
folly::ThreadLocalPtr<bool> wangle::Observable< T, InlineObservers >::inCallback_
private
template<class T, size_t InlineObservers>
folly::ThreadLocalPtr<ObserverList> wangle::Observable< T, InlineObservers >::newObservers_
private
template<class T, size_t InlineObservers>
folly::ThreadLocalPtr<SubscriberMap> wangle::Observable< T, InlineObservers >::newSubscribers_
private
template<class T, size_t InlineObservers>
std::atomic<uint64_t> wangle::Observable< T, InlineObservers >::nextSubscriptionId_
private

Definition at line 271 of file Observable.h.

Referenced by wangle::Observable< T >::makeSubscription().

template<class T, size_t InlineObservers>
ObserverList wangle::Observable< T, InlineObservers >::observers_
private
template<class T, size_t InlineObservers>
folly::SharedMutex wangle::Observable< T, InlineObservers >::observersLock_
private
template<class T, size_t InlineObservers>
folly::ThreadLocalPtr<std::vector<uint64_t> > wangle::Observable< T, InlineObservers >::oldSubscribers_
private
template<class T, size_t InlineObservers>
SubscriberMap wangle::Observable< T, InlineObservers >::subscribers_
private
template<class T, size_t InlineObservers>
std::shared_ptr<Unsubscriber> wangle::Observable< T, InlineObservers >::unsubscriber_ {nullptr}
private
template<class T, size_t InlineObservers>
folly::MicroSpinLock wangle::Observable< T, InlineObservers >::unsubscriberLock_ {0}
private

Definition at line 234 of file Observable.h.

Referenced by wangle::Observable< T >::makeSubscription().


The documentation for this class was generated from the following file: