proxygen
EventHandlerTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <bitset>
18 #include <future>
19 #include <thread>
20 
21 #include <folly/MPMCQueue.h>
22 #include <folly/ScopeGuard.h>
28 #include <sys/eventfd.h>
29 
30 using namespace std;
31 using namespace folly;
32 using namespace testing;
33 
// Launches `nthreads` threads, calling `cb(i)` on the i-th thread for every
// index i in [0, nthreads), and returns once all of them have been joined.
void runInThreadsAndWait(size_t nthreads, std::function<void(size_t)> cb) {
  std::vector<std::thread> workers;
  workers.reserve(nthreads);
  for (size_t idx = 0; idx != nthreads; ++idx) {
    // Each thread receives its own copy of the callback plus its index.
    workers.emplace_back(cb, idx);
  }
  for (auto& worker : workers) {
    worker.join();
  }
}
43 
44 void runInThreadsAndWait(vector<function<void()>> cbs) {
45  runInThreadsAndWait(cbs.size(), [&](size_t k) { cbs[k](); });
46 }
47 
49  public:
50  EventHandlerMock(EventBase* eb, int fd) : EventHandler(eb, fd) {}
51  // gmock can't mock noexcept methods, so we need an intermediary
52  MOCK_METHOD1(_handlerReady, void(uint16_t));
53  void handlerReady(uint16_t events) noexcept override {
54  _handlerReady(events);
55  }
56 };
57 
58 class EventHandlerTest : public Test {
59  public:
60  int efd = 0;
61 
62  void SetUp() override {
63  efd = eventfd(0, EFD_SEMAPHORE);
64  ASSERT_THAT(efd, Gt(0));
65  }
66 
67  void TearDown() override {
68  if (efd > 0) {
69  close(efd);
70  }
71  efd = 0;
72  }
73 
75  write(efd, &val, sizeof(val));
76  }
77 
79  uint64_t val = 0;
80  read(efd, &val, sizeof(val));
81  return val;
82  }
83 };
84 
86  const size_t writes = 4;
87  size_t readsRemaining = writes;
88 
89  EventBase eb;
90  EventHandlerMock eh(&eb, efd);
91  eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
92  EXPECT_CALL(eh, _handlerReady(_))
93  .Times(writes)
94  .WillRepeatedly(Invoke([&](uint16_t /* events */) {
95  efd_read();
96  if (--readsRemaining == 0) {
97  eh.unregisterHandler();
98  }
99  }));
100  efd_write(writes);
101  eb.loop();
102 
103  EXPECT_EQ(0, readsRemaining);
104 }
105 
106 TEST_F(EventHandlerTest, many_concurrent_producers) {
107  const size_t writes = 200;
108  const size_t nproducers = 20;
109  size_t readsRemaining = writes;
110 
112  [&] {
113  EventBase eb;
114  EventHandlerMock eh(&eb, efd);
115  eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
116  EXPECT_CALL(eh, _handlerReady(_))
117  .Times(writes)
118  .WillRepeatedly(Invoke([&](uint16_t /* events */) {
119  efd_read();
120  if (--readsRemaining == 0) {
121  eh.unregisterHandler();
122  }
123  }));
124  eb.loop();
125  },
126  [&] {
127  runInThreadsAndWait(nproducers, [&](size_t /* k */) {
128  for (size_t i = 0; i < writes / nproducers; ++i) {
129  this_thread::sleep_for(std::chrono::milliseconds(1));
130  efd_write(1);
131  }
132  });
133  },
134  });
135 
136  EXPECT_EQ(0, readsRemaining);
137 }
138 
139 TEST_F(EventHandlerTest, many_concurrent_consumers) {
140  const size_t writes = 200;
141  const size_t nproducers = 8;
142  const size_t nconsumers = 20;
143  atomic<size_t> writesRemaining(writes);
144  atomic<size_t> readsRemaining(writes);
145 
146  MPMCQueue<nullptr_t> queue(writes / 10);
147 
149  [&] {
150  runInThreadsAndWait(nconsumers, [&](size_t /* k */) {
151  size_t thReadsRemaining = writes / nconsumers;
152  EventBase eb;
153  EventHandlerMock eh(&eb, efd);
154  eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
155  EXPECT_CALL(eh, _handlerReady(_))
156  .WillRepeatedly(Invoke([&](uint16_t /* events */) {
157  nullptr_t val;
158  if (!queue.readIfNotEmpty(val)) {
159  return;
160  }
161  efd_read();
162  --readsRemaining;
163  if (--thReadsRemaining == 0) {
164  eh.unregisterHandler();
165  }
166  }));
167  eb.loop();
168  });
169  },
170  [&] {
171  runInThreadsAndWait(nproducers, [&](size_t /* k */) {
172  for (size_t i = 0; i < writes / nproducers; ++i) {
173  this_thread::sleep_for(std::chrono::milliseconds(1));
174  queue.blockingWrite(nullptr);
175  efd_write(1);
176  --writesRemaining;
177  }
178  });
179  },
180  });
181 
182  EXPECT_EQ(0, writesRemaining);
183  EXPECT_EQ(0, readsRemaining);
184 }
185 
186 #ifdef EV_PRI
187 //
188 // See RFC 6093 for extensive discussion on TCP URG semantics. Specifically,
189 // it points out that the URG mechanism was never intended to be used
190 // for out-of-band information delivery. However, pretty much every
191 // implementation interprets the LAST octet of urgent data as the
192 // OOB byte.
193 //
194 class EventHandlerOobTest : public ::testing::Test {
195  public:
196  //
197  // Wait for port number to connect to, then connect and invoke
198  // clientOps(fd) where fd is the connection file descriptor
199  //
200  void runClient(std::function<void(int fd)> clientOps) {
201  clientThread = std::thread([serverPortFuture = serverReady.get_future(),
202  clientOps]() mutable {
203  int clientFd = socket(AF_INET, SOCK_STREAM, 0);
204  SCOPE_EXIT {
205  close(clientFd);
206  };
207  struct hostent* he{nullptr};
208  struct sockaddr_in server;
209 
210  std::array<const char, 10> hostname = {"localhost"};
211  he = gethostbyname(hostname.data());
212  PCHECK(he);
213 
214  memcpy(&server.sin_addr, he->h_addr_list[0], he->h_length);
215  server.sin_family = AF_INET;
216 
217  // block here until port is known
218  server.sin_port = serverPortFuture.get();
219  LOG(INFO) << "Server is ready";
220 
221  PCHECK(
222  ::connect(clientFd, (struct sockaddr*)&server, sizeof(server)) == 0);
223  LOG(INFO) << "Server connection available";
224 
225  clientOps(clientFd);
226  });
227  }
228 
229  //
230  // Bind, get port number, pass it to client, listen/accept and store the
231  // accepted fd
232  //
233  void acceptConn() {
234  // make the server.
235  int listenfd = socket(AF_INET, SOCK_STREAM, 0);
236  SCOPE_EXIT {
237  close(listenfd);
238  };
239  PCHECK(listenfd != -1) << "unable to open socket";
240 
241  struct sockaddr_in sin;
242  sin.sin_port = htons(0);
243  sin.sin_addr.s_addr = INADDR_ANY;
244  sin.sin_family = AF_INET;
245 
246  PCHECK(bind(listenfd, (struct sockaddr*)&sin, sizeof(sin)) >= 0)
247  << "Can't bind to port";
248  listen(listenfd, 5);
249 
250  struct sockaddr_in findSockName;
251  socklen_t sz = sizeof(findSockName);
252  getsockname(listenfd, (struct sockaddr*)&findSockName, &sz);
253  serverReady.set_value(findSockName.sin_port);
254 
255  struct sockaddr_in cli_addr;
256  socklen_t clilen = sizeof(cli_addr);
257  serverFd = accept(listenfd, (struct sockaddr*)&cli_addr, &clilen);
258  PCHECK(serverFd >= 0) << "can't accept";
259  }
260 
261  void SetUp() override {}
262 
263  void TearDown() override {
264  clientThread.join();
265  close(serverFd);
266  }
267 
268  EventBase eb;
269  std::thread clientThread;
270  std::promise<decltype(sockaddr_in::sin_port)> serverReady;
271  int serverFd{-1};
272 };
273 
274 //
275 // Test that sending OOB data is detected by event handler
276 //
277 TEST_F(EventHandlerOobTest, EPOLLPRI) {
278  auto clientOps = [](int fd) {
279  char buffer[] = "banana";
280  int n = send(fd, buffer, strlen(buffer) + 1, MSG_OOB);
281  LOG(INFO) << "Client send finished";
282  PCHECK(n > 0);
283  };
284 
285  runClient(clientOps);
286  acceptConn();
287 
288  struct SockEvent : public EventHandler {
289  SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
290 
291  void handlerReady(uint16_t events) noexcept override {
292  EXPECT_TRUE(EventHandler::EventFlags::PRI & events);
293  std::array<char, 255> buffer;
294  int n = read(fd_, buffer.data(), buffer.size());
295  //
296  // NB: we sent 7 bytes, but only received 6. The last byte
297  // has been stored in the OOB buffer.
298  //
299  EXPECT_EQ(6, n);
300  EXPECT_EQ("banana", std::string(buffer.data(), 6));
301  // now read the byte stored in OOB buffer
302  n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
303  EXPECT_EQ(1, n);
304  }
305 
306  private:
307  int fd_;
308  } sockHandler(&eb, serverFd);
309 
310  sockHandler.registerHandler(EventHandler::EventFlags::PRI);
311  LOG(INFO) << "Registered Handler";
312  eb.loop();
313 }
314 
315 //
316 // Test if we can send an OOB byte and then normal data
317 //
318 TEST_F(EventHandlerOobTest, OOB_AND_NORMAL_DATA) {
319  auto clientOps = [](int sockfd) {
320  {
321  // OOB buffer can only hold one byte in most implementations
322  std::array<char, 2> buffer = {"X"};
323  int n = send(sockfd, buffer.data(), 1, MSG_OOB);
324  PCHECK(n > 0);
325  }
326 
327  {
328  std::array<char, 7> buffer = {"banana"};
329  int n = send(sockfd, buffer.data(), buffer.size(), 0);
330  PCHECK(n > 0);
331  }
332  };
333 
334  runClient(clientOps);
335  acceptConn();
336 
337  struct SockEvent : public EventHandler {
338  SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), eb_(eb), fd_(fd) {}
339 
340  void handlerReady(uint16_t events) noexcept override {
341  std::array<char, 255> buffer;
342  if (events & EventHandler::EventFlags::PRI) {
343  int n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
344  EXPECT_EQ(1, n);
345  EXPECT_EQ("X", std::string(buffer.data(), 1));
346  registerHandler(EventHandler::EventFlags::READ);
347  return;
348  }
349 
350  if (events & EventHandler::EventFlags::READ) {
351  int n = recv(fd_, buffer.data(), buffer.size(), 0);
352  EXPECT_EQ(7, n);
353  EXPECT_EQ("banana", std::string(buffer.data()));
354  eb_->terminateLoopSoon();
355  return;
356  }
357  }
358 
359  private:
360  EventBase* eb_;
361  int fd_;
362  } sockHandler(&eb, serverFd);
363  sockHandler.registerHandler(
364  EventHandler::EventFlags::PRI | EventHandler::EventFlags::READ);
365  LOG(INFO) << "Registered Handler";
366  eb.loopForever();
367 }
368 
369 //
370 // Demonstrate that "regular" reads ignore the OOB byte sent to us
371 //
372 TEST_F(EventHandlerOobTest, SWALLOW_OOB) {
373  auto clientOps = [](int sockfd) {
374  {
375  std::array<char, 2> buffer = {"X"};
376  int n = send(sockfd, buffer.data(), 1, MSG_OOB);
377  PCHECK(n > 0);
378  }
379 
380  {
381  std::array<char, 7> buffer = {"banana"};
382  int n = send(sockfd, buffer.data(), buffer.size(), 0);
383  PCHECK(n > 0);
384  }
385  };
386 
387  runClient(clientOps);
388  acceptConn();
389 
390  struct SockEvent : public EventHandler {
391  SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
392 
393  void handlerReady(uint16_t events) noexcept override {
394  std::array<char, 255> buffer;
395  ASSERT_TRUE(events & EventHandler::EventFlags::READ);
396  int n = recv(fd_, buffer.data(), buffer.size(), 0);
397  EXPECT_EQ(7, n);
398  EXPECT_EQ("banana", std::string(buffer.data()));
399  }
400 
401  private:
402  int fd_;
403  } sockHandler(&eb, serverFd);
404  sockHandler.registerHandler(EventHandler::EventFlags::READ);
405  LOG(INFO) << "Registered Handler";
406  eb.loop();
407 }
408 #endif
internal::GtMatcher< Rhs > Gt(Rhs x)
std::vector< uint8_t > buffer(kBufferSize+16)
void efd_write(uint64_t val)
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
int connect(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:94
#define eventfd(initval, flags)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
STL namespace.
double val
Definition: String.cpp:273
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
static bool simple
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
std::vector< std::thread::id > threads
static int nthreads
int getsockname(NetworkSocket s, sockaddr *name, socklen_t *namelen)
Definition: NetOps.cpp:108
EventHandlerMock(EventBase *eb, int fd)
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
size_t read(T &out, folly::io::Cursor &cursor)
Definition: Types-inl.h:258
void runInThreadsAndWait(size_t nthreads, function< void(size_t)> cb)
void SetUp() override
ssize_t send(NetworkSocket s, const void *buf, size_t len, int flags)
Definition: NetOps.cpp:319
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
int listen(NetworkSocket s, int backlog)
Definition: NetOps.cpp:137
TEST_F(EventHandlerTest, simple)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
void handlerReady(uint16_t events) noexcept override
#define ASSERT_THAT(value, matcher)
#define MOCK_METHOD1(m,...)
const char * string
Definition: Conv.cpp:212
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
ssize_t recv(NetworkSocket s, void *buf, size_t len, int flags)
Definition: NetOps.cpp:180
bool registerHandler(uint16_t events)
Definition: EventHandler.h:100
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
KeyT k
void TearDown() override
int close(NetworkSocket s)
Definition: NetOps.cpp:90
NetworkSocket accept(NetworkSocket s, sockaddr *addr, socklen_t *addrlen)
Definition: NetOps.cpp:71