proxygen
wangle::ObservingHandler< T, R, P > Class Template Reference

#include <ObservingHandler.h>

Inheritance diagram for wangle::ObservingHandler< T, R, P >:
wangle::HandlerAdapter< folly::IOBufQueue &, T > wangle::Subscriber< T, R > wangle::Handler< folly::IOBufQueue &, folly::IOBufQueue &, T, T > wangle::HandlerBase< HandlerContext< folly::IOBufQueue &, T > >

Public Types

typedef HandlerAdapter< folly::IOBufQueue &, T >::Context Context
 
- Public Types inherited from wangle::HandlerAdapter< folly::IOBufQueue &, T >
typedef Handler< folly::IOBufQueue &, folly::IOBufQueue &, T, T >::Context Context
 
- Public Types inherited from wangle::Handler< folly::IOBufQueue &, folly::IOBufQueue &, T, T >
typedef folly::IOBufQueue & rin
 
typedef folly::IOBufQueue & rout
 
typedef T win
 
typedef T wout
 
typedef HandlerContext< folly::IOBufQueue &, T > Context
 

Public Member Functions

 ObservingHandler (const R &routingData, BroadcastPool< T, R, P > *broadcastPool)
 
 ~ObservingHandler () override
 
 ObservingHandler (const ObservingHandler &)=delete
 
ObservingHandler & operator= (const ObservingHandler &)=delete
 
 ObservingHandler (ObservingHandler &&)=default
 
ObservingHandler & operator= (ObservingHandler &&)=default
 
void transportActive (Context *ctx) override
 
void readEOF (Context *ctx) override
 
void readException (Context *ctx, folly::exception_wrapper ex) override
 
void onNext (const T &buf) override
 
void onError (folly::exception_wrapper ex) override
 
void onCompleted () override
 
R & routingData () override
 
- Public Member Functions inherited from wangle::HandlerAdapter< folly::IOBufQueue &, T >
void read (Context *ctx, folly::IOBufQueue &msg) override
 
folly::Future< folly::Unit > write (Context *ctx, T msg) override
 
- Public Member Functions inherited from wangle::Handler< folly::IOBufQueue &, folly::IOBufQueue &, T, T >
 ~Handler () override=default
 
virtual void readEOF (Context *ctx)
 
virtual void readException (Context *ctx, folly::exception_wrapper e)
 
virtual void transportActive (Context *ctx)
 
virtual void transportInactive (Context *ctx)
 
virtual folly::Future< folly::Unit > writeException (Context *ctx, folly::exception_wrapper e)
 
virtual folly::Future< folly::Unit > close (Context *ctx)
 
- Public Member Functions inherited from wangle::HandlerBase< HandlerContext< folly::IOBufQueue &, T > >
virtual ~HandlerBase ()=default
 
virtual void attachPipeline (HandlerContext< folly::IOBufQueue &, T > *)
 
virtual void detachPipeline (HandlerContext< folly::IOBufQueue &, T > *)
 
HandlerContext< folly::IOBufQueue &, T > * getContext ()
 
- Public Member Functions inherited from wangle::Subscriber< T, R >
virtual ~Subscriber ()
 

Private Attributes

R routingData_
 
BroadcastPool< T, R, P > * broadcastPool_ {nullptr}
 
BroadcastHandler< T, R > * broadcastHandler_ {nullptr}
 
uint64_t subscriptionId_ {0}
 
bool paused_ {false}
 
std::shared_ptr< bool > deleted_ {new bool(false)}
 

Additional Inherited Members

- Static Public Attributes inherited from wangle::Handler< folly::IOBufQueue &, folly::IOBufQueue &, T, T >
static const HandlerDir dir
 

Detailed Description

template<typename T, typename R, typename P = DefaultPipeline>
class wangle::ObservingHandler< T, R, P >

A Handler-Observer adaptor that can be used for subscribing to broadcasts. Maintains a thread-local BroadcastPool from which a BroadcastHandler is obtained and subscribed to based on the given routing data.

Definition at line 31 of file ObservingHandler.h.

Member Typedef Documentation

template<typename T, typename R, typename P = DefaultPipeline>
typedef HandlerAdapter<folly::IOBufQueue&, T>::Context wangle::ObservingHandler< T, R, P >::Context

Definition at line 34 of file ObservingHandler.h.

Constructor & Destructor Documentation

template<typename T, typename R, typename P>
wangle::ObservingHandler< T, R, P >::ObservingHandler ( const R &  routingData,
BroadcastPool< T, R, P > *  broadcastPool 
)

Definition at line 21 of file ObservingHandler-inl.h.

24  : routingData_(routingData), broadcastPool_(CHECK_NOTNULL(broadcastPool)) {}
BroadcastPool< T, R, P > * broadcastPool_
template<typename T , typename R , typename P >
wangle::ObservingHandler< T, R, P >::~ObservingHandler ( )
override

Definition at line 27 of file ObservingHandler-inl.h.

References wangle::ObservingHandler< T, R, P >::broadcastHandler_, wangle::ObservingHandler< T, R, P >::deleted_, and wangle::ObservingHandler< T, R, P >::subscriptionId_.

27  {
28  if (broadcastHandler_) {
29  auto broadcastHandler = broadcastHandler_;
30  broadcastHandler_ = nullptr;
31  broadcastHandler->unsubscribe(subscriptionId_);
32  }
33 
34  if (deleted_) {
35  *deleted_ = true;
36  }
37 }
std::shared_ptr< bool > deleted_
BroadcastHandler< T, R > * broadcastHandler_
template<typename T, typename R, typename P = DefaultPipeline>
wangle::ObservingHandler< T, R, P >::ObservingHandler ( const ObservingHandler< T, R, P > &  )
delete
template<typename T, typename R, typename P = DefaultPipeline>
wangle::ObservingHandler< T, R, P >::ObservingHandler ( ObservingHandler< T, R, P > &&  )
default

Member Function Documentation

template<typename T , typename R , typename P >
void wangle::ObservingHandler< T, R, P >::onCompleted ( )
overridevirtual

Implements wangle::Subscriber< T, R >.

Definition at line 115 of file ObservingHandler-inl.h.

References wangle::ObservingHandler< T, R, P >::broadcastHandler_, wangle::Handler< folly::IOBufQueue &, folly::IOBufQueue &, T, T >::close(), and wangle::HandlerBase< HandlerContext< folly::IOBufQueue &, T > >::getContext().

115  {
116  // broadcastHandler_ will clear its subscribers and delete itself
117  broadcastHandler_ = nullptr;
118  this->close(this->getContext());
119 }
BroadcastHandler< T, R > * broadcastHandler_
HandlerContext< folly::IOBufQueue &, T > * getContext()
Definition: Handler.h:34
virtual folly::Future< folly::Unit > close(Context *ctx)
Definition: Handler.h:79
template<typename T , typename R , typename P >
void wangle::ObservingHandler< T, R, P >::onError ( folly::exception_wrapper  ex)
overridevirtual

Implements wangle::Subscriber< T, R >.

Definition at line 106 of file ObservingHandler-inl.h.

References wangle::ObservingHandler< T, R, P >::broadcastHandler_, wangle::Handler< folly::IOBufQueue &, folly::IOBufQueue &, T, T >::close(), folly::exceptionStr(), and wangle::HandlerBase< HandlerContext< folly::IOBufQueue &, T > >::getContext().

Referenced by wangle::ObservingHandler< T, R, P >::transportActive().

106  {
107  LOG(ERROR) << "Error observing a broadcast: " << exceptionStr(ex);
108 
109  // broadcastHandler_ will clear its subscribers and delete itself
110  broadcastHandler_ = nullptr;
111  this->close(this->getContext());
112 }
BroadcastHandler< T, R > * broadcastHandler_
fbstring exceptionStr(const std::exception &e)
HandlerContext< folly::IOBufQueue &, T > * getContext()
Definition: Handler.h:34
virtual folly::Future< folly::Unit > close(Context *ctx)
Definition: Handler.h:79
template<typename T, typename R , typename P >
void wangle::ObservingHandler< T, R, P >::onNext ( const T &  buf)
overridevirtual

Implements wangle::Subscriber< T, R >.

Definition at line 91 of file ObservingHandler-inl.h.

References wangle::Handler< folly::IOBufQueue &, folly::IOBufQueue &, T, T >::close(), wangle::ObservingHandler< T, R, P >::deleted_, wangle::HandlerBase< HandlerContext< folly::IOBufQueue &, T > >::getContext(), and wangle::HandlerAdapter< folly::IOBufQueue &, T >::write().

91  {
92  auto ctx = this->getContext();
93  auto deleted = deleted_;
94  this->write(ctx, data)
95  .onError([this, ctx, deleted](const std::exception& ex) {
96  if (*deleted) {
97  return;
98  }
99 
100  LOG(ERROR) << "Error on write: " << ex.what();
101  this->close(ctx);
102  });
103 }
std::shared_ptr< bool > deleted_
folly::Future< folly::Unit > write(Context *ctx, T msg) override
Definition: Handler.h:167
HandlerContext< folly::IOBufQueue &, T > * getContext()
Definition: Handler.h:34
virtual folly::Future< folly::Unit > close(Context *ctx)
Definition: Handler.h:79
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
template<typename T, typename R, typename P = DefaultPipeline>
ObservingHandler& wangle::ObservingHandler< T, R, P >::operator= ( const ObservingHandler< T, R, P > &  )
delete
template<typename T, typename R, typename P = DefaultPipeline>
ObservingHandler& wangle::ObservingHandler< T, R, P >::operator= ( ObservingHandler< T, R, P > &&  )
default
template<typename T , typename R , typename P >
void wangle::ObservingHandler< T, R, P >::readEOF ( Context *  ctx)
override

Definition at line 78 of file ObservingHandler-inl.h.

References wangle::Handler< folly::IOBufQueue &, folly::IOBufQueue &, T, T >::close().

78  {
79  this->close(ctx);
80 }
virtual folly::Future< folly::Unit > close(Context *ctx)
Definition: Handler.h:79
template<typename T , typename R , typename P >
void wangle::ObservingHandler< T, R, P >::readException ( Context *  ctx,
folly::exception_wrapper  ex 
)
override

Definition at line 83 of file ObservingHandler-inl.h.

References wangle::Handler< folly::IOBufQueue &, folly::IOBufQueue &, T, T >::close(), and folly::exceptionStr().

85  {
86  LOG(ERROR) << "Error on read: " << exceptionStr(ex);
87  this->close(ctx);
88 }
fbstring exceptionStr(const std::exception &e)
virtual folly::Future< folly::Unit > close(Context *ctx)
Definition: Handler.h:79
template<typename T , typename R , typename P >
R & wangle::ObservingHandler< T, R, P >::routingData ( )
overridevirtual
template<typename T , typename R , typename P >
void wangle::ObservingHandler< T, R, P >::transportActive ( Context *  ctx)
override

Definition at line 40 of file ObservingHandler-inl.h.

References wangle::ObservingHandler< T, R, P >::broadcastHandler_, wangle::ObservingHandler< T, R, P >::broadcastPool_, wangle::Handler< folly::IOBufQueue &, folly::IOBufQueue &, T, T >::close(), wangle::ObservingHandler< T, R, P >::deleted_, wangle::ObservingHandler< T, R, P >::onError(), wangle::ObservingHandler< T, R, P >::routingData_, and wangle::ObservingHandler< T, R, P >::subscriptionId_.

40  {
41  if (broadcastHandler_) {
42  // Already connected
43  return;
44  }
45 
46  // Pause ingress until the remote connection is established and
47  // broadcast handler is ready
48  auto pipeline = dynamic_cast<ObservingPipeline<T>*>(ctx->getPipeline());
49  CHECK(pipeline);
50  pipeline->transportInactive();
51 
52  auto deleted = deleted_;
53  broadcastPool_->getHandler(routingData_)
54  .thenValue(
55  [this, pipeline, deleted](BroadcastHandler<T, R>* broadcastHandler) {
56  if (*deleted) {
57  return;
58  }
59 
60  broadcastHandler_ = broadcastHandler;
61  subscriptionId_ = broadcastHandler_->subscribe(this);
62  VLOG(10) << "Subscribed to a broadcast";
63 
64  // Resume ingress
65  pipeline->transportActive();
66  })
67  .onError([this, ctx, deleted](const std::exception& ex) {
68  if (*deleted) {
69  return;
70  }
71 
72  LOG(ERROR) << "Error subscribing to a broadcast: " << ex.what();
73  this->close(ctx);
74  });
75 }
std::shared_ptr< bool > deleted_
BroadcastHandler< T, R > * broadcastHandler_
void onError(folly::exception_wrapper ex) override
BroadcastPool< T, R, P > * broadcastPool_
virtual folly::Future< folly::Unit > close(Context *ctx)
Definition: Handler.h:79

Member Data Documentation

template<typename T, typename R, typename P = DefaultPipeline>
BroadcastPool<T, R, P>* wangle::ObservingHandler< T, R, P >::broadcastPool_ {nullptr}
private
template<typename T, typename R, typename P = DefaultPipeline>
std::shared_ptr<bool> wangle::ObservingHandler< T, R, P >::deleted_ {new bool(false)}
private
template<typename T, typename R, typename P = DefaultPipeline>
bool wangle::ObservingHandler< T, R, P >::paused_ {false}
private

Definition at line 64 of file ObservingHandler.h.

template<typename T, typename R, typename P = DefaultPipeline>
R wangle::ObservingHandler< T, R, P >::routingData_
private
template<typename T, typename R, typename P = DefaultPipeline>
uint64_t wangle::ObservingHandler< T, R, P >::subscriptionId_ {0}
private

The documentation for this class was generated from the following files: