proxygen
wangle::BroadcastHandler< T, R > Class Template Reference

#include <BroadcastHandler.h>

Inheritance diagram for wangle::BroadcastHandler< T, R >:
wangle::HandlerAdapter< T, std::unique_ptr< folly::IOBuf > > wangle::Handler< T, T, std::unique_ptr< folly::IOBuf >, std::unique_ptr< folly::IOBuf > > wangle::HandlerBase< HandlerContext< T, std::unique_ptr< folly::IOBuf > > >

Public Types

typedef HandlerAdapter< T, std::unique_ptr< folly::IOBuf > >::Context Context
 
- Public Types inherited from wangle::HandlerAdapter< T, std::unique_ptr< folly::IOBuf > >
typedef Handler< T, T, std::unique_ptr< folly::IOBuf >, std::unique_ptr< folly::IOBuf > >::Context Context
 
- Public Types inherited from wangle::Handler< T, T, std::unique_ptr< folly::IOBuf >, std::unique_ptr< folly::IOBuf > >
typedef T rin
 
typedef T rout
 
typedef std::unique_ptr< folly::IOBufwin
 
typedef std::unique_ptr< folly::IOBufwout
 
typedef HandlerContext< T, std::unique_ptr< folly::IOBuf > > Context
 

Public Member Functions

 ~BroadcastHandler () override
 
void read (Context *ctx, T data) override
 
void readEOF (Context *ctx) override
 
void readException (Context *ctx, folly::exception_wrapper ex) override
 
virtual uint64_t subscribe (Subscriber< T, R > *subscriber)
 
virtual void unsubscribe (uint64_t subscriptionId)
 
virtual void closeIfIdle ()
 
virtual void onSubscribe (Subscriber< T, R > *)
 
virtual void onUnsubscribe (Subscriber< T, R > *)
 
virtual void onData (T &)
 
uint64_t getArbitraryIdentifier ()
 
- Public Member Functions inherited from wangle::HandlerAdapter< T, std::unique_ptr< folly::IOBuf > >
void read (Context *ctx, Tmsg) override
 
folly::Future< folly::Unitwrite (Context *ctx, std::unique_ptr< folly::IOBuf >msg) override
 
- Public Member Functions inherited from wangle::Handler< T, T, std::unique_ptr< folly::IOBuf >, std::unique_ptr< folly::IOBuf > >
 ~Handler () override=default
 
virtual void readEOF (Context *ctx)
 
virtual void readException (Context *ctx, folly::exception_wrapper e)
 
virtual void transportActive (Context *ctx)
 
virtual void transportInactive (Context *ctx)
 
virtual folly::Future< folly::UnitwriteException (Context *ctx, folly::exception_wrapper e)
 
virtual folly::Future< folly::Unitclose (Context *ctx)
 
- Public Member Functions inherited from wangle::HandlerBase< HandlerContext< T, std::unique_ptr< folly::IOBuf > > >
virtual ~HandlerBase ()=default
 
virtual void attachPipeline (HandlerContext< T, std::unique_ptr< folly::IOBuf > > *)
 
virtual void detachPipeline (HandlerContext< T, std::unique_ptr< folly::IOBuf > > *)
 
HandlerContext< T, std::unique_ptr< folly::IOBuf > > * getContext ()
 

Protected Member Functions

template<typename FUNC >
void forEachSubscriber (FUNC f)
 

Private Attributes

std::map< uint64_t, Subscriber< T, R > * > subscribers_
 
uint64_t nextSubscriptionId_ {0}
 
uint64_t identifier_ {0}
 

Additional Inherited Members

- Static Public Attributes inherited from wangle::Handler< T, T, std::unique_ptr< folly::IOBuf >, std::unique_ptr< folly::IOBuf > >
static const HandlerDir dir
 

Detailed Description

template<typename T, typename R>
class wangle::BroadcastHandler< T, R >

An Observable type handler for broadcasting/streaming data to a list of subscribers.

Definition at line 30 of file BroadcastHandler.h.

Member Typedef Documentation

template<typename T, typename R>
typedef HandlerAdapter<T, std::unique_ptr<folly::IOBuf> >::Context wangle::BroadcastHandler< T, R >::Context

Definition at line 32 of file BroadcastHandler.h.

Constructor & Destructor Documentation

template<typename T, typename R>
wangle::BroadcastHandler< T, R >::~BroadcastHandler ( )
inlineoverride

Definition at line 34 of file BroadcastHandler.h.

34  {
35  CHECK(subscribers_.empty());
36  }
std::map< uint64_t, Subscriber< T, R > * > subscribers_

Member Function Documentation

template<typename T , typename R >
void wangle::BroadcastHandler< T, R >::closeIfIdle ( )
virtual

If there are no subscribers listening to the broadcast, close the pipeline. This will also delete the broadcast from the BroadcastPool.

Definition at line 71 of file BroadcastHandler-inl.h.

References folly::netops::close().

Referenced by wangle::BroadcastHandler< std::string, std::string >::~BroadcastHandler().

71  {
72  if (subscribers_.empty()) {
73  // No more subscribers. Clean up.
74  // This will delete the broadcast from the pool.
75  this->close(this->getContext());
76  }
77 }
HandlerContext< T, std::unique_ptr< folly::IOBuf > > * getContext()
Definition: Handler.h:34
std::map< uint64_t, Subscriber< T, R > * > subscribers_
template<typename T, typename R>
template<typename FUNC >
void wangle::BroadcastHandler< T, R >::forEachSubscriber ( FUNC  f)
inlineprotected

Definition at line 87 of file BroadcastHandler.h.

87  {
88  auto subscribers = subscribers_;
89  for (const auto& it : subscribers) {
90  f(it.second);
91  }
92  }
auto f
std::map< uint64_t, Subscriber< T, R > * > subscribers_
template<typename T , typename R >
uint64_t wangle::BroadcastHandler< T, R >::getArbitraryIdentifier ( )

FOR TESTS ONLY! Return a unique identifier of this object.

Definition at line 80 of file BroadcastHandler-inl.h.

Referenced by wangle::BroadcastHandler< std::string, std::string >::onData(), and TEST_F().

80  {
81  static std::atomic<uint64_t> identifierCounter{42};
82  return identifier_ ? identifier_ : (identifier_ = ++identifierCounter);
83 }
template<typename T, typename R>
virtual void wangle::BroadcastHandler< T, R >::onData ( T )
inlinevirtual

Invoked for each data that is about to be broadcasted to the subscribers. Subclasses can override to add custom behavior.

Definition at line 77 of file BroadcastHandler.h.

77 {}
template<typename T, typename R>
virtual void wangle::BroadcastHandler< T, R >::onSubscribe ( Subscriber< T, R > *  )
inlinevirtual

Invoked when a new subscriber is added. Subclasses can override to add custom behavior.

Definition at line 65 of file BroadcastHandler.h.

65 {}
template<typename T, typename R>
virtual void wangle::BroadcastHandler< T, R >::onUnsubscribe ( Subscriber< T, R > *  )
inlinevirtual

Invoked when a subscriber is removed. Subclasses can override to add custom behavior.

Definition at line 71 of file BroadcastHandler.h.

71 {}
template<typename T, typename R >
void wangle::BroadcastHandler< T, R >::read ( Context ctx,
T  data 
)
override

Definition at line 21 of file BroadcastHandler-inl.h.

References wangle::Subscriber< T, R >::onNext(), and s.

Referenced by wangle::BroadcastHandler< std::string, std::string >::~BroadcastHandler().

21  {
22  onData(data);
23  forEachSubscriber([&](Subscriber<T, R>* s) {
24  s->onNext(data);
25  });
26 }
virtual void onData(T &)
static set< string > s
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
template<typename T , typename R >
void wangle::BroadcastHandler< T, R >::readEOF ( Context ctx)
override

Definition at line 29 of file BroadcastHandler-inl.h.

References wangle::Subscriber< T, R >::onCompleted(), and s.

Referenced by TEST_F(), and wangle::BroadcastHandler< std::string, std::string >::~BroadcastHandler().

29  {
30  forEachSubscriber([&](Subscriber<T, R>* s) {
31  s->onCompleted();
32  });
33  subscribers_.clear();
34  closeIfIdle();
35 }
static set< string > s
std::map< uint64_t, Subscriber< T, R > * > subscribers_
template<typename T , typename R >
void wangle::BroadcastHandler< T, R >::readException ( Context ctx,
folly::exception_wrapper  ex 
)
override

Definition at line 38 of file BroadcastHandler-inl.h.

References folly::exceptionStr(), wangle::Subscriber< T, R >::onError(), and s.

Referenced by wangle::BroadcastHandler< std::string, std::string >::~BroadcastHandler().

39  {
40  LOG(ERROR) << "Error while reading from upstream for broadcast: "
41  << exceptionStr(ex);
42 
43  forEachSubscriber([&](Subscriber<T, R>* s) {
44  s->onError(ex);
45  });
46  subscribers_.clear();
47  closeIfIdle();
48 }
fbstring exceptionStr(const std::exception &e)
static set< string > s
std::map< uint64_t, Subscriber< T, R > * > subscribers_
template<typename T, typename R>
uint64_t wangle::BroadcastHandler< T, R >::subscribe ( Subscriber< T, R > *  subscriber)
virtual

Subscribes to the broadcast. Returns a unique subscription ID for this subscriber.

Definition at line 51 of file BroadcastHandler-inl.h.

Referenced by TEST_F(), and wangle::BroadcastHandler< std::string, std::string >::~BroadcastHandler().

51  {
52  auto subscriptionId = nextSubscriptionId_++;
53  subscribers_[subscriptionId] = subscriber;
54  onSubscribe(subscriber);
55  return subscriptionId;
56 }
virtual void onSubscribe(Subscriber< T, R > *)
std::map< uint64_t, Subscriber< T, R > * > subscribers_
template<typename T , typename R >
void wangle::BroadcastHandler< T, R >::unsubscribe ( uint64_t  subscriptionId)
virtual

Unsubscribe from the broadcast. Closes the pipeline if the number of subscribers reaches zero.

Definition at line 59 of file BroadcastHandler-inl.h.

Referenced by wangle::BroadcastHandler< std::string, std::string >::~BroadcastHandler().

59  {
60  auto iter = subscribers_.find(subscriptionId);
61  if (iter == subscribers_.end()) {
62  return;
63  }
64 
65  onUnsubscribe(iter->second);
66  subscribers_.erase(iter);
67  closeIfIdle();
68 }
virtual void onUnsubscribe(Subscriber< T, R > *)
std::map< uint64_t, Subscriber< T, R > * > subscribers_

Member Data Documentation

template<typename T, typename R>
uint64_t wangle::BroadcastHandler< T, R >::identifier_ {0}
private

Definition at line 99 of file BroadcastHandler.h.

template<typename T, typename R>
uint64_t wangle::BroadcastHandler< T, R >::nextSubscriptionId_ {0}
private

Definition at line 96 of file BroadcastHandler.h.


The documentation for this class was generated from the following files: