proxygen
wangle::BroadcastPool< T, R, P >::BroadcastManager Class Reference

#include <BroadcastPool.h>

Inheritance diagram for wangle::BroadcastPool< T, R, P >::BroadcastManager:
wangle::PipelineManager folly::DelayedDestruction folly::DelayedDestructionBase

Public Types

using UniquePtr = std::unique_ptr< BroadcastManager, folly::DelayedDestruction::Destructor >
 

Public Member Functions

 BroadcastManager (BroadcastPool< T, R, P > *broadcastPool, const R &routingData)
 
 ~BroadcastManager () override
 
folly::Future< BroadcastHandler< T, R > * > getHandler ()
 
void deletePipeline (PipelineBase *pipeline) override
 
- Public Member Functions inherited from wangle::PipelineManager
virtual ~PipelineManager ()=default
 
virtual void refreshTimeout ()
 
- Public Member Functions inherited from folly::DelayedDestruction
virtual void destroy ()
 
bool getDestroyPending () const
 
- Public Member Functions inherited from folly::DelayedDestructionBase
virtual ~DelayedDestructionBase ()=default
 

Private Member Functions

void handleConnectError (const std::exception &ex) noexcept
 

Private Attributes

BroadcastPool< T, R, P > * broadcastPool_ {nullptr}
 
routingData_
 
std::unique_ptr< BaseClientBootstrap< P > > client_
 
bool connectStarted_ {false}
 
bool deletingBroadcast_ {false}
 
folly::SharedPromise< BroadcastHandler< T, R > * > sharedPromise_
 

Additional Inherited Members

- Protected Member Functions inherited from folly::DelayedDestruction
 ~DelayedDestruction () override=default
 
 DelayedDestruction ()
 
- Protected Member Functions inherited from folly::DelayedDestructionBase
 DelayedDestructionBase ()
 
uint32_t getDestructorGuardCount () const
 

Detailed Description

template<typename T, typename R, typename P = DefaultPipeline>
class wangle::BroadcastPool< T, R, P >::BroadcastManager

Definition at line 52 of file BroadcastPool.h.

Member Typedef Documentation

template<typename T, typename R, typename P = DefaultPipeline>
using wangle::BroadcastPool< T, R, P >::BroadcastManager::UniquePtr = std::unique_ptr< BroadcastManager, folly::DelayedDestruction::Destructor>

Definition at line 56 of file BroadcastPool.h.

Constructor & Destructor Documentation

template<typename T, typename R, typename P = DefaultPipeline>
wangle::BroadcastPool< T, R, P >::BroadcastManager::BroadcastManager ( BroadcastPool< T, R, P > *  broadcastPool,
const R &  routingData 
)
inline

Definition at line 58 of file BroadcastPool.h.

Referenced by wangle::BroadcastPool< T, R, P >::getHandler().

61  : broadcastPool_(broadcastPool),
62  routingData_(routingData),
63  client_(broadcastPool_->clientBootstrapFactory_->newClient()) {
64  client_->pipelineFactory(broadcastPool_->broadcastPipelineFactory_);
65  }
std::unique_ptr< BaseClientBootstrap< P > > client_
Definition: BroadcastPool.h:84
BroadcastPool< T, R, P > * broadcastPool_
Definition: BroadcastPool.h:81
template<typename T, typename R, typename P = DefaultPipeline>
wangle::BroadcastPool< T, R, P >::BroadcastManager::~BroadcastManager ( )
inlineoverride

Definition at line 67 of file BroadcastPool.h.

References folly::pushmi::__adl::noexcept().

67  {
68  if (client_->getPipeline()) {
69  client_->getPipeline()->setPipelineManager(nullptr);
70  }
71  }
std::unique_ptr< BaseClientBootstrap< P > > client_
Definition: BroadcastPool.h:84

Member Function Documentation

template<typename T , typename R , typename P >
void wangle::BroadcastPool< T, R, P >::BroadcastManager::deletePipeline ( PipelineBase pipeline)
overridevirtual
template<typename T , typename R , typename P >
folly::Future< BroadcastHandler< T, R > * > wangle::BroadcastPool< T, R, P >::BroadcastManager::getHandler ( )

Definition at line 22 of file BroadcastPool-inl.h.

References wangle::BroadcastPool< T, R, P >::BroadcastManager::broadcastPool_, wangle::BroadcastPool< T, R, P >::BroadcastManager::client_, wangle::BroadcastPool< T, R, P >::BroadcastManager::connectStarted_, wangle::BroadcastPool< T, R, P >::BroadcastManager::deletingBroadcast_, wangle::BroadcastPool< T, R, P >::BroadcastManager::handleConnectError(), handler(), folly::InlineExecutor::instance(), wangle::BroadcastPool< T, R, P >::BroadcastManager::routingData_, wangle::PipelineBase::setPipelineManager(), and wangle::BroadcastPool< T, R, P >::BroadcastManager::sharedPromise_.

22  {
23  // getFuture() returns a completed future if we are already connected
24  // Set the executor to the InlineExecutor because subsequent code depends
25  // on the future callback being called inline to ensure that the handler
26  // is not garbage collected before use.
27  auto future = sharedPromise_.getFuture().via(
29 
30  if (connectStarted_) {
31  // Either already connected, in which case the future has the handler,
32  // or there's an outstanding connect request and the promise will be
33  // fulfilled when the connect request completes.
34  return future;
35  }
36 
37  // Kickoff connect request and fulfill all pending promises on completion
38  connectStarted_ = true;
39 
40  broadcastPool_->serverPool_->connect(client_.get(), routingData_)
41  .thenValue([this](DefaultPipeline* pipeline) {
42  DestructorGuard dg(this);
43  pipeline->setPipelineManager(this);
44 
45  auto pipelineFactory = broadcastPool_->broadcastPipelineFactory_;
46  try {
47  pipelineFactory->setRoutingData(pipeline, routingData_);
48  } catch (const std::exception& ex) {
50  return;
51  }
52 
53  if (deletingBroadcast_) {
54  // setRoutingData() could result in an error that would cause the
55  // BroadcastPipeline to get deleted.
56  handleConnectError(std::runtime_error(
57  "Broadcast deleted due to upstream connection error"));
58  return;
59  }
60 
61  auto handler = pipelineFactory->getBroadcastHandler(pipeline);
62  CHECK(handler);
63  sharedPromise_.setValue(handler);
64 
65  // If all the observers go away before connect returns, then the
66  // BroadcastHandler will be idle without any subscribers. Close
67  // the pipeline and remove the broadcast from the pool so that
68  // connections are not leaked.
69  handler->closeIfIdle();
70  })
71  .onError([this](const std::exception& ex) { handleConnectError(ex); });
72 
73  return future;
74 }
void handleConnectError(const std::exception &ex) noexcept
void handler(int, siginfo_t *, void *)
FOLLY_ATTR_VISIBILITY_HIDDEN static FOLLY_ALWAYS_INLINE InlineExecutor & instance() noexcept
std::unique_ptr< BaseClientBootstrap< P > > client_
Definition: BroadcastPool.h:84
folly::SharedPromise< BroadcastHandler< T, R > * > sharedPromise_
Definition: BroadcastPool.h:88
BroadcastPool< T, R, P > * broadcastPool_
Definition: BroadcastPool.h:81
Pipeline< folly::IOBufQueue &, std::unique_ptr< folly::IOBuf >> DefaultPipeline
Definition: Pipeline.h:241
template<typename T , typename R , typename P >
void wangle::BroadcastPool< T, R, P >::BroadcastManager::handleConnectError ( const std::exception &  ex)
privatenoexcept

Definition at line 85 of file BroadcastPool-inl.h.

References wangle::BroadcastPool< T, R, P >::BroadcastManager::broadcastPool_, folly::gen::move, wangle::BroadcastPool< T, R, P >::BroadcastManager::routingData_, and wangle::BroadcastPool< T, R, P >::BroadcastManager::sharedPromise_.

Referenced by wangle::BroadcastPool< T, R, P >::BroadcastManager::getHandler().

86  {
87  LOG(ERROR) << "Error connecting to upstream: " << ex.what();
88 
89  auto sharedPromise = std::move(sharedPromise_);
90  broadcastPool_->deleteBroadcast(routingData_);
91  sharedPromise.setException(folly::make_exception_wrapper<std::exception>(ex));
92 }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::SharedPromise< BroadcastHandler< T, R > * > sharedPromise_
Definition: BroadcastPool.h:88
BroadcastPool< T, R, P > * broadcastPool_
Definition: BroadcastPool.h:81

Member Data Documentation

template<typename T, typename R, typename P = DefaultPipeline>
BroadcastPool<T, R, P>* wangle::BroadcastPool< T, R, P >::BroadcastManager::broadcastPool_ {nullptr}
private
template<typename T, typename R, typename P = DefaultPipeline>
std::unique_ptr<BaseClientBootstrap<P> > wangle::BroadcastPool< T, R, P >::BroadcastManager::client_
private
template<typename T, typename R, typename P = DefaultPipeline>
bool wangle::BroadcastPool< T, R, P >::BroadcastManager::connectStarted_ {false}
private
template<typename T, typename R, typename P = DefaultPipeline>
bool wangle::BroadcastPool< T, R, P >::BroadcastManager::deletingBroadcast_ {false}
private
template<typename T, typename R, typename P = DefaultPipeline>
R wangle::BroadcastPool< T, R, P >::BroadcastManager::routingData_
private
template<typename T, typename R, typename P = DefaultPipeline>
folly::SharedPromise<BroadcastHandler<T, R>*> wangle::BroadcastPool< T, R, P >::BroadcastManager::sharedPromise_
private

The documentation for this class was generated from the following files: