proxygen
EventBaseTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/Memory.h>
18 #include <folly/ScopeGuard.h>
19 
26 
27 #include <folly/futures/Promise.h>
28 
29 #include <atomic>
30 #include <iostream>
31 #include <memory>
32 #include <thread>
33 
34 using std::atomic;
35 using std::cerr;
36 using std::deque;
37 using std::endl;
38 using std::make_pair;
39 using std::pair;
40 using std::thread;
41 using std::unique_ptr;
42 using std::vector;
43 using std::chrono::duration_cast;
44 using std::chrono::microseconds;
45 using std::chrono::milliseconds;
46 
47 using namespace std::chrono_literals;
48 
49 using namespace folly;
50 
52 // Tests for read and write events
54 
55 enum { BUF_SIZE = 4096 };
56 
57 ssize_t writeToFD(int fd, size_t length) {
58  // write an arbitrary amount of data to the fd
59  auto bufv = vector<char>(length);
60  auto buf = bufv.data();
61  memset(buf, 'a', length);
62  ssize_t rc = write(fd, buf, length);
63  CHECK_EQ(rc, length);
64  return rc;
65 }
66 
67 size_t writeUntilFull(int fd) {
68  // Write to the fd until EAGAIN is returned
69  size_t bytesWritten = 0;
70  char buf[BUF_SIZE];
71  memset(buf, 'a', sizeof(buf));
72  while (true) {
73  ssize_t rc = write(fd, buf, sizeof(buf));
74  if (rc < 0) {
75  CHECK_EQ(errno, EAGAIN);
76  break;
77  } else {
78  bytesWritten += rc;
79  }
80  }
81  return bytesWritten;
82 }
83 
84 ssize_t readFromFD(int fd, size_t length) {
85  // write an arbitrary amount of data to the fd
86  auto buf = vector<char>(length);
87  return read(fd, buf.data(), length);
88 }
89 
90 size_t readUntilEmpty(int fd) {
91  // Read from the fd until EAGAIN is returned
92  char buf[BUF_SIZE];
93  size_t bytesRead = 0;
94  while (true) {
95  int rc = read(fd, buf, sizeof(buf));
96  if (rc == 0) {
97  CHECK(false) << "unexpected EOF";
98  } else if (rc < 0) {
99  CHECK_EQ(errno, EAGAIN);
100  break;
101  } else {
102  bytesRead += rc;
103  }
104  }
105  return bytesRead;
106 }
107 
108 void checkReadUntilEmpty(int fd, size_t expectedLength) {
109  ASSERT_EQ(readUntilEmpty(fd), expectedLength);
110 }
111 
115  size_t length;
116  ssize_t result;
117 
118  void perform(int fd) {
119  if (events & EventHandler::READ) {
120  if (length == 0) {
121  result = readUntilEmpty(fd);
122  } else {
123  result = readFromFD(fd, length);
124  }
125  }
126  if (events & EventHandler::WRITE) {
127  if (length == 0) {
128  result = writeUntilFull(fd);
129  } else {
130  result = writeToFD(fd, length);
131  }
132  }
133  }
134 };
135 
136 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
137  for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
138  eventBase->tryRunAfterDelay(
139  std::bind(&ScheduledEvent::perform, ev, fd), ev->milliseconds);
140  }
141 }
142 
143 class TestHandler : public EventHandler {
144  public:
145  TestHandler(EventBase* eventBase, int fd)
146  : EventHandler(eventBase, fd), fd_(fd) {}
147 
148  void handlerReady(uint16_t events) noexcept override {
149  ssize_t bytesRead = 0;
150  ssize_t bytesWritten = 0;
151  if (events & READ) {
152  // Read all available data, so EventBase will stop calling us
153  // until new data becomes available
154  bytesRead = readUntilEmpty(fd_);
155  }
156  if (events & WRITE) {
157  // Write until the pipe buffer is full, so EventBase will stop calling
158  // us until the other end has read some data
159  bytesWritten = writeUntilFull(fd_);
160  }
161 
162  log.emplace_back(events, bytesRead, bytesWritten);
163  }
164 
165  struct EventRecord {
166  EventRecord(uint16_t events_, size_t bytesRead_, size_t bytesWritten_)
167  : events(events_),
168  timestamp(),
169  bytesRead(bytesRead_),
170  bytesWritten(bytesWritten_) {}
171 
174  ssize_t bytesRead;
175  ssize_t bytesWritten;
176  };
177 
178  deque<EventRecord> log;
179 
180  private:
181  int fd_;
182 };
183 
187 TEST(EventBaseTest, ReadEvent) {
188  EventBase eb;
189  SocketPair sp;
190 
191  // Register for read events
192  TestHandler handler(&eb, sp[0]);
193  handler.registerHandler(EventHandler::READ);
194 
195  // Register timeouts to perform two write events
196  ScheduledEvent events[] = {
197  {10, EventHandler::WRITE, 2345, 0},
198  {160, EventHandler::WRITE, 99, 0},
199  {0, 0, 0, 0},
200  };
201  scheduleEvents(&eb, sp[1], events);
202 
203  // Loop
205  eb.loop();
206  TimePoint end;
207 
208  // Since we didn't use the EventHandler::PERSIST flag, the handler should
209  // have received the first read, then unregistered itself. Check that only
210  // the first chunk of data was received.
211  ASSERT_EQ(handler.log.size(), 1);
212  ASSERT_EQ(handler.log[0].events, EventHandler::READ);
214  start,
215  handler.log[0].timestamp,
216  milliseconds(events[0].milliseconds),
217  milliseconds(90));
218  ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
219  ASSERT_EQ(handler.log[0].bytesWritten, 0);
221  start, end, milliseconds(events[1].milliseconds), milliseconds(30));
222 
223  // Make sure the second chunk of data is still waiting to be read.
224  size_t bytesRemaining = readUntilEmpty(sp[0]);
225  ASSERT_EQ(bytesRemaining, events[1].length);
226 }
227 
231 TEST(EventBaseTest, ReadPersist) {
232  EventBase eb;
233  SocketPair sp;
234 
235  // Register for read events
236  TestHandler handler(&eb, sp[0]);
237  handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
238 
239  // Register several timeouts to perform writes
240  ScheduledEvent events[] = {
241  {10, EventHandler::WRITE, 1024, 0},
242  {20, EventHandler::WRITE, 2211, 0},
243  {30, EventHandler::WRITE, 4096, 0},
244  {100, EventHandler::WRITE, 100, 0},
245  {0, 0, 0, 0},
246  };
247  scheduleEvents(&eb, sp[1], events);
248 
249  // Schedule a timeout to unregister the handler after the third write
251 
252  // Loop
254  eb.loop();
255  TimePoint end;
256 
257  // The handler should have received the first 3 events,
258  // then been unregistered after that.
259  ASSERT_EQ(handler.log.size(), 3);
260  for (int n = 0; n < 3; ++n) {
261  ASSERT_EQ(handler.log[n].events, EventHandler::READ);
263  start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
264  ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
265  ASSERT_EQ(handler.log[n].bytesWritten, 0);
266  }
267  T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
268 
269  // Make sure the data from the last write is still waiting to be read
270  size_t bytesRemaining = readUntilEmpty(sp[0]);
271  ASSERT_EQ(bytesRemaining, events[3].length);
272 }
273 
277 TEST(EventBaseTest, ReadImmediate) {
278  EventBase eb;
279  SocketPair sp;
280 
281  // Write some data to the socket so the other end will
282  // be immediately readable
283  size_t dataLength = 1234;
284  writeToFD(sp[1], dataLength);
285 
286  // Register for read events
287  TestHandler handler(&eb, sp[0]);
288  handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
289 
290  // Register a timeout to perform another write
291  ScheduledEvent events[] = {
292  {10, EventHandler::WRITE, 2345, 0},
293  {0, 0, 0, 0},
294  };
295  scheduleEvents(&eb, sp[1], events);
296 
297  // Schedule a timeout to unregister the handler
299 
300  // Loop
302  eb.loop();
303  TimePoint end;
304 
305  ASSERT_EQ(handler.log.size(), 2);
306 
307  // There should have been 1 event for immediate readability
308  ASSERT_EQ(handler.log[0].events, EventHandler::READ);
309  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
310  ASSERT_EQ(handler.log[0].bytesRead, dataLength);
311  ASSERT_EQ(handler.log[0].bytesWritten, 0);
312 
313  // There should be another event after the timeout wrote more data
314  ASSERT_EQ(handler.log[1].events, EventHandler::READ);
316  start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
317  ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
318  ASSERT_EQ(handler.log[1].bytesWritten, 0);
319 
320  T_CHECK_TIMEOUT(start, end, milliseconds(20));
321 }
322 
326 TEST(EventBaseTest, WriteEvent) {
327  EventBase eb;
328  SocketPair sp;
329 
330  // Fill up the write buffer before starting
331  size_t initialBytesWritten = writeUntilFull(sp[0]);
332 
333  // Register for write events
334  TestHandler handler(&eb, sp[0]);
335  handler.registerHandler(EventHandler::WRITE);
336 
337  // Register timeouts to perform two reads
338  ScheduledEvent events[] = {
339  {10, EventHandler::READ, 0, 0},
340  {60, EventHandler::READ, 0, 0},
341  {0, 0, 0, 0},
342  };
343  scheduleEvents(&eb, sp[1], events);
344 
345  // Loop
347  eb.loop();
348  TimePoint end;
349 
350  // Since we didn't use the EventHandler::PERSIST flag, the handler should
351  // have only been able to write once, then unregistered itself.
352  ASSERT_EQ(handler.log.size(), 1);
353  ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
355  start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
356  ASSERT_EQ(handler.log[0].bytesRead, 0);
357  ASSERT_GT(handler.log[0].bytesWritten, 0);
358  T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
359 
360  ASSERT_EQ(events[0].result, initialBytesWritten);
361  ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
362 }
363 
367 TEST(EventBaseTest, WritePersist) {
368  EventBase eb;
369  SocketPair sp;
370 
371  // Fill up the write buffer before starting
372  size_t initialBytesWritten = writeUntilFull(sp[0]);
373 
374  // Register for write events
375  TestHandler handler(&eb, sp[0]);
376  handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
377 
378  // Register several timeouts to read from the socket at several intervals
379  ScheduledEvent events[] = {
380  {10, EventHandler::READ, 0, 0},
381  {40, EventHandler::READ, 0, 0},
382  {70, EventHandler::READ, 0, 0},
383  {100, EventHandler::READ, 0, 0},
384  {0, 0, 0, 0},
385  };
386  scheduleEvents(&eb, sp[1], events);
387 
388  // Schedule a timeout to unregister the handler after the third read
390 
391  // Loop
393  eb.loop();
394  TimePoint end;
395 
396  // The handler should have received the first 3 events,
397  // then been unregistered after that.
398  ASSERT_EQ(handler.log.size(), 3);
399  ASSERT_EQ(events[0].result, initialBytesWritten);
400  for (int n = 0; n < 3; ++n) {
401  ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
403  start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
404  ASSERT_EQ(handler.log[n].bytesRead, 0);
405  ASSERT_GT(handler.log[n].bytesWritten, 0);
406  ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
407  }
408  T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
409 }
410 
414 TEST(EventBaseTest, WriteImmediate) {
415  EventBase eb;
416  SocketPair sp;
417 
418  // Register for write events
419  TestHandler handler(&eb, sp[0]);
420  handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
421 
422  // Register a timeout to perform a read
423  ScheduledEvent events[] = {
424  {10, EventHandler::READ, 0, 0},
425  {0, 0, 0, 0},
426  };
427  scheduleEvents(&eb, sp[1], events);
428 
429  // Schedule a timeout to unregister the handler
430  int64_t unregisterTimeout = 40;
431  eb.tryRunAfterDelay(
432  std::bind(&TestHandler::unregisterHandler, &handler), unregisterTimeout);
433 
434  // Loop
436  eb.loop();
437  TimePoint end;
438 
439  ASSERT_EQ(handler.log.size(), 2);
440 
441  // Since the socket buffer was initially empty,
442  // there should have been 1 event for immediate writability
443  ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
444  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
445  ASSERT_EQ(handler.log[0].bytesRead, 0);
446  ASSERT_GT(handler.log[0].bytesWritten, 0);
447 
448  // There should be another event after the timeout wrote more data
449  ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
451  start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
452  ASSERT_EQ(handler.log[1].bytesRead, 0);
453  ASSERT_GT(handler.log[1].bytesWritten, 0);
454 
455  T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
456 }
457 
461 TEST(EventBaseTest, ReadWrite) {
462  EventBase eb;
463  SocketPair sp;
464 
465  // Fill up the write buffer before starting
466  size_t sock0WriteLength = writeUntilFull(sp[0]);
467 
468  // Register for read and write events
469  TestHandler handler(&eb, sp[0]);
470  handler.registerHandler(EventHandler::READ_WRITE);
471 
472  // Register timeouts to perform a write then a read.
473  ScheduledEvent events[] = {
474  {10, EventHandler::WRITE, 2345, 0},
475  {40, EventHandler::READ, 0, 0},
476  {0, 0, 0, 0},
477  };
478  scheduleEvents(&eb, sp[1], events);
479 
480  // Loop
482  eb.loop();
483  TimePoint end;
484 
485  // Since we didn't use the EventHandler::PERSIST flag, the handler should
486  // have only noticed readability, then unregistered itself. Check that only
487  // one event was logged.
488  ASSERT_EQ(handler.log.size(), 1);
489  ASSERT_EQ(handler.log[0].events, EventHandler::READ);
491  start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
492  ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
493  ASSERT_EQ(handler.log[0].bytesWritten, 0);
494  ASSERT_EQ(events[1].result, sock0WriteLength);
495  T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
496 }
497 
501 TEST(EventBaseTest, WriteRead) {
502  EventBase eb;
503  SocketPair sp;
504 
505  // Fill up the write buffer before starting
506  size_t sock0WriteLength = writeUntilFull(sp[0]);
507 
508  // Register for read and write events
509  TestHandler handler(&eb, sp[0]);
510  handler.registerHandler(EventHandler::READ_WRITE);
511 
512  // Register timeouts to perform a read then a write.
513  size_t sock1WriteLength = 2345;
514  ScheduledEvent events[] = {
515  {10, EventHandler::READ, 0, 0},
516  {40, EventHandler::WRITE, sock1WriteLength, 0},
517  {0, 0, 0, 0},
518  };
519  scheduleEvents(&eb, sp[1], events);
520 
521  // Loop
523  eb.loop();
524  TimePoint end;
525 
526  // Since we didn't use the EventHandler::PERSIST flag, the handler should
527  // have only noticed writability, then unregistered itself. Check that only
528  // one event was logged.
529  ASSERT_EQ(handler.log.size(), 1);
530  ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
532  start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
533  ASSERT_EQ(handler.log[0].bytesRead, 0);
534  ASSERT_GT(handler.log[0].bytesWritten, 0);
535  ASSERT_EQ(events[0].result, sock0WriteLength);
536  ASSERT_EQ(events[1].result, sock1WriteLength);
537  T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
538 
539  // Make sure the written data is still waiting to be read.
540  size_t bytesRemaining = readUntilEmpty(sp[0]);
541  ASSERT_EQ(bytesRemaining, events[1].length);
542 }
543 
548 TEST(EventBaseTest, ReadWriteSimultaneous) {
549  EventBase eb;
550  SocketPair sp;
551 
552  // Fill up the write buffer before starting
553  size_t sock0WriteLength = writeUntilFull(sp[0]);
554 
555  // Register for read and write events
556  TestHandler handler(&eb, sp[0]);
557  handler.registerHandler(EventHandler::READ_WRITE);
558 
559  // Register a timeout to perform a read and write together
560  ScheduledEvent events[] = {
561  {10, EventHandler::READ | EventHandler::WRITE, 0, 0},
562  {0, 0, 0, 0},
563  };
564  scheduleEvents(&eb, sp[1], events);
565 
566  // Loop
568  eb.loop();
569  TimePoint end;
570 
571  // It's not strictly required that the EventBase register us about both
572  // events in the same call. So, it's possible that if the EventBase
573  // implementation changes this test could start failing, and it wouldn't be
574  // considered breaking the API. However for now it's nice to exercise this
575  // code path.
576  ASSERT_EQ(handler.log.size(), 1);
577  ASSERT_EQ(handler.log[0].events, EventHandler::READ | EventHandler::WRITE);
579  start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
580  ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
581  ASSERT_GT(handler.log[0].bytesWritten, 0);
582  T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
583 }
584 
588 TEST(EventBaseTest, ReadWritePersist) {
589  EventBase eb;
590  SocketPair sp;
591 
592  // Register for read and write events
593  TestHandler handler(&eb, sp[0]);
594  handler.registerHandler(
595  EventHandler::READ | EventHandler::WRITE | EventHandler::PERSIST);
596 
597  // Register timeouts to perform several reads and writes
598  ScheduledEvent events[] = {
599  {10, EventHandler::WRITE, 2345, 0},
600  {20, EventHandler::READ, 0, 0},
601  {35, EventHandler::WRITE, 200, 0},
602  {45, EventHandler::WRITE, 15, 0},
603  {55, EventHandler::READ, 0, 0},
604  {120, EventHandler::WRITE, 2345, 0},
605  {0, 0, 0, 0},
606  };
607  scheduleEvents(&eb, sp[1], events);
608 
609  // Schedule a timeout to unregister the handler
611 
612  // Loop
614  eb.loop();
615  TimePoint end;
616 
617  ASSERT_EQ(handler.log.size(), 6);
618 
619  // Since we didn't fill up the write buffer immediately, there should
620  // be an immediate event for writability.
621  ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
622  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
623  ASSERT_EQ(handler.log[0].bytesRead, 0);
624  ASSERT_GT(handler.log[0].bytesWritten, 0);
625 
626  // Events 1 through 5 should correspond to the scheduled events
627  for (int n = 1; n < 6; ++n) {
628  ScheduledEvent* event = &events[n - 1];
630  start, handler.log[n].timestamp, milliseconds(event->milliseconds));
631  if (event->events == EventHandler::READ) {
632  ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
633  ASSERT_EQ(handler.log[n].bytesRead, 0);
634  ASSERT_GT(handler.log[n].bytesWritten, 0);
635  } else {
636  ASSERT_EQ(handler.log[n].events, EventHandler::READ);
637  ASSERT_EQ(handler.log[n].bytesRead, event->length);
638  ASSERT_EQ(handler.log[n].bytesWritten, 0);
639  }
640  }
641 
642  // The timeout should have unregistered the handler before the last write.
643  // Make sure that data is still waiting to be read
644  size_t bytesRemaining = readUntilEmpty(sp[0]);
645  ASSERT_EQ(bytesRemaining, events[5].length);
646 }
647 
649  public:
650  PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
651  : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
652 
653  void handlerReady(uint16_t events) noexcept override {
654  assert(events == EventHandler::READ);
655  ssize_t bytesRead = readFromFD(fd_, readLength_);
656  log.emplace_back(events, bytesRead, 0);
657  }
658 
659  private:
660  int fd_;
661  size_t readLength_;
662 };
663 
669 TEST(EventBaseTest, ReadPartial) {
670  EventBase eb;
671  SocketPair sp;
672 
673  // Register for read events
674  size_t readLength = 100;
675  PartialReadHandler handler(&eb, sp[0], readLength);
676  handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
677 
678  // Register a timeout to perform a single write,
679  // with more data than PartialReadHandler will read at once
680  ScheduledEvent events[] = {
681  {10, EventHandler::WRITE, (3 * readLength) + (readLength / 2), 0},
682  {0, 0, 0, 0},
683  };
684  scheduleEvents(&eb, sp[1], events);
685 
686  // Schedule a timeout to unregister the handler
688 
689  // Loop
691  eb.loop();
692  TimePoint end;
693 
694  ASSERT_EQ(handler.log.size(), 4);
695 
696  // The first 3 invocations should read readLength bytes each
697  for (int n = 0; n < 3; ++n) {
698  ASSERT_EQ(handler.log[n].events, EventHandler::READ);
700  start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
701  ASSERT_EQ(handler.log[n].bytesRead, readLength);
702  ASSERT_EQ(handler.log[n].bytesWritten, 0);
703  }
704  // The last read only has readLength/2 bytes
705  ASSERT_EQ(handler.log[3].events, EventHandler::READ);
707  start, handler.log[3].timestamp, milliseconds(events[0].milliseconds));
708  ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
709  ASSERT_EQ(handler.log[3].bytesWritten, 0);
710 }
711 
713  public:
714  PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
715  : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
716 
717  void handlerReady(uint16_t events) noexcept override {
718  assert(events == EventHandler::WRITE);
719  ssize_t bytesWritten = writeToFD(fd_, writeLength_);
720  log.emplace_back(events, 0, bytesWritten);
721  }
722 
723  private:
724  int fd_;
725  size_t writeLength_;
726 };
727 
733 TEST(EventBaseTest, WritePartial) {
734  EventBase eb;
735  SocketPair sp;
736 
737  // Fill up the write buffer before starting
738  size_t initialBytesWritten = writeUntilFull(sp[0]);
739 
740  // Register for write events
741  size_t writeLength = 100;
742  PartialWriteHandler handler(&eb, sp[0], writeLength);
743  handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
744 
745  // Register a timeout to read, so that more data can be written
746  ScheduledEvent events[] = {
747  {10, EventHandler::READ, 0, 0},
748  {0, 0, 0, 0},
749  };
750  scheduleEvents(&eb, sp[1], events);
751 
752  // Schedule a timeout to unregister the handler
754 
755  // Loop
757  eb.loop();
758  TimePoint end;
759 
760  // Depending on how big the socket buffer is, there will be multiple writes
761  // Only check the first 5
762  int numChecked = 5;
763  ASSERT_GE(handler.log.size(), numChecked);
764  ASSERT_EQ(events[0].result, initialBytesWritten);
765 
766  // The first 3 invocations should read writeLength bytes each
767  for (int n = 0; n < numChecked; ++n) {
768  ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
770  start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
771  ASSERT_EQ(handler.log[n].bytesRead, 0);
772  ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
773  }
774 }
775 
779 TEST(EventBaseTest, DestroyHandler) {
780  class DestroyHandler : public AsyncTimeout {
781  public:
782  DestroyHandler(EventBase* eb, EventHandler* h)
783  : AsyncTimeout(eb), handler_(h) {}
784 
785  void timeoutExpired() noexcept override {
786  delete handler_;
787  }
788 
789  private:
790  EventHandler* handler_;
791  };
792 
793  EventBase eb;
794  SocketPair sp;
795 
796  // Fill up the write buffer before starting
797  size_t initialBytesWritten = writeUntilFull(sp[0]);
798 
799  // Register for write events
800  TestHandler* handler = new TestHandler(&eb, sp[0]);
801  handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
802 
803  // After 10ms, read some data, so that the handler
804  // will be notified that it can write.
805  eb.tryRunAfterDelay(
806  std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), 10);
807 
808  // Start a timer to destroy the handler after 25ms
809  // This mainly just makes sure the code doesn't break or assert
810  DestroyHandler dh(&eb, handler);
811  dh.scheduleTimeout(25);
812 
814  eb.loop();
815  TimePoint end;
816 
817  // Make sure the EventHandler was uninstalled properly when it was
818  // destroyed, and the EventBase loop exited
819  T_CHECK_TIMEOUT(start, end, milliseconds(25));
820 
821  // Make sure that the handler wrote data to the socket
822  // before it was destroyed
823  size_t bytesRemaining = readUntilEmpty(sp[1]);
824  ASSERT_GT(bytesRemaining, 0);
825 }
826 
828 // Tests for timeout events
830 
831 TEST(EventBaseTest, RunAfterDelay) {
832  EventBase eb;
833 
834  TimePoint timestamp1(false);
835  TimePoint timestamp2(false);
836  TimePoint timestamp3(false);
837  eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
838  eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
839  eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
840 
842  eb.loop();
843  TimePoint end;
844 
845  T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
846  T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
847  T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
848  T_CHECK_TIMEOUT(start, end, milliseconds(40));
849 }
850 
855 TEST(EventBaseTest, RunAfterDelayDestruction) {
856  TimePoint timestamp1(false);
857  TimePoint timestamp2(false);
858  TimePoint timestamp3(false);
859  TimePoint timestamp4(false);
860  TimePoint start(false);
861  TimePoint end(false);
862 
863  {
864  EventBase eb;
865 
866  // Run two normal timeouts
867  eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
868  eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
869 
870  // Schedule a timeout to stop the event loop after 40ms
871  eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
872 
873  // Schedule 2 timeouts that would fire after the event loop stops
874  eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
875  eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
876 
877  start.reset();
878  eb.loop();
879  end.reset();
880  }
881 
882  T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
883  T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
884  T_CHECK_TIMEOUT(start, end, milliseconds(40));
885 
886  ASSERT_TRUE(timestamp3.isUnset());
887  ASSERT_TRUE(timestamp4.isUnset());
888 
889  // Ideally this test should be run under valgrind to ensure that no
890  // memory is leaked.
891 }
892 
893 class TestTimeout : public AsyncTimeout {
894  public:
895  explicit TestTimeout(EventBase* eventBase)
896  : AsyncTimeout(eventBase), timestamp(false) {}
897 
898  void timeoutExpired() noexcept override {
899  timestamp.reset();
900  }
901 
903 };
904 
905 TEST(EventBaseTest, BasicTimeouts) {
906  EventBase eb;
907 
908  TestTimeout t1(&eb);
909  TestTimeout t2(&eb);
910  TestTimeout t3(&eb);
911  t1.scheduleTimeout(10);
912  t2.scheduleTimeout(20);
913  t3.scheduleTimeout(40);
914 
916  eb.loop();
917  TimePoint end;
918 
919  T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
920  T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
921  T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
922  T_CHECK_TIMEOUT(start, end, milliseconds(40));
923 }
924 
926  public:
927  ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
928  : AsyncTimeout(evb), timeouts_(timeouts), iterator_(timeouts_.begin()) {}
929 
930  void start() {
931  reschedule();
932  }
933 
934  void timeoutExpired() noexcept override {
935  timestamps.emplace_back();
936  reschedule();
937  }
938 
939  void reschedule() {
940  if (iterator_ != timeouts_.end()) {
941  uint32_t timeout = *iterator_;
942  ++iterator_;
943  scheduleTimeout(timeout);
944  }
945  }
946 
947  vector<TimePoint> timestamps;
948 
949  private:
950  vector<uint32_t> timeouts_;
951  vector<uint32_t>::const_iterator iterator_;
952 };
953 
957 TEST(EventBaseTest, ReuseTimeout) {
958  EventBase eb;
959 
960  vector<uint32_t> timeouts;
961  timeouts.push_back(10);
962  timeouts.push_back(30);
963  timeouts.push_back(15);
964 
965  ReschedulingTimeout t(&eb, timeouts);
966  t.start();
967 
969  eb.loop();
970  TimePoint end;
971 
972  // Use a higher tolerance than usual. We're waiting on 3 timeouts
973  // consecutively. In general, each timeout may go over by a few
974  // milliseconds, and we're tripling this error by witing on 3 timeouts.
975  milliseconds tolerance{6};
976 
977  ASSERT_EQ(timeouts.size(), t.timestamps.size());
978  uint32_t total = 0;
979  for (size_t n = 0; n < timeouts.size(); ++n) {
980  total += timeouts[n];
981  T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
982  }
983  T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
984 }
985 
989 TEST(EventBaseTest, RescheduleTimeout) {
990  EventBase eb;
991 
992  TestTimeout t1(&eb);
993  TestTimeout t2(&eb);
994  TestTimeout t3(&eb);
995 
996  t1.scheduleTimeout(15);
997  t2.scheduleTimeout(30);
998  t3.scheduleTimeout(30);
999 
1000  auto f = static_cast<bool (AsyncTimeout::*)(uint32_t)>(
1001  &AsyncTimeout::scheduleTimeout);
1002 
1003  // after 10ms, reschedule t2 to run sooner than originally scheduled
1004  eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1005  // after 10ms, reschedule t3 to run later than originally scheduled
1006  eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1007 
1008  TimePoint start;
1009  eb.loop();
1010  TimePoint end;
1011 
1012  T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1013  T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1014  T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1015  T_CHECK_TIMEOUT(start, end, milliseconds(50));
1016 }
1017 
1021 TEST(EventBaseTest, CancelTimeout) {
1022  EventBase eb;
1023 
1024  vector<uint32_t> timeouts;
1025  timeouts.push_back(10);
1026  timeouts.push_back(30);
1027  timeouts.push_back(25);
1028 
1029  ReschedulingTimeout t(&eb, timeouts);
1030  t.start();
1031  eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1032 
1033  TimePoint start;
1034  eb.loop();
1035  TimePoint end;
1036 
1037  ASSERT_EQ(t.timestamps.size(), 2);
1038  T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1039  T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1040  T_CHECK_TIMEOUT(start, end, milliseconds(50));
1041 }
1042 
1046 TEST(EventBaseTest, DestroyTimeout) {
1047  class DestroyTimeout : public AsyncTimeout {
1048  public:
1049  DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1050  : AsyncTimeout(eb), timeout_(t) {}
1051 
1052  void timeoutExpired() noexcept override {
1053  delete timeout_;
1054  }
1055 
1056  private:
1057  AsyncTimeout* timeout_;
1058  };
1059 
1060  EventBase eb;
1061 
1062  TestTimeout* t1 = new TestTimeout(&eb);
1063  t1->scheduleTimeout(30);
1064 
1065  DestroyTimeout dt(&eb, t1);
1066  dt.scheduleTimeout(10);
1067 
1068  TimePoint start;
1069  eb.loop();
1070  TimePoint end;
1071 
1072  T_CHECK_TIMEOUT(start, end, milliseconds(10));
1073 }
1074 
1078 TEST(EventBaseTest, ScheduledFn) {
1079  EventBase eb;
1080 
1081  TimePoint timestamp1(false);
1082  TimePoint timestamp2(false);
1083  TimePoint timestamp3(false);
1084  eb.schedule(std::bind(&TimePoint::reset, &timestamp1), milliseconds(9));
1085  eb.schedule(std::bind(&TimePoint::reset, &timestamp2), milliseconds(19));
1086  eb.schedule(std::bind(&TimePoint::reset, &timestamp3), milliseconds(39));
1087 
1088  TimePoint start;
1089  eb.loop();
1090  TimePoint end;
1091 
1092  T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
1093  T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
1094  T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
1095  T_CHECK_TIMEOUT(start, end, milliseconds(39));
1096 }
1097 
1098 TEST(EventBaseTest, ScheduledFnAt) {
1099  EventBase eb;
1100 
1101  TimePoint timestamp0(false);
1102  TimePoint timestamp1(false);
1103  TimePoint timestamp2(false);
1104  TimePoint timestamp3(false);
1105  eb.scheduleAt(
1106  std::bind(&TimePoint::reset, &timestamp1), eb.now() - milliseconds(5));
1107  eb.scheduleAt(
1108  std::bind(&TimePoint::reset, &timestamp1), eb.now() + milliseconds(9));
1109  eb.scheduleAt(
1110  std::bind(&TimePoint::reset, &timestamp2), eb.now() + milliseconds(19));
1111  eb.scheduleAt(
1112  std::bind(&TimePoint::reset, &timestamp3), eb.now() + milliseconds(39));
1113 
1114  TimePoint start;
1115  eb.loop();
1116  TimePoint end;
1117 
1118  T_CHECK_TIME_LT(start, timestamp0, milliseconds(0));
1119  T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
1120  T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
1121  T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
1122  T_CHECK_TIMEOUT(start, end, milliseconds(39));
1123 }
1124 
1126 // Test for runInThreadTestFunc()
1128 
1130  RunInThreadData(int numThreads, int opsPerThread_)
1131  : opsPerThread(opsPerThread_), opsToGo(numThreads * opsPerThread) {}
1132 
1134  deque<pair<int, int>> values;
1135 
1137  int opsToGo;
1138 };
1139 
1141  RunInThreadArg(RunInThreadData* data_, int threadId, int value_)
1142  : data(data_), thread(threadId), value(value_) {}
1143 
1145  int thread;
1146  int value;
1147 };
1148 
1150  arg->data->values.emplace_back(arg->thread, arg->value);
1151  RunInThreadData* data = arg->data;
1152  delete arg;
1153 
1154  if (--data->opsToGo == 0) {
1155  // Break out of the event base loop if we are the last thread running
1156  data->evb.terminateLoopSoon();
1157  }
1158 }
1159 
1160 TEST(EventBaseTest, RunInThread) {
1161  constexpr uint32_t numThreads = 50;
1162  constexpr uint32_t opsPerThread = 100;
1163  RunInThreadData data(numThreads, opsPerThread);
1164 
1165  deque<std::thread> threads;
1166  SCOPE_EXIT {
1167  // Wait on all of the threads.
1168  for (auto& thread : threads) {
1169  thread.join();
1170  }
1171  };
1172 
1173  for (uint32_t i = 0; i < numThreads; ++i) {
1174  threads.emplace_back([i, &data] {
1175  for (int n = 0; n < data.opsPerThread; ++n) {
1176  RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1178  usleep(10);
1179  }
1180  });
1181  }
1182 
1183  // Add a timeout event to run after 3 seconds.
1184  // Otherwise loop() will return immediately since there are no events to run.
1185  // Once the last thread exits, it will stop the loop(). However, this
1186  // timeout also stops the loop in case there is a bug performing the normal
1187  // stop.
1188  data.evb.tryRunAfterDelay(
1189  std::bind(&EventBase::terminateLoopSoon, &data.evb), 3000);
1190 
1191  TimePoint start;
1192  data.evb.loop();
1193  TimePoint end;
1194 
1195  // Verify that the loop exited because all threads finished and requested it
1196  // to stop. This should happen much sooner than the 3 second timeout.
1197  // Assert that it happens in under a second. (This is still tons of extra
1198  // padding.)
1199 
1200  auto timeTaken =
1201  std::chrono::duration_cast<milliseconds>(end.getTime() - start.getTime());
1202  ASSERT_LT(timeTaken.count(), 1000);
1203  VLOG(11) << "Time taken: " << timeTaken.count();
1204 
1205  // Verify that we have all of the events from every thread
1206  int expectedValues[numThreads];
1207  for (uint32_t n = 0; n < numThreads; ++n) {
1208  expectedValues[n] = 0;
1209  }
1210  for (deque<pair<int, int>>::const_iterator it = data.values.begin();
1211  it != data.values.end();
1212  ++it) {
1213  int threadID = it->first;
1214  int value = it->second;
1215  ASSERT_EQ(expectedValues[threadID], value);
1216  ++expectedValues[threadID];
1217  }
1218  for (uint32_t n = 0; n < numThreads; ++n) {
1219  ASSERT_EQ(expectedValues[n], opsPerThread);
1220  }
1221 }
1222 
1223 // This test simulates some calls, and verifies that the waiting happens by
1224 // triggering what otherwise would be race conditions, and trying to detect
1225 // whether any of the race conditions happened.
1226 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1227  const size_t c = 256;
1228  vector<unique_ptr<atomic<size_t>>> atoms(c);
1229  for (size_t i = 0; i < c; ++i) {
1230  auto& atom = atoms.at(i);
1231  atom = std::make_unique<atomic<size_t>>(0);
1232  }
1233  vector<thread> threads;
1234  for (size_t i = 0; i < c; ++i) {
1235  threads.emplace_back([&atoms, i] {
1236  EventBase eb;
1237  auto& atom = *atoms.at(i);
1238  auto ebth = thread([&] { eb.loopForever(); });
1239  eb.waitUntilRunning();
1241  size_t x = 0;
1242  atom.compare_exchange_weak(
1243  x, 1, std::memory_order_release, std::memory_order_relaxed);
1244  });
1245  size_t x = 0;
1246  atom.compare_exchange_weak(
1247  x, 2, std::memory_order_release, std::memory_order_relaxed);
1248  eb.terminateLoopSoon();
1249  ebth.join();
1250  });
1251  }
1252  for (size_t i = 0; i < c; ++i) {
1253  auto& th = threads.at(i);
1254  th.join();
1255  }
1256  size_t sum = 0;
1257  for (auto& atom : atoms) {
1258  sum += *atom;
1259  }
1260  EXPECT_EQ(c, sum);
1261 }
1262 
1263 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1264  EventBase eb;
1265  thread th(&EventBase::loopForever, &eb);
1266  SCOPE_EXIT {
1267  eb.terminateLoopSoon();
1268  th.join();
1269  };
1270  auto mutated = false;
1271  eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1272  EXPECT_TRUE(mutated);
1273 }
1274 
1275 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1276  EventBase eb;
1277  thread th(&EventBase::loopForever, &eb);
1278  SCOPE_EXIT {
1279  eb.terminateLoopSoon();
1280  th.join();
1281  };
1283  auto mutated = false;
1284  eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1285  EXPECT_TRUE(mutated);
1286  });
1287 }
1288 
1289 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1290  EventBase eb;
1291  auto mutated = false;
1292  eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1293  EXPECT_TRUE(mutated);
1294 }
1295 
1297 // Tests for runInLoop()
1299 
1301  public:
1303  EventBase* eventBase,
1304  unsigned int count,
1305  std::function<void()> action = std::function<void()>())
1306  : eventBase_(eventBase), count_(count), action_(action) {}
1307 
1308  void runLoopCallback() noexcept override {
1309  --count_;
1310  if (count_ > 0) {
1311  eventBase_->runInLoop(this);
1312  } else if (action_) {
1313  action_();
1314  }
1315  }
1316 
1317  unsigned int getCount() const {
1318  return count_;
1319  }
1320 
1321  private:
1322  EventBase* eventBase_;
1323  unsigned int count_;
1324  std::function<void()> action_;
1325 };
1326 
1327 // Test that EventBase::loop() doesn't exit while there are
1328 // still LoopCallbacks remaining to be invoked.
1329 TEST(EventBaseTest, RepeatedRunInLoop) {
1330  EventBase eventBase;
1331 
1332  CountedLoopCallback c(&eventBase, 10);
1333  eventBase.runInLoop(&c);
1334  // The callback shouldn't have run immediately
1335  ASSERT_EQ(c.getCount(), 10);
1336  eventBase.loop();
1337 
1338  // loop() should loop until the CountedLoopCallback stops
1339  // re-installing itself.
1340  ASSERT_EQ(c.getCount(), 0);
1341 }
1342 
1343 // Test that EventBase::loop() works as expected without time measurements.
1344 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1345  EventBase eventBase(false);
1346 
1347  CountedLoopCallback c(&eventBase, 10);
1348  eventBase.runInLoop(&c);
1349  // The callback shouldn't have run immediately
1350  ASSERT_EQ(c.getCount(), 10);
1351  eventBase.loop();
1352 
1353  // loop() should loop until the CountedLoopCallback stops
1354  // re-installing itself.
1355  ASSERT_EQ(c.getCount(), 0);
1356 }
1357 
1358 // Test runInLoop() calls with terminateLoopSoon()
1359 TEST(EventBaseTest, RunInLoopStopLoop) {
1360  EventBase eventBase;
1361 
1362  CountedLoopCallback c1(&eventBase, 20);
1364  &eventBase, 10, std::bind(&EventBase::terminateLoopSoon, &eventBase));
1365 
1366  eventBase.runInLoop(&c1);
1367  eventBase.runInLoop(&c2);
1368  ASSERT_EQ(c1.getCount(), 20);
1369  ASSERT_EQ(c2.getCount(), 10);
1370 
1371  eventBase.loopForever();
1372 
1373  // c2 should have stopped the loop after 10 iterations
1374  ASSERT_EQ(c2.getCount(), 0);
1375 
1376  // We allow the EventBase to run the loop callbacks in whatever order it
1377  // chooses. We'll accept c1's count being either 10 (if the loop terminated
1378  // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1379  // before c1 ran).
1380  //
1381  // (With the current code, c1 will always run 10 times, but we don't consider
1382  // this a hard API requirement.)
1383  ASSERT_GE(c1.getCount(), 10);
1384  ASSERT_LE(c1.getCount(), 11);
1385 }
1386 
1387 TEST(EventBaseTest, messageAvailableException) {
1388  auto deadManWalking = [] {
1389  EventBase eventBase;
1390  std::thread t([&] {
1391  // Call this from another thread to force use of NotificationQueue in
1392  // runInEventBaseThread
1393  eventBase.runInEventBaseThread(
1394  []() { throw std::runtime_error("boom"); });
1395  });
1396  t.join();
1397  eventBase.loopForever();
1398  };
1399  EXPECT_DEATH(deadManWalking(), ".*");
1400 }
1401 
1402 TEST(EventBaseTest, TryRunningAfterTerminate) {
1403  EventBase eventBase;
1405  &eventBase, 1, std::bind(&EventBase::terminateLoopSoon, &eventBase));
1406  eventBase.runInLoop(&c1);
1407  eventBase.loopForever();
1408  bool ran = false;
1409  eventBase.runInEventBaseThread([&]() { ran = true; });
1410 
1411  ASSERT_FALSE(ran);
1412 }
1413 
1414 // Test cancelling runInLoop() callbacks
1415 TEST(EventBaseTest, CancelRunInLoop) {
1416  EventBase eventBase;
1417 
1418  CountedLoopCallback c1(&eventBase, 20);
1419  CountedLoopCallback c2(&eventBase, 20);
1420  CountedLoopCallback c3(&eventBase, 20);
1421 
1422  std::function<void()> cancelC1Action =
1423  std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1424  std::function<void()> cancelC2Action =
1425  std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1426 
1427  CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1428  CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1429 
1430  // Install cancelC1 after c1
1431  eventBase.runInLoop(&c1);
1432  eventBase.runInLoop(&cancelC1);
1433 
1434  // Install cancelC2 before c2
1435  eventBase.runInLoop(&cancelC2);
1436  eventBase.runInLoop(&c2);
1437 
1438  // Install c3
1439  eventBase.runInLoop(&c3);
1440 
1441  ASSERT_EQ(c1.getCount(), 20);
1442  ASSERT_EQ(c2.getCount(), 20);
1443  ASSERT_EQ(c3.getCount(), 20);
1444  ASSERT_EQ(cancelC1.getCount(), 10);
1445  ASSERT_EQ(cancelC2.getCount(), 10);
1446 
1447  // Run the loop
1448  eventBase.loop();
1449 
1450  // cancelC1 and cancelC2 should have both fired after 10 iterations and
1451  // stopped re-installing themselves
1452  ASSERT_EQ(cancelC1.getCount(), 0);
1453  ASSERT_EQ(cancelC2.getCount(), 0);
1454  // c3 should have continued on for the full 20 iterations
1455  ASSERT_EQ(c3.getCount(), 0);
1456 
1457  // c1 and c2 should have both been cancelled on the 10th iteration.
1458  //
1459  // Callbacks are always run in the order they are installed,
1460  // so c1 should have fired 10 times, and been canceled after it ran on the
1461  // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1462  // have run before it on the 10th iteration, and cancelled it before it
1463  // fired.
1464  ASSERT_EQ(c1.getCount(), 10);
1465  ASSERT_EQ(c2.getCount(), 11);
1466 }
1467 
1469  public EventHandler {
1470  public:
1471  TerminateTestCallback(EventBase* eventBase, int fd)
1472  : EventHandler(eventBase, fd),
1473  eventBase_(eventBase),
1474  loopInvocations_(0),
1475  maxLoopInvocations_(0),
1476  eventInvocations_(0),
1477  maxEventInvocations_(0) {}
1478 
1479  void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1480  loopInvocations_ = 0;
1481  maxLoopInvocations_ = maxLoopInvocations;
1482  eventInvocations_ = 0;
1483  maxEventInvocations_ = maxEventInvocations;
1484 
1485  cancelLoopCallback();
1486  unregisterHandler();
1487  }
1488 
1489  void handlerReady(uint16_t /* events */) noexcept override {
1490  // We didn't register with PERSIST, so we will have been automatically
1491  // unregistered already.
1492  ASSERT_FALSE(isHandlerRegistered());
1493 
1494  ++eventInvocations_;
1495  if (eventInvocations_ >= maxEventInvocations_) {
1496  return;
1497  }
1498 
1499  eventBase_->runInLoop(this);
1500  }
1501  void runLoopCallback() noexcept override {
1502  ++loopInvocations_;
1503  if (loopInvocations_ >= maxLoopInvocations_) {
1504  return;
1505  }
1506 
1507  registerHandler(READ);
1508  }
1509 
1511  return loopInvocations_;
1512  }
1514  return eventInvocations_;
1515  }
1516 
1517  private:
1523 };
1524 
1534 TEST(EventBaseTest, LoopTermination) {
1535  EventBase eventBase;
1536 
1537  // Open a pipe and close the write end,
1538  // so the read endpoint will be readable
1539  int pipeFds[2];
1540  int rc = pipe(pipeFds);
1541  ASSERT_EQ(rc, 0);
1542  close(pipeFds[1]);
1543  TerminateTestCallback callback(&eventBase, pipeFds[0]);
1544 
1545  // Test once where the callback will exit after a loop callback
1546  callback.reset(10, 100);
1547  eventBase.runInLoop(&callback);
1548  eventBase.loop();
1549  ASSERT_EQ(callback.getLoopInvocations(), 10);
1550  ASSERT_EQ(callback.getEventInvocations(), 9);
1551 
1552  // Test once where the callback will exit after an fd event callback
1553  callback.reset(100, 7);
1554  eventBase.runInLoop(&callback);
1555  eventBase.loop();
1556  ASSERT_EQ(callback.getLoopInvocations(), 7);
1557  ASSERT_EQ(callback.getEventInvocations(), 7);
1558 
1559  close(pipeFds[0]);
1560 }
1561 
1563 // Tests for latency calculations
1565 
1567  public:
1569  EventBase* base,
1570  std::deque<std::size_t>& timeout)
1571  : AsyncTimeout(base), timeouts_(0), timeout_(timeout) {
1572  scheduleTimeout(1);
1573  }
1574 
1576 
1577  void timeoutExpired() noexcept override {
1578  ++timeouts_;
1579 
1580  if (timeout_.empty()) {
1581  cancelTimeout();
1582  } else {
1583  std::size_t sleepTime = timeout_.front();
1584  timeout_.pop_front();
1585  if (sleepTime) {
1586  usleep(sleepTime);
1587  }
1588  scheduleTimeout(1);
1589  }
1590  }
1591 
1592  int getTimeouts() const {
1593  return timeouts_;
1594  }
1595 
1596  private:
1598  std::deque<std::size_t>& timeout_;
1599 };
1600 
1610 TEST(EventBaseTest, IdleTime) {
1611  EventBase eventBase;
1612  std::deque<std::size_t> timeouts0(4, 8080);
1613  timeouts0.push_front(8000);
1614  timeouts0.push_back(14000);
1615  IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1616  std::deque<std::size_t> timeouts(20, 20);
1617  std::unique_ptr<IdleTimeTimeoutSeries> tos;
1618  bool hostOverloaded = false;
1619 
1620  // Loop once before starting the main test. This will run NotificationQueue
1621  // callbacks that get automatically installed when the EventBase is first
1622  // created. We want to make sure they don't interfere with the timing
1623  // operations below.
1624  eventBase.loopOnce(EVLOOP_NONBLOCK);
1625  eventBase.setLoadAvgMsec(1000ms);
1626  eventBase.resetLoadAvg(5900.0);
1627  auto testStart = std::chrono::steady_clock::now();
1628 
1629  int latencyCallbacks = 0;
1630  eventBase.setMaxLatency(6000us, [&]() {
1631  ++latencyCallbacks;
1632  if (latencyCallbacks != 1) {
1633  FAIL() << "Unexpected latency callback";
1634  }
1635 
1636  if (tos0.getTimeouts() < 6) {
1637  // This could only happen if the host this test is running
1638  // on is heavily loaded.
1639  int64_t usElapsed = duration_cast<microseconds>(
1640  std::chrono::steady_clock::now() - testStart)
1641  .count();
1642  EXPECT_LE(43800, usElapsed);
1643  hostOverloaded = true;
1644  return;
1645  }
1646  EXPECT_EQ(6, tos0.getTimeouts());
1647  EXPECT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1648  EXPECT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1649  tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
1650  });
1651 
1652  // Kick things off with an "immediate" timeout
1653  tos0.scheduleTimeout(1);
1654 
1655  eventBase.loop();
1656 
1657  if (hostOverloaded) {
1658  SKIP() << "host too heavily loaded to execute test";
1659  }
1660 
1661  ASSERT_EQ(1, latencyCallbacks);
1662  ASSERT_EQ(7, tos0.getTimeouts());
1663  ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1664  ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1665  ASSERT_TRUE(!!tos);
1666  ASSERT_EQ(21, tos->getTimeouts());
1667 }
1668 
1672 TEST(EventBaseTest, ThisLoop) {
1673  EventBase eb;
1674  bool runInLoop = false;
1675  bool runThisLoop = false;
1676 
1677  eb.runInLoop(
1678  [&]() {
1679  eb.terminateLoopSoon();
1680  eb.runInLoop([&]() { runInLoop = true; });
1681  eb.runInLoop([&]() { runThisLoop = true; }, true);
1682  },
1683  true);
1684  eb.loopForever();
1685 
1686  // Should not work
1687  ASSERT_FALSE(runInLoop);
1688  // Should work with thisLoop
1689  ASSERT_TRUE(runThisLoop);
1690 }
1691 
1692 TEST(EventBaseTest, EventBaseThreadLoop) {
1693  EventBase base;
1694  bool ran = false;
1695 
1696  base.runInEventBaseThread([&]() { ran = true; });
1697  base.loop();
1698 
1699  ASSERT_TRUE(ran);
1700 }
1701 
1702 TEST(EventBaseTest, EventBaseThreadName) {
1703  EventBase base;
1704  base.setName("foo");
1705  base.loop();
1706 
1707 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1708  char name[16];
1709  pthread_getname_np(pthread_self(), name, 16);
1710  ASSERT_EQ(0, strcmp("foo", name));
1711 #endif
1712 }
1713 
1714 TEST(EventBaseTest, RunBeforeLoop) {
1715  EventBase base;
1716  CountedLoopCallback cb(&base, 1, [&]() { base.terminateLoopSoon(); });
1717  base.runBeforeLoop(&cb);
1718  base.loopForever();
1719  ASSERT_EQ(cb.getCount(), 0);
1720 }
1721 
1722 TEST(EventBaseTest, RunBeforeLoopWait) {
1723  EventBase base;
1724  CountedLoopCallback cb(&base, 1);
1725  base.tryRunAfterDelay([&]() { base.terminateLoopSoon(); }, 500);
1726  base.runBeforeLoop(&cb);
1727  base.loopForever();
1728 
1729  // Check that we only ran once, and did not loop multiple times.
1730  ASSERT_EQ(cb.getCount(), 0);
1731 }
1732 
1733 class PipeHandler : public EventHandler {
1734  public:
1735  PipeHandler(EventBase* eventBase, int fd) : EventHandler(eventBase, fd) {}
1736 
1737  void handlerReady(uint16_t /* events */) noexcept override {
1738  abort();
1739  }
1740 };
1741 
1742 TEST(EventBaseTest, StopBeforeLoop) {
1743  EventBase evb;
1744 
1745  // Give the evb something to do.
1746  int p[2];
1747  ASSERT_EQ(0, pipe(p));
1748  PipeHandler handler(&evb, p[0]);
1749  handler.registerHandler(EventHandler::READ);
1750 
1751  // It's definitely not running yet
1752  evb.terminateLoopSoon();
1753 
1754  // let it run, it should exit quickly.
1755  std::thread t([&] { evb.loop(); });
1756  t.join();
1757 
1758  handler.unregisterHandler();
1759  close(p[0]);
1760  close(p[1]);
1761 
1762  SUCCEED();
1763 }
1764 
1765 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1766  bool ran = false;
1767 
1768  {
1769  EventBase base;
1770  base.runInEventBaseThread([&]() { ran = true; });
1771  }
1772 
1773  ASSERT_TRUE(ran);
1774 }
1775 
1776 TEST(EventBaseTest, LoopKeepAlive) {
1777  EventBase evb;
1778 
1779  bool done = false;
1780  std::thread t([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
1781  /* sleep override */ std::this_thread::sleep_for(
1782  std::chrono::milliseconds(100));
1784  [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
1785  });
1786 
1787  evb.loop();
1788 
1789  ASSERT_TRUE(done);
1790 
1791  t.join();
1792 }
1793 
1794 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1795  EventBase evb;
1796 
1797  bool done = false;
1798  std::thread t;
1799 
1800  evb.runInEventBaseThread([&] {
1801  t = std::thread([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
1802  /* sleep override */ std::this_thread::sleep_for(
1803  std::chrono::milliseconds(100));
1805  [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
1806  });
1807  });
1808 
1809  evb.loop();
1810 
1811  ASSERT_TRUE(done);
1812 
1813  t.join();
1814 }
1815 
1816 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1817  std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
1818 
1819  bool done = false;
1820 
1821  std::thread evThread([&] {
1822  evb->loopForever();
1823  evb.reset();
1824  done = true;
1825  });
1826 
1827  {
1828  auto* ev = evb.get();
1830  ev->runInEventBaseThreadAndWait(
1831  [&ev, &keepAlive] { keepAlive = getKeepAliveToken(ev); });
1832  ASSERT_FALSE(done) << "Loop finished before we asked it to";
1833  ev->terminateLoopSoon();
1834  /* sleep override */
1835  std::this_thread::sleep_for(std::chrono::milliseconds(30));
1836  ASSERT_FALSE(done) << "Loop terminated early";
1837  ev->runInEventBaseThread([keepAlive = std::move(keepAlive)] {});
1838  }
1839 
1840  evThread.join();
1841  ASSERT_TRUE(done);
1842 }
1843 
1844 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1845  auto evb = std::make_unique<EventBase>();
1846 
1847  bool done = false;
1848 
1849  std::thread t([&done,
1850  loopKeepAlive = getKeepAliveToken(evb.get()),
1851  evbPtr = evb.get()]() mutable {
1852  /* sleep override */ std::this_thread::sleep_for(
1853  std::chrono::milliseconds(100));
1854  evbPtr->runInEventBaseThread(
1855  [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
1856  });
1857 
1858  evb.reset();
1859 
1860  ASSERT_TRUE(done);
1861 
1862  t.join();
1863 }
1864 
1865 TEST(EventBaseTest, LoopKeepAliveAtomic) {
1866  auto evb = std::make_unique<EventBase>();
1867 
1868  static constexpr size_t kNumThreads = 100;
1869  static constexpr size_t kNumTasks = 100;
1870 
1871  std::vector<std::thread> ts;
1872  std::vector<std::unique_ptr<Baton<>>> batons;
1873  size_t done{0};
1874 
1875  for (size_t i = 0; i < kNumThreads; ++i) {
1876  batons.emplace_back(std::make_unique<Baton<>>());
1877  }
1878 
1879  for (size_t i = 0; i < kNumThreads; ++i) {
1880  ts.emplace_back([evbPtr = evb.get(), batonPtr = batons[i].get(), &done] {
1881  std::vector<Executor::KeepAlive<EventBase>> keepAlives;
1882  for (size_t j = 0; j < kNumTasks; ++j) {
1883  keepAlives.emplace_back(getKeepAliveToken(evbPtr));
1884  }
1885 
1886  batonPtr->post();
1887 
1888  /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
1889 
1890  for (auto& keepAlive : keepAlives) {
1891  evbPtr->runInEventBaseThread(
1892  [&done, keepAlive = std::move(keepAlive)]() { ++done; });
1893  }
1894  });
1895  }
1896 
1897  for (auto& baton : batons) {
1898  baton->wait();
1899  }
1900 
1901  evb.reset();
1902 
1903  EXPECT_EQ(kNumThreads * kNumTasks, done);
1904 
1905  for (auto& t : ts) {
1906  t.join();
1907  }
1908 }
1909 
1910 TEST(EventBaseTest, LoopKeepAliveCast) {
1911  EventBase evb;
1912  Executor::KeepAlive<> keepAlive = getKeepAliveToken(evb);
1913 }
1914 
1915 TEST(EventBaseTest, DrivableExecutorTest) {
1917  auto f = p.getFuture();
1918  EventBase base;
1919  bool finished = false;
1920 
1921  std::thread t([&] {
1922  /* sleep override */
1923  std::this_thread::sleep_for(std::chrono::microseconds(10));
1924  finished = true;
1925  base.runInEventBaseThread([&]() { p.setValue(true); });
1926  });
1927 
1928  // Ensure drive does not busy wait
1929  base.drive(); // TODO: fix notification queue init() extra wakeup
1930  base.drive();
1931  EXPECT_TRUE(finished);
1932 
1934  auto f2 = p2.getFuture();
1935  // Ensure waitVia gets woken up properly, even from
1936  // a separate thread.
1937  base.runAfterDelay([&]() { p2.setValue(true); }, 10);
1938  f2.waitVia(&base);
1939  EXPECT_TRUE(f2.isReady());
1940 
1941  t.join();
1942 }
1943 
1944 TEST(EventBaseTest, IOExecutorTest) {
1945  EventBase base;
1946 
1947  // Ensure EventBase manages itself as an IOExecutor.
1948  EXPECT_EQ(base.getEventBase(), &base);
1949 }
1950 
1951 TEST(EventBaseTest, RequestContextTest) {
1952  EventBase evb;
1953  auto defaultCtx = RequestContext::get();
1954  std::weak_ptr<RequestContext> rctx_weak_ptr;
1955 
1956  {
1958  rctx_weak_ptr = RequestContext::saveContext();
1959  auto context = RequestContext::get();
1960  EXPECT_NE(defaultCtx, context);
1962  evb.loop();
1963  }
1964 
1965  // Ensure that RequestContext created for the scope has been released and
1966  // deleted.
1967  EXPECT_EQ(rctx_weak_ptr.expired(), true);
1968 
1969  EXPECT_EQ(defaultCtx, RequestContext::get());
1970 }
1971 
1972 TEST(EventBaseTest, CancelLoopCallbackRequestContextTest) {
1973  EventBase evb;
1974  CountedLoopCallback c(&evb, 1);
1975 
1976  auto defaultCtx = RequestContext::get();
1977  EXPECT_EQ(defaultCtx, RequestContext::get());
1978  std::weak_ptr<RequestContext> rctx_weak_ptr;
1979 
1980  {
1982  rctx_weak_ptr = RequestContext::saveContext();
1983  auto context = RequestContext::get();
1984  EXPECT_NE(defaultCtx, context);
1985  evb.runInLoop(&c);
1986  c.cancelLoopCallback();
1987  }
1988 
1989  // Ensure that RequestContext created for the scope has been released and
1990  // deleted.
1991  EXPECT_EQ(rctx_weak_ptr.expired(), true);
1992 
1993  EXPECT_EQ(defaultCtx, RequestContext::get());
1994 }
#define EXPECT_LE(val1, val2)
Definition: gtest.h:1928
#define ASSERT_GE(val1, val2)
Definition: gtest.h:1972
Definition: InvokeTest.cpp:58
std::chrono::steady_clock::time_point getTime() const
Definition: TimeUtil.h:50
#define ASSERT_GT(val1, val2)
Definition: gtest.h:1976
uint32_t getLoopInvocations() const
*than *hazptr_holder h
Definition: Hazptr.h:116
auto f
std::atomic< int64_t > sum(0)
void runInThreadTestFunc(RunInThreadArg *arg)
#define FAIL()
Definition: gtest.h:1822
std::deque< std::size_t > & timeout_
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
RunInThreadData(int numThreads, int opsPerThread_)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
~IdleTimeTimeoutSeries() override
unsigned int getCount() const
#define ASSERT_LT(val1, val2)
Definition: gtest.h:1968
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
Definition: EventBase.h:799
void handlerReady(uint16_t events) noexceptoverride
uint32_t getEventInvocations() const
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
context
Definition: CMakeCache.txt:563
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::chrono::steady_clock::time_point now()
bool runImmediatelyOrRunInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
Definition: EventBase.h:804
bool tryRunAfterDelay(Func cob, uint32_t milliseconds, InternalEnum internal=InternalEnum::NORMAL)
void timeoutExpired() noexceptoverride
#define ASSERT_LE(val1, val2)
Definition: gtest.h:1964
double getAvgLoopTime() const
Definition: EventBase.h:479
auto begin(TestAdlIterable &instance)
Definition: ForeachTest.cpp:56
#define T_CHECK_TIME_LT(start, end, expectedMS,...)
Definition: Util.h:50
vector< uint32_t > timeouts_
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
static size_t const kNumThreads
IdleTimeTimeoutSeries(EventBase *base, std::deque< std::size_t > &timeout)
void handlerReady(uint16_t events) noexceptoverride
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
PartialWriteHandler(EventBase *eventBase, int fd, size_t writeLength)
CountedLoopCallback(EventBase *eventBase, unsigned int count, std::function< void()> action=std::function< void()>())
requires E e noexcept(noexcept(s.error(std::move(e))))
void schedule(Func &&a)
Alias for add() (for Rx consistency)
#define SKIP()
Definition: TestUtils.h:55
#define EXPECT_GE(val1, val2)
Definition: gtest.h:1932
void scheduleEvents(EventBase *eventBase, int fd, ScheduledEvent *events)
vector< uint32_t >::const_iterator iterator_
void scheduleAt(Func &&fn, TimePoint const &timeout) override
Definition: EventBase.cpp:754
void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob)
Definition: EventBase.h:459
vector< TimePoint > timestamps
size_t writeUntilFull(int fd)
PipeHandler(EventBase *eventBase, int fd)
void handlerReady(uint16_t) noexceptoverride
std::vector< std::thread::id > threads
ReschedulingTimeout(EventBase *evb, const vector< uint32_t > &timeouts)
void handler(int, siginfo_t *, void *)
const char * name
Definition: http_parser.c:437
virtual TimePoint now()
Get this executor&#39;s notion of time. Must be threadsafe.
ssize_t writeToFD(int fd, size_t length)
void runInLoop(LoopCallback *callback, bool thisIteration=false)
Definition: EventBase.cpp:520
#define SUCCEED()
Definition: gtest.h:1831
bool loopOnce(int flags=0)
Definition: EventBase.cpp:271
TEST(EventBaseTest, ReadEvent)
size_t read(T &out, folly::io::Cursor &cursor)
Definition: Types-inl.h:258
RunInThreadData * data
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
void terminateLoopSoon()
Definition: EventBase.cpp:493
EventBase * getEventBase() override
Implements the IOExecutor interface.
Definition: EventBase.cpp:776
TestHandler(EventBase *eventBase, int fd)
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
Future< T > getFuture()
Definition: Promise-inl.h:97
void timeoutExpired() noexceptoverride
static const char *const value
Definition: Conv.cpp:50
void perform(int fd)
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
Definition: Memory.h:259
auto start
void runLoopCallback() noexceptoverride
void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations)
int * count
size_t readUntilEmpty(int fd)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
std::enable_if< std::is_same< Unit, B >::value, void >::type setValue()
Definition: Promise.h:326
void setLoadAvgMsec(std::chrono::milliseconds ms)
Definition: EventBase.cpp:224
RunInThreadArg(RunInThreadData *data_, int threadId, int value_)
deque< pair< int, int > > values
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
TerminateTestCallback(EventBase *eventBase, int fd)
void handlerReady(uint16_t events) noexceptoverride
#define EXPECT_NE(val1, val2)
Definition: gtest.h:1926
void setName(const std::string &name)
Definition: EventBase.cpp:740
bool scheduleTimeout(uint32_t milliseconds)
void runAfterDelay(Func cob, uint32_t milliseconds, InternalEnum internal=InternalEnum::NORMAL)
void runLoopCallback() noexceptoverride
PartialReadHandler(EventBase *eventBase, int fd, size_t readLength)
void waitUntilRunning()
Definition: EventBase.cpp:249
Executor::KeepAlive< ExecutorT > getKeepAliveToken(ExecutorT *executor)
Definition: Executor.h:200
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
ssize_t readFromFD(int fd, size_t length)
TestTimeout(EventBase *eventBase)
void handlerReady(uint16_t) noexceptoverride
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
bool registerHandler(uint16_t events)
Definition: EventHandler.h:100
char c
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
void runBeforeLoop(LoopCallback *callback)
Definition: EventBase.cpp:548
std::function< void()> action_
TimePoint timestamp
int close(NetworkSocket s)
Definition: NetOps.cpp:90
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
StringPiece data_
deque< EventRecord > log
void drive() override
Implements the DrivableExecutor interface.
Definition: EventBase.h:631
#define T_CHECK_TIMEOUT(start, end, expectedMS,...)
Definition: Util.h:38
void pipe(CPUExecutor cpu, IOExecutor io)
void checkReadUntilEmpty(int fd, size_t expectedLength)
bool isUnset() const
Definition: TimeUtil.h:44
void resetLoadAvg(double value=0.0)
Definition: EventBase.cpp:235
static unordered_set< string > us
EventRecord(uint16_t events_, size_t bytesRead_, size_t bytesWritten_)
void timeoutExpired() noexceptoverride
action
Definition: upload.py:393