34 template <
class T,
size_t InlineObservers>
100 : scheduler_(sched), observable_(obs)
104 return observable_->subscribe(
106 [=](
T val) { scheduler_->add([o, val] { o->onNext(val); }); },
107 [=](
Error e) { scheduler_->add([o, e] { o->onError(e); }); },
108 [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
116 return std::make_shared<ViaSubject>(
scheduler,
this);
123 struct Subject_ :
public Subject<T> {
129 scheduler_->add([=] {
130 observable_->subscribe(o);
140 return std::make_unique<Subject_>(
scheduler,
this);
196 typename SubscriberMap::value_type kv{subscription.id_,
std::move(observer)};
219 observable_->unsubscribe(
id);
225 observable_ =
nullptr;
virtual void observe(ObserverPtr< T > observer)
std::shared_ptr< Observer< T >> ObserverPtr
void reset(T *newPtr=nullptr)
virtual Subscription< T > subscribe(ObserverPtr< T > observer)
folly::ThreadLocalPtr< std::vector< uint64_t > > oldSubscribers_
constexpr detail::Map< Move > move
folly::ThreadLocalPtr< bool > inCallback_
std::shared_ptr< Observable< T >> ObservablePtr
std::shared_ptr< Unsubscriber > unsubscriber_
std::shared_ptr< folly::FunctionScheduler > scheduler
void unsubscribe(uint64_t id)
void forEachObserver(F f)
SubscriberMap subscribers_
folly::SharedMutex observersLock_
folly::small_vector< Observer< T > *, InlineObservers > ObserverList
void unsubscribe(uint64_t id)
void push_back(value_type &&t)
GuardImpl guard(ErrorHandler &&handler)
Subscription< T > makeSubscription(bool indefinite)
std::map< uint64_t, ObserverPtr< T > > SubscriberMap
Subscription< T > subscribeImpl(ObserverPtr< T > observer, bool indefinite)
folly::ThreadLocalPtr< SubscriberMap > newSubscribers_
folly::MicroSpinLock unsubscriberLock_
std::shared_ptr< folly::Executor > SchedulerPtr
ObservablePtr< T > observeOn(SchedulerPtr scheduler)
virtual void observe(Observer< T > *observer)
folly::ThreadLocalPtr< ObserverList > newObservers_
std::atomic< uint64_t > nextSubscriptionId_
Unsubscriber(Observable *observable)
std::unique_ptr< Observable > subscribeOn(SchedulerPtr scheduler)