proxygen
ServiceTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <gtest/gtest.h>
18 
23 #include <wangle/service/Service.h>
26 
27 namespace wangle {
28 
29 using namespace wangle;
30 using namespace folly;
31 
33 
35  public:
36  bool decode(Context*,
37  IOBufQueue& buf,
38  std::unique_ptr<IOBuf>& result,
39  size_t&) override {
40  result = buf.move();
41  return result != nullptr;
42  }
43 };
44 
45 class EchoService : public Service<std::string, std::string> {
46  public:
47  Future<std::string> operator()(std::string req) override { return req; }
48 };
49 
50 class EchoIntService : public Service<std::string, int> {
51  public:
53  return folly::to<int>(req);
54  }
55 };
56 
57 template <typename Req, typename Resp>
59  : public PipelineFactory<ServicePipeline> {
60  public:
61 
63  std::shared_ptr<AsyncTransportWrapper> socket) override {
64  auto pipeline = ServicePipeline::create();
65  pipeline->addBack(AsyncSocketHandler(socket));
66  pipeline->addBack(SimpleDecode());
67  pipeline->addBack(StringCodec());
68  pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
69  pipeline->finalize();
70  return pipeline;
71  }
72 
73  private:
75 };
76 
77 template <typename Req, typename Resp>
78 class ClientPipelineFactory : public PipelineFactory<ServicePipeline> {
79  public:
80 
82  std::shared_ptr<AsyncTransportWrapper> socket) override {
83  auto pipeline = ServicePipeline::create();
84  pipeline->addBack(AsyncSocketHandler(socket));
85  pipeline->addBack(SimpleDecode());
86  pipeline->addBack(StringCodec());
87  pipeline->finalize();
88  return pipeline;
89  }
90 };
91 
92 template <typename Pipeline, typename Req, typename Resp>
93 class ClientServiceFactory : public ServiceFactory<Pipeline, Req, Resp> {
94  public:
95  class ClientService : public Service<Req, Resp> {
96  public:
97  explicit ClientService(Pipeline* pipeline) {
98  dispatcher_.setPipeline(pipeline);
99  }
100  Future<Resp> operator()(Req request) override {
101  return dispatcher_(std::move(request));
102  }
103  private:
105  };
106 
108  std::shared_ptr<ClientBootstrap<Pipeline>> client) override {
110  std::make_shared<ClientService>(client->getPipeline()));
111  }
112 };
113 
114 TEST(Wangle, ClientServerTest) {
115  // server
117  server.childPipeline(
119  server.bind(0);
120 
121  // client
122  auto client = std::make_shared<ClientBootstrap<ServicePipeline>>();
124  client->pipelineFactory(
127  server.getSockets()[0]->getAddress(&addr);
128  client->connect(addr).waitVia(EventBaseManager::get()->getEventBase());
129 
130  auto service = serviceFactory(client).value();
131  auto rep = (*service)("test");
132 
133  std::move(rep).thenValue([&](std::string value) {
134  EXPECT_EQ("test", value);
136  });
138  server.stop();
139 }
140 
141 class AppendFilter : public ServiceFilter<std::string, std::string> {
142  public:
143  explicit AppendFilter(
144  std::shared_ptr<Service<std::string, std::string>> service) :
145  ServiceFilter<std::string, std::string>(service) {}
146 
148  return (*service_)(req + "\n");
149  }
150 };
151 
153  : public ServiceFilter<int, int, std::string, std::string> {
154  public:
156  std::shared_ptr<Service<std::string, std::string>> service) :
157  ServiceFilter<int, int, std::string, std::string>(service) {}
158 
159  Future<int> operator()(int req) override {
160  return (*service_)(folly::to<std::string>(req))
161  .thenValue([](std::string resp) { return folly::to<int>(resp); });
162  }
163 };
164 
165 TEST(Wangle, FilterTest) {
166  auto service = std::make_shared<EchoService>();
167  auto filter = std::make_shared<AppendFilter>(service);
168  auto result = (*filter)("test");
169  EXPECT_EQ(result.value(), "test\n");
170 }
171 
172 TEST(Wangle, ComplexFilterTest) {
173  auto service = std::make_shared<EchoService>();
174  auto filter = std::make_shared<IntToStringFilter>(service);
175  auto result = (*filter)(1);
176  EXPECT_EQ(result.value(), 1);
177 }
178 
180  : public ServiceFilter<int, std::string, std::string, int> {
181  public:
183  std::shared_ptr<Service<std::string, int>> service) :
184  ServiceFilter<int, std::string, std::string, int>(service) {}
185 
186  Future<std::string> operator()(int req) override {
187  return (*service_)(folly::to<std::string>(req)).thenValue([](int resp) {
188  return folly::to<std::string>(resp);
189  });
190  }
191 };
192 
193 TEST(Wangle, SuperComplexFilterTest) {
194  auto service = std::make_shared<EchoIntService>();
195  auto filter = std::make_shared<ChangeTypeFilter>(service);
196  auto result = (*filter)(1);
197  EXPECT_EQ(result.value(), "1");
198 }
199 
200 template <typename Pipeline, typename Req, typename Resp>
201 class ConnectionCountFilter : public ServiceFactoryFilter<Pipeline, Req, Resp> {
202  public:
204  std::shared_ptr<ServiceFactory<Pipeline, Req, Resp>> factory)
205  : ServiceFactoryFilter<Pipeline, Req, Resp>(factory) {}
206 
208  std::shared_ptr<ClientBootstrap<Pipeline>> client) override {
209  connectionCount++;
210  return (*this->serviceFactory_)(client);
211  }
212 
213  int connectionCount{0};
214 };
215 
217  // server
219  server.childPipeline(
221  server.bind(0);
222 
223  // client
224  auto clientFactory =
225  std::make_shared<
227  auto countingFactory =
228  std::make_shared<
230  clientFactory);
231 
232  auto client = std::make_shared<ClientBootstrap<ServicePipeline>>();
233  client->pipelineFactory(
236  server.getSockets()[0]->getAddress(&addr);
237  client->connect(addr).waitVia(EventBaseManager::get()->getEventBase());
238 
239  auto service = (*countingFactory)(client).value();
240 
241  // After the first service goes away, the client can be reused
242  service = (*countingFactory)(client).value();
243  EXPECT_EQ(2, countingFactory->connectionCount);
244 
245  server.stop();
246 }
247 
249  auto constfactory =
250  std::make_shared<ConstFactory<ServicePipeline, std::string, std::string>>(
251  std::make_shared<EchoService>());
253  constfactory);
254 
255  EXPECT_EQ("test", service("test").value());
256 }
257 
258 class TimekeeperTester : public Timekeeper {
259  public:
261  Promise<Unit> p;
262  auto f = p.getFuture();
263  promises_.push_back(std::move(p));
264  return f;
265  }
266  template <class Clock>
267  Future<Unit> at(std::chrono::time_point<Clock>) {
268  Promise<Unit> p;
269  auto f = p.getFuture();
270  promises_.push_back(std::move(p));
271  return f;
272  }
273  std::vector<Promise<Unit>> promises_;
274 };
275 
276 TEST(ServiceFilter, ExpiringMax) {
277  TimekeeperTester timekeeper;
278 
279  std::shared_ptr<Service<std::string, std::string>> service =
280  std::make_shared<EchoService>();
281  std::shared_ptr<Service<std::string, std::string>> closeOnReleaseService =
282  std::make_shared<CloseOnReleaseFilter<std::string, std::string>>(service);
283  std::shared_ptr<Service<std::string, std::string>> expiringService =
284  std::make_shared<ExpiringFilter<std::string, std::string>>(
285  closeOnReleaseService,
286  std::chrono::milliseconds(0),
287  std::chrono::milliseconds(400),
288  &timekeeper);
289 
290  EXPECT_EQ("test", (*expiringService)("test").get());
291  timekeeper.promises_[0].setValue();
292  EXPECT_TRUE((*expiringService)("test").getTry().hasException());
293 }
294 
295 TEST(ServiceFilter, ExpiringIdle) {
296  TimekeeperTester timekeeper;
297 
298  std::shared_ptr<Service<std::string, std::string>> service =
299  std::make_shared<EchoService>();
300  std::shared_ptr<Service<std::string, std::string>> closeOnReleaseService =
301  std::make_shared<CloseOnReleaseFilter<std::string, std::string>>(service);
302  std::shared_ptr<Service<std::string, std::string>> expiringService =
303  std::make_shared<ExpiringFilter<std::string, std::string>>(
304  closeOnReleaseService,
305  std::chrono::milliseconds(100),
306  std::chrono::milliseconds(0),
307  &timekeeper);
308 
309  EXPECT_EQ(1, timekeeper.promises_.size());
310 }
311 
312 TEST(ServiceFilter, NoIdleDuringRequests) {
313  TimekeeperTester timekeeper;
314 
315  std::shared_ptr<Service<std::string, std::string>> service =
316  std::make_shared<EchoService>();
317  std::shared_ptr<Service<std::string, std::string>> closeOnReleaseService =
318  std::make_shared<CloseOnReleaseFilter<std::string, std::string>>(service);
319  std::shared_ptr<Service<std::string, std::string>> expiringService =
320  std::make_shared<ExpiringFilter<std::string, std::string>>(
321  closeOnReleaseService,
322  std::chrono::milliseconds(1),
323  std::chrono::milliseconds(0),
324  &timekeeper);
325 
326  auto f = (*expiringService)("2000");
327  EXPECT_EQ(2, timekeeper.promises_.size());
328  std::move(f).get();
329  EXPECT_EQ("2000", (*expiringService)("2000").get());
330  EXPECT_EQ(3, timekeeper.promises_.size());
331 }
332 
333 } // namespace wangle
ConnectionCountFilter(std::shared_ptr< ServiceFactory< Pipeline, Req, Resp >> factory)
Future< std::string > operator()(std::string req) override
Definition: ServiceTest.cpp:47
auto f
Future< Unit > after(Duration) override
EventBase * getEventBase() const
PUSHMI_INLINE_VAR constexpr detail::filter_fn filter
Definition: filter.h:75
AppendFilter(std::shared_ptr< Service< std::string, std::string >> service)
TEST(Wangle, ClientServerTest)
Future< Unit > at(std::chrono::time_point< Clock >)
bool decode(Context *, IOBufQueue &buf, std::unique_ptr< IOBuf > &result, size_t &) override
Definition: ServiceTest.cpp:36
void bind(folly::AsyncServerSocket::UniquePtr s)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
EventBase * getEventBase()
ServicePipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > socket) override
Definition: ServiceTest.cpp:81
STL namespace.
Future< Resp > operator()(Req request) override
Future< int > operator()(std::string req) override
Definition: ServiceTest.cpp:52
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
Future< std::string > operator()(std::string req) override
SerialClientDispatcher< Pipeline, Req, Resp > dispatcher_
static EventBaseManager * get()
ServerBootstrap * childPipeline(std::shared_ptr< PipelineFactory< Pipeline >> factory)
InboundHandler< folly::IOBufQueue &, M >::Context Context
std::chrono::milliseconds Duration
Definition: Types.h:36
std::vector< Promise< Unit > > promises_
void terminateLoopSoon()
Definition: EventBase.cpp:493
Future< std::string > operator()(int req) override
Future< T > getFuture()
Definition: Promise-inl.h:97
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
Future< std::shared_ptr< Service< Req, Resp > > > operator()(std::shared_ptr< ClientBootstrap< Pipeline >> client) override
static const char *const value
Definition: Conv.cpp:50
const std::vector< std::shared_ptr< folly::AsyncSocketBase > > & getSockets() const
ServicePipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > socket) override
Definition: ServiceTest.cpp:62
ChangeTypeFilter(std::shared_ptr< Service< std::string, int >> service)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const char * string
Definition: Conv.cpp:212
static Ptr create()
Definition: Pipeline.h:174
IntToStringFilter(std::shared_ptr< Service< std::string, std::string >> service)
Future< int > operator()(int req) override
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
Pipeline< IOBufQueue &, std::string > ServicePipeline
Definition: ServiceTest.cpp:32
ThreadPoolListHook * addr