20 template <
typename T,
typename R>
28 template <
typename T,
typename R>
37 template <
typename T,
typename R>
40 LOG(ERROR) <<
"Error while reading from upstream for broadcast: " 50 template <
typename T,
typename R>
52 auto subscriptionId = nextSubscriptionId_++;
53 subscribers_[subscriptionId] = subscriber;
54 onSubscribe(subscriber);
55 return subscriptionId;
58 template <
typename T,
typename R>
60 auto iter = subscribers_.find(subscriptionId);
61 if (iter == subscribers_.end()) {
65 onUnsubscribe(iter->second);
66 subscribers_.erase(iter);
70 template <
typename T,
typename R>
72 if (subscribers_.empty()) {
75 this->
close(this->getContext());
79 template <
typename T,
typename R>
81 static std::atomic<uint64_t> identifierCounter{42};
82 return identifier_ ? identifier_ : (identifier_ = ++identifierCounter);
void readEOF(Context *ctx) override
virtual void onError(folly::exception_wrapper ex)=0
HandlerAdapter< T, std::unique_ptr< folly::IOBuf > >::Context Context
uint64_t getArbitraryIdentifier()
fbstring exceptionStr(const std::exception &e)
void read(Context *ctx, T data) override
virtual void closeIfIdle()
virtual void onNext(const T &)=0
virtual uint64_t subscribe(Subscriber< T, R > *subscriber)
virtual void unsubscribe(uint64_t subscriptionId)
void readException(Context *ctx, folly::exception_wrapper ex) override
virtual void onCompleted()=0
int close(NetworkSocket s)
static constexpr uint64_t data[1]