22 using namespace folly;
29 addr = std::make_shared<SocketAddress>();
30 serverPool = std::make_shared<StrictMock<MockServerPool>>(
addr);
33 std::make_shared<StrictMock<MockBroadcastPipelineFactory>>();
35 pool = std::make_unique<BroadcastPool<int, std::string>>(
38 std::make_shared<ClientBootstrapFactory>());
44 Mock::VerifyAndClear(serverPool.get());
45 Mock::VerifyAndClear(pipelineFactory.get());
49 pipelineFactory.reset();
59 std::shared_ptr<AsyncTransportWrapper>)
override {
60 auto pipeline = DefaultPipeline::create();
68 server = std::make_unique<ServerBootstrap<DefaultPipeline>>();
69 server->childPipeline(std::make_shared<ServerPipelineFactory>());
71 server->getSockets()[0]->getAddress(
addr.get());
78 std::unique_ptr<BroadcastPool<int, std::string>>
pool;
82 std::unique_ptr<ServerBootstrap<DefaultPipeline>>
server;
83 std::shared_ptr<SocketAddress>
addr;
101 pool->getHandler(routingData1)
108 EXPECT_CALL(*pipelineFactory, setRoutingData(
_,
"url1")).Times(1);
116 pool->getHandler(routingData1)
131 pool->getHandler(routingData1)
134 handler1Id = handler1->getArbitraryIdentifier();
135 handler1->subscribe(&subscriber);
138 EXPECT_CALL(*pipelineFactory, setRoutingData(
_,
"url1")).Times(1);
144 handler1->readEOF(handler1->getContext());
149 pool->getHandler(routingData2)
156 EXPECT_CALL(*pipelineFactory, setRoutingData(
_,
"url2")).Times(1);
178 pool->getHandler(routingData)
188 pool->getHandler(routingData)
197 EXPECT_CALL(*pipelineFactory, setRoutingData(
_,
"url1")).Times(1);
209 pool->getHandler(routingData)
225 bool handler1Error =
false;
226 bool handler2Error =
false;
235 pool->getHandler(routingData)
237 .onError([&](
const std::exception&) {
238 handler1Error =
true;
246 pool->getHandler(routingData)
248 .onError([&](
const std::exception&) {
249 handler2Error =
true;
269 pool->getHandler(routingData)
275 EXPECT_CALL(*pipelineFactory, setRoutingData(
_,
"url1")).Times(1);
290 bool handler1Error =
false;
291 bool handler2Error =
false;
297 serverPool->failConnect();
298 pool->getHandler(routingData)
300 .onError([&](
const std::exception&) {
301 handler1Error =
true;
314 bool handlerError =
false;
320 pool->getHandler(routingData)
322 .onError([&](
const std::exception&) {
328 .WillOnce(Throw(std::exception()));
340 bool handlerError =
false;
346 pool->getHandler(routingData)
348 .onError([&](
const std::exception&) {
356 pipeline->
readException(std::runtime_error(
"upstream error"));
375 pool->getHandler(routingData)
382 EXPECT_CALL(*pipelineFactory, setRoutingData(
_,
"url1")).Times(1);
400 bool handler1Connected =
false;
401 bool handler2Connected =
false;
408 pool->getHandler(routingData)
410 handler1Connected =
true;
419 pool->getHandler(routingData)
421 handler2Connected =
true;
427 EXPECT_CALL(*pipelineFactory, setRoutingData(
_,
"url1")).Times(1);
439 handler1Connected =
false;
440 handler2Connected =
false;
441 pool->getHandler(routingData)
443 handler1Connected =
true;
450 pool->getHandler(routingData)
452 handler2Connected =
true;
460 EXPECT_CALL(*pipelineFactory, setRoutingData(
_,
"url1")).Times(1);
491 broadcastHandler = pipelineFactory->getBroadcastHandler(pipeline);
494 auto pipeline1 = factory1.
newPipeline(
nullptr, kUrl,
nullptr,
nullptr);
495 pipeline1->transportActive();
502 EXPECT_CALL(*pipelineFactory, setRoutingData(
_,
_)).Times(0);
503 auto pipeline2 = factory1.
newPipeline(
nullptr, kUrl,
nullptr,
nullptr);
504 pipeline2->transportActive();
519 pipelineFactory->getBroadcastHandler(pipeline)
520 ->getArbitraryIdentifier(),
523 auto pipeline3 = factory1.
newPipeline(
nullptr, kUrl,
nullptr,
nullptr);
524 pipeline3->transportActive();
530 pipeline3->readEOF();
539 pipelineFactory->getBroadcastHandler(pipeline)
540 ->getArbitraryIdentifier(),
543 auto pipeline4 = factory2.
newPipeline(
nullptr, kUrl,
nullptr,
nullptr);
544 pipeline4->transportActive();
549 pipeline1->readEOF();
550 pipeline2->readEOF();
551 pipeline4->readEOF();
void readEOF(Context *ctx) override
ObservingPipeline< int >::Ptr newPipeline(std::shared_ptr< folly::AsyncTransportWrapper >, const std::string &routingData, RoutingDataHandler< std::string > *, std::shared_ptr< TransportInfo >) override
uint64_t getArbitraryIdentifier()
EventBase * getEventBase() const
std::unique_ptr< BroadcastPool< int, std::string > > pool
NiceMock< MockSubscriber< int, std::string > > subscriber
#define EXPECT_EQ(val1, val2)
std::shared_ptr< StrictMock< MockServerPool > > serverPool
DefaultPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper >) override
HandlerAdapter< folly::IOBufQueue &, std::unique_ptr< folly::IOBuf > > BytesToBytesHandler
std::enable_if<!std::is_same< T, folly::Unit >::value >::type readEOF()
—— Concurrent Priority Queue Implementation ——
static EventBaseManager * get()
std::shared_ptr< SocketAddress > addr
void handler(int, siginfo_t *, void *)
virtual uint64_t subscribe(Subscriber< T, R > *subscriber)
bool loopOnce(int flags=0)
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
std::shared_ptr< StrictMock< MockBroadcastPipelineFactory > > pipelineFactory
TEST_F(AsyncSSLSocketWriteTest, write_coalescing1)
#define EXPECT_TRUE(condition)
std::enable_if<!std::is_same< T, folly::Unit >::value >::type readException(folly::exception_wrapper e)
std::unique_ptr< ServerBootstrap< DefaultPipeline > > server
#define EXPECT_NE(val1, val2)
std::shared_ptr< Pipeline > Ptr
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
void join(const Delim &delimiter, Iterator begin, Iterator end, String &output)
bool isBroadcasting(const R &routingData)
#define EXPECT_FALSE(condition)
ThreadPoolListHook * addr
virtual BroadcastPool< T, R, P > * broadcastPool(std::shared_ptr< BaseClientBootstrapFactory<>> clientFactory=nullptr)
virtual PipelineBase * getPipeline()=0