18 using namespace folly;
28 std::shared_ptr<AsyncTransportWrapper>
socket)
override {
32 auto pipeline = DefaultPipeline::create();
42 routingData_.routingData =
'A';
45 downstreamPipelineFactory_ =
46 std::make_shared<MockDownstreamPipelineFactory>(downstreamHandler_);
48 server_ = std::make_unique<TestServer>();
52 routingPipeline_ = DefaultPipeline::create();
54 routingDataHandlerFactory_ =
55 std::make_shared<MockRoutingDataHandlerFactory>();
58 routingDataHandlerFactory_,
59 downstreamPipelineFactory_,
63 routingDataHandlerFactory_->setRoutingDataHandler(routingDataHandler_);
65 acceptPipeline_ = AcceptPipeline::create();
66 acceptPipeline_->addBack(
67 std::shared_ptr<MockAcceptRoutingHandler>(acceptRoutingHandler_));
68 acceptPipeline_->finalize();
72 auto ioGroup = std::make_shared<IOThreadPoolExecutor>(kNumIOThreads);
74 acceptPipelineFactory_ =
75 std::make_shared<MockAcceptPipelineFactory>(acceptPipeline_);
76 server_->pipeline(acceptPipelineFactory_)->group(ioGroup, ioGroup)->bind(0);
77 server_->getSockets()[0]->getAddress(&address_);
78 VLOG(4) <<
"Start server at " << address_;
86 client_ = std::make_shared<TestClient>();
87 client_->pipelineFactory(std::make_shared<TestClientPipelineFactory>());
88 client_->group(server_->getIOGroup());
89 return client_->connect(address_);
93 auto clientPipelinePromise =
94 std::make_shared<folly::Promise<DefaultPipeline*>>();
98 VLOG(4) <<
"Client connected. Send data.";
101 *(
data->writableData()) =
'a';
103 clientPipelinePromise->setValue(clientPipeline);
108 return clientPipelinePromise->getFuture();
112 auto clientPipelinePromise =
113 std::make_shared<folly::Promise<DefaultPipeline*>>();
116 clientConnectAndWrite().thenValue([=](
DefaultPipeline* clientPipeline) {
117 VLOG(4) <<
"Client close";
118 clientPipeline->
close().thenValue(
119 [=](
auto&&) { clientPipelinePromise->setValue(clientPipeline); });
123 return clientPipelinePromise->getFuture();
127 auto clientPipelinePromise =
128 std::make_shared<folly::Promise<DefaultPipeline*>>();
131 clientPipelinePromise->setValue(clientPipeline);
135 return clientPipelinePromise->getFuture();
141 std::runtime_error(
"Client socket exception, right after connect."));
146 acceptPipeline_.reset();
147 acceptPipelineFactory_->cleanup();
167 int kNumIOThreads{1};
176 MockRoutingDataHandler::RoutingData& ) {
177 VLOG(4) <<
"Parsed routing data";
182 boost::barrier barrier(2);
185 .WillOnce(
Invoke([&](MockBytesToBytesHandler::Context* ,
187 VLOG(4) <<
"Downstream received a read";
190 .WillOnce(
Invoke([&](MockBytesToBytesHandler::Context* ctx) {
191 VLOG(4) <<
"Downstream EOF";
198 clientConnectAndCleanClose();
203 EXPECT_EQ(0, acceptRoutingHandler_->getRoutingPipelineCount());
208 boost::barrier barrierConnect(2);
213 MockRoutingDataHandler::RoutingData& ) {
214 VLOG(4) <<
"Need more data to be parse.";
215 barrierConnect.wait();
220 auto futureClientPipeline = clientConnectAndWrite();
223 barrierConnect.wait();
224 boost::barrier barrierException(2);
226 clientPipeline->
getTransport()->getEventBase()->runInEventBaseThread(
229 std::runtime_error(
"Socket error while expecting routing data."));
233 .WillOnce(
Invoke([&](MockBytesToBytesHandler::Context* ,
235 VLOG(4) <<
"Routing data handler Exception";
236 acceptRoutingHandler_->onError(kConnId0, ex);
237 barrierException.wait();
239 barrierException.wait();
242 EXPECT_CALL(*downstreamHandler_, transportActive(
_)).Times(0);
243 delete downstreamHandler_;
246 EXPECT_EQ(0, acceptRoutingHandler_->getRoutingPipelineCount());
251 EXPECT_CALL(*routingDataHandler_, parseRoutingData(
_,
_)).Times(0);
254 EXPECT_CALL(*downstreamHandler_, transportActive(
_)).Times(0);
255 delete downstreamHandler_;
258 boost::barrier barrierConnect(2);
260 .WillOnce(
Invoke([&](MockBytesToBytesHandler::Context* ) {
261 barrierConnect.wait();
263 auto futureClientPipeline = justClientConnect();
264 barrierConnect.wait();
265 futureClientPipeline.wait();
268 boost::barrier barrierException(2);
271 [&](MockBytesToBytesHandler::Context* ,
273 sendClientException(futureClientPipeline.value());
274 barrierException.wait();
277 EXPECT_EQ(1, acceptRoutingHandler_->getRoutingPipelineCount());
283 acceptPipeline_->readException(
284 std::runtime_error(
"An exception from the socket."));
285 acceptRoutingHandler_->onRoutingData(kConnId0, routingData_);
Future< DefaultPipeline * > clientConnectAndCleanClose()
std::unique_ptr< TestServer > server_
std::enable_if<!std::is_same< T, folly::Unit >::value, folly::Future< folly::Unit > >::type close()
static std::unique_ptr< IOBuf > create(std::size_t capacity)
MockRoutingDataHandler * routingDataHandler_
#define EXPECT_EQ(val1, val2)
constexpr detail::Map< Move > move
EventBase * getEventBase()
std::shared_ptr< MockRoutingDataHandlerFactory > routingDataHandlerFactory_
—— Concurrent Priority Queue Implementation ——
EventBase * getEventBase()
RoutingDataHandler< char >::RoutingData routingData_
Future< DefaultPipeline * > clientConnectAndWrite()
DefaultPipeline::Ptr routingPipeline_
std::enable_if<!std::is_same< T, folly::Unit >::value, folly::Future< folly::Unit > >::type write(W msg)
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
size_t read(T &out, folly::io::Cursor &cursor)
MockBytesToBytesHandler * downstreamHandler_
DefaultPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > socket) override
Future< DefaultPipeline * > justClientConnect()
EventBase * getEventBase() override
Implements the IOExecutor interface.
constexpr auto data(C &c) -> decltype(c.data())
MockAcceptRoutingHandler * acceptRoutingHandler_
bool runInEventBaseThread(void(*fn)(T *), T *arg)
std::shared_ptr< folly::AsyncTransport > getTransport()
NetworkSocket socket(int af, int type, int protocol)
std::shared_ptr< TestClient > client_
std::shared_ptr< MockDownstreamPipelineFactory > downstreamPipelineFactory_
AcceptPipeline::Ptr acceptPipeline_
TEST_F(AsyncSSLSocketWriteTest, write_coalescing1)
std::enable_if<!std::is_same< T, folly::Unit >::value, folly::Future< folly::Unit > >::type writeException(folly::exception_wrapper e)
void sendClientException(DefaultPipeline *clientPipeline)
#define EXPECT_TRUE(condition)
std::shared_ptr< Pipeline > Ptr
Future< DefaultPipeline * > clientConnect()
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
std::shared_ptr< MockAcceptPipelineFactory > acceptPipelineFactory_