21 #include <glog/logging.h> 22 #include <gtest/gtest.h> 23 #include <boost/thread.hpp> 28 using namespace folly;
38 std::shared_ptr<AsyncTransportWrapper> sock)
override {
57 std::shared_ptr<AsyncTransportWrapper>)
override {
64 std::atomic<int> pipelines{0};
83 return std::make_shared<TestAcceptor>();
92 TEST(Bootstrap, ServerWithPipeline) {
94 server.childPipeline(std::make_shared<TestPipelineFactory>());
99 TEST(Bootstrap, ServerWithChildHandler) {
101 server.childHandler(std::make_shared<TestAcceptorFactory>());
106 TEST(Bootstrap, ClientServerTest) {
108 auto factory = std::make_shared<TestPipelineFactory>();
109 server.childPipeline(factory);
126 TEST(Bootstrap, ClientConnectionManagerTest) {
131 auto factory = std::make_shared<TestPipelineFactory>();
132 server.childPipeline(factory);
133 server.group(std::make_shared<IOThreadPoolExecutor>(1));
146 client2.
pipelineFactory(std::make_shared<TestClientPipelineFactory>());
156 TEST(Bootstrap, ServerAcceptGroupTest) {
160 auto factory = std::make_shared<TestPipelineFactory>();
161 server.childPipeline(factory);
162 server.group(std::make_shared<IOThreadPoolExecutor>(1),
nullptr);
168 boost::barrier barrier(2);
169 auto thread = std::thread([&](){
184 TEST(Bootstrap, ServerAcceptGroup2Test) {
191 serverSocket->bind(0);
192 serverSocket->listen(0);
193 serverSocket->startAccepting();
194 serverSocket->setReusePortEnabled(
true);
195 serverSocket->stopAccepting();
197 LOG(
INFO) <<
"Reuse port probably not supported";
202 auto factory = std::make_shared<TestPipelineFactory>();
203 server.childPipeline(factory);
204 server.group(std::make_shared<IOThreadPoolExecutor>(4),
nullptr);
222 TEST(Bootstrap, SharedThreadPool) {
227 serverSocket->bind(0);
228 serverSocket->listen(0);
229 serverSocket->startAccepting();
230 serverSocket->setReusePortEnabled(
true);
231 serverSocket->stopAccepting();
233 LOG(
INFO) <<
"Reuse port probably not supported";
237 auto pool = std::make_shared<IOThreadPoolExecutor>(2);
240 auto factory = std::make_shared<TestPipelineFactory>();
241 server.childPipeline(factory);
242 server.group(pool, pool);
254 client2.
pipelineFactory(std::make_shared<TestClientPipelineFactory>());
258 client3.
pipelineFactory(std::make_shared<TestClientPipelineFactory>());
262 client4.
pipelineFactory(std::make_shared<TestClientPipelineFactory>());
266 client5.
pipelineFactory(std::make_shared<TestClientPipelineFactory>());
277 TEST(Bootstrap, ExistingSocket) {
279 auto factory = std::make_shared<TestPipelineFactory>();
280 server.childPipeline(factory);
291 auto connEvent = boost::get<ConnEvent>(conn);
300 template <
typename HandlerPipeline>
305 pipeline->addBack(HandlerPipeline());
310 TEST(Bootstrap, LoadBalanceHandler) {
312 auto factory = std::make_shared<TestPipelineFactory>();
313 server.childPipeline(factory);
315 auto pipelinefactory =
316 std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
317 server.pipeline(pipelinefactory);
342 auto factory = std::make_shared<TestPipelineFactory>();
343 auto pipelinefactory =
344 std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
345 server.pipeline(pipelinefactory);
346 server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
350 TEST(Bootstrap, UDPClientServerTest) {
354 auto factory = std::make_shared<TestPipelineFactory>();
355 auto pipelinefactory =
356 std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
357 server.pipeline(pipelinefactory);
358 server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
368 client.
bind(localhost);
371 *(
data->writableData()) =
'a';
382 auto factory = std::make_shared<TestPipelineFactory>();
385 auto socketPath = (tmpdir.path() /
"sock").
string();
387 server.childPipeline(factory);
390 server.bind(address);
395 auto pipelineFuture = client.
connect(address);
404 TEST(Bootstrap, ServerBindFailure) {
408 serverSocket->bind(0);
409 serverSocket->listen(0);
418 auto factory = std::make_shared<TestPipelineFactory>();
419 server.childPipeline(factory);
421 server.bind(address);
422 FAIL() <<
"shouldn't be allowed to bind to an in-use address";
423 }
catch (
const std::system_error& ex) {
424 EXPECT_EQ(EADDRINUSE, ex.code().value()) <<
"unexpected error code " <<
425 ex.code().value() <<
": " << ex.code().message();
void setFromPath(StringPiece path)
EventBase * getEventBase() const
TEST(Wangle, ClientServerTest)
static std::unique_ptr< IOBuf > create(std::size_t capacity)
std::atomic< int > connections
boost::variant< folly::IOBuf *, folly::AsyncTransportWrapper *, ConnInfo &, ConnEvent, std::tuple< folly::IOBuf *, std::shared_ptr< folly::AsyncUDPSocket >, folly::SocketAddress > > AcceptPipelineType
#define EXPECT_EQ(val1, val2)
constexpr detail::Map< Move > move
bool tryRunAfterDelay(Func cob, uint32_t milliseconds, InternalEnum internal=InternalEnum::NORMAL)
HandlerAdapter< folly::IOBufQueue &, std::unique_ptr< folly::IOBuf > > BytesToBytesHandler
folly::Future< Pipeline * > connect(const folly::SocketAddress &address, std::chrono::milliseconds timeout=std::chrono::milliseconds(0)) override
—— Referenced declarations (folly / wangle symbols used by the Bootstrap tests) ——
void read(Context *ctx, AcceptPipelineType conn) override
static EventBaseManager * get()
virtual ssize_t write(const folly::SocketAddress &address, const std::unique_ptr< folly::IOBuf > &buf)
BytesPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > sock) override
std::unique_ptr< AsyncTransportWrapper, Destructor > UniquePtr
BaseClientBootstrap< P > * pipelineFactory(std::shared_ptr< PipelineFactory< P >> factory) noexcept
static std::shared_ptr< AsyncServerSocket > newSocket(EventBase *evb=nullptr)
std::shared_ptr< Acceptor > newAcceptor(EventBase *) override
NetworkSocket socket(int af, int type, int protocol)
socklen_t getAddress(sockaddr_storage *addr) const
virtual void bind(const folly::SocketAddress &address)
void read(Context *, AcceptPipelineType) override
std::unique_ptr< AsyncServerSocket, Destructor > UniquePtr
#define EXPECT_TRUE(condition)
Pipeline< IOBufQueue &, std::unique_ptr< IOBuf > > BytesPipeline
ClientBootstrap< BytesPipeline > TestClient
AcceptPipeline::Ptr newPipeline(Acceptor *) override
virtual void fireRead(In msg)=0
BytesPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper >) override
const folly::SocketAddress & getAddress() const
void onNewConnection(AsyncTransportWrapper::UniquePtr, const folly::SocketAddress *, const std::string &, SecureTransportType, const TransportInfo &) override
std::shared_ptr< Pipeline > Ptr
static constexpr uint64_t data[1]
virtual void init(folly::AsyncServerSocket *serverSocket, folly::EventBase *eventBase, SSLStats *stats=nullptr)
ServerBootstrap< BytesPipeline > TestServer