20 template <
typename T,
typename R,
typename P>
24 : routingData_(routingData), broadcastPool_(CHECK_NOTNULL(broadcastPool)) {}
26 template <
typename T,
typename R,
typename P>
39 template <
typename T,
typename R,
typename P>
50 pipeline->transportInactive();
62 VLOG(10) <<
"Subscribed to a broadcast";
65 pipeline->transportActive();
67 .
onError([
this, ctx, deleted](
const std::exception& ex) {
72 LOG(ERROR) <<
"Error subscribing to a broadcast: " << ex.what();
77 template <
typename T,
typename R,
typename P>
82 template <
typename T,
typename R,
typename P>
90 template <
typename T,
typename R,
typename P>
94 this->
write(ctx, data)
95 .onError([
this, ctx, deleted](
const std::exception& ex) {
100 LOG(ERROR) <<
"Error on write: " << ex.what();
105 template <
typename T,
typename R,
typename P>
107 LOG(ERROR) <<
"Error observing a broadcast: " <<
exceptionStr(ex);
114 template <
typename T,
typename R,
typename P>
121 template <
typename T,
typename R,
typename P>
void readEOF(Context *ctx) override
R & routingData() override
std::shared_ptr< bool > deleted_
HandlerAdapter< folly::IOBufQueue &, T >::Context Context
BroadcastHandler< T, R > * broadcastHandler_
fbstring exceptionStr(const std::exception &e)
void onError(folly::exception_wrapper ex) override
ObservingHandler(const R &routingData, BroadcastPool< T, R, P > *broadcastPool)
folly::Future< folly::Unit > write(Context *ctx, Tmsg) override
HandlerContext< folly::IOBufQueue &, T > * getContext()
BroadcastPool< T, R, P > * broadcastPool_
virtual folly::Future< folly::Unit > close(Context *ctx)
~ObservingHandler() override
void onNext(const T &buf) override
void transportActive(Context *ctx) override
void readException(Context *ctx, folly::exception_wrapper ex) override
static constexpr uint64_t data[1]
void onCompleted() override