41 using std::unique_ptr;
43 using std::chrono::duration_cast;
44 using std::chrono::microseconds;
45 using std::chrono::milliseconds;
49 using namespace folly;
59 auto bufv = vector<char>(length);
60 auto buf = bufv.data();
61 memset(buf,
'a', length);
62 ssize_t rc =
write(fd, buf, length);
69 size_t bytesWritten = 0;
71 memset(buf,
'a',
sizeof(buf));
73 ssize_t rc =
write(fd, buf,
sizeof(buf));
75 CHECK_EQ(errno, EAGAIN);
86 auto buf = vector<char>(length);
87 return read(fd, buf.data(), length);
95 int rc =
read(fd, buf,
sizeof(buf));
97 CHECK(
false) <<
"unexpected EOF";
99 CHECK_EQ(errno, EAGAIN);
119 if (events & EventHandler::READ) {
126 if (events & EventHandler::WRITE) {
149 ssize_t bytesRead = 0;
150 ssize_t bytesWritten = 0;
156 if (events & WRITE) {
162 log.emplace_back(events, bytesRead, bytesWritten);
169 bytesRead(bytesRead_),
170 bytesWritten(bytesWritten_) {}
187 TEST(EventBaseTest, ReadEvent) {
197 {10, EventHandler::WRITE, 2345, 0},
198 {160, EventHandler::WRITE, 99, 0},
215 handler.
log[0].timestamp,
216 milliseconds(events[0].milliseconds),
221 start, end, milliseconds(events[1].milliseconds), milliseconds(30));
225 ASSERT_EQ(bytesRemaining, events[1].length);
231 TEST(EventBaseTest, ReadPersist) {
241 {10, EventHandler::WRITE, 1024, 0},
242 {20, EventHandler::WRITE, 2211, 0},
243 {30, EventHandler::WRITE, 4096, 0},
244 {100, EventHandler::WRITE, 100, 0},
260 for (
int n = 0; n < 3; ++n) {
263 start, handler.
log[n].timestamp, milliseconds(events[n].milliseconds));
271 ASSERT_EQ(bytesRemaining, events[3].length);
277 TEST(EventBaseTest, ReadImmediate) {
283 size_t dataLength = 1234;
292 {10, EventHandler::WRITE, 2345, 0},
316 start, handler.
log[1].timestamp, milliseconds(events[0].milliseconds));
326 TEST(EventBaseTest, WriteEvent) {
339 {10, EventHandler::READ, 0, 0},
340 {60, EventHandler::READ, 0, 0},
355 start, handler.
log[0].timestamp, milliseconds(events[0].milliseconds));
360 ASSERT_EQ(events[0].result, initialBytesWritten);
361 ASSERT_EQ(events[1].result, handler.
log[0].bytesWritten);
367 TEST(EventBaseTest, WritePersist) {
380 {10, EventHandler::READ, 0, 0},
381 {40, EventHandler::READ, 0, 0},
382 {70, EventHandler::READ, 0, 0},
383 {100, EventHandler::READ, 0, 0},
399 ASSERT_EQ(events[0].result, initialBytesWritten);
400 for (
int n = 0; n < 3; ++n) {
403 start, handler.
log[n].timestamp, milliseconds(events[n].milliseconds));
414 TEST(EventBaseTest, WriteImmediate) {
424 {10, EventHandler::READ, 0, 0},
430 int64_t unregisterTimeout = 40;
451 start, handler.
log[1].timestamp, milliseconds(events[0].milliseconds));
461 TEST(EventBaseTest, ReadWrite) {
474 {10, EventHandler::WRITE, 2345, 0},
475 {40, EventHandler::READ, 0, 0},
491 start, handler.
log[0].timestamp, milliseconds(events[0].milliseconds));
494 ASSERT_EQ(events[1].result, sock0WriteLength);
501 TEST(EventBaseTest, WriteRead) {
513 size_t sock1WriteLength = 2345;
515 {10, EventHandler::READ, 0, 0},
516 {40, EventHandler::WRITE, sock1WriteLength, 0},
532 start, handler.
log[0].timestamp, milliseconds(events[0].milliseconds));
535 ASSERT_EQ(events[0].result, sock0WriteLength);
536 ASSERT_EQ(events[1].result, sock1WriteLength);
541 ASSERT_EQ(bytesRemaining, events[1].length);
548 TEST(EventBaseTest, ReadWriteSimultaneous) {
561 {10, EventHandler::READ | EventHandler::WRITE, 0, 0},
577 ASSERT_EQ(handler.
log[0].events, EventHandler::READ | EventHandler::WRITE);
579 start, handler.
log[0].timestamp, milliseconds(events[0].milliseconds));
588 TEST(EventBaseTest, ReadWritePersist) {
595 EventHandler::READ | EventHandler::WRITE | EventHandler::PERSIST);
599 {10, EventHandler::WRITE, 2345, 0},
600 {20, EventHandler::READ, 0, 0},
601 {35, EventHandler::WRITE, 200, 0},
602 {45, EventHandler::WRITE, 15, 0},
603 {55, EventHandler::READ, 0, 0},
604 {120, EventHandler::WRITE, 2345, 0},
627 for (
int n = 1; n < 6; ++n) {
630 start, handler.
log[n].timestamp, milliseconds(event->milliseconds));
631 if (event->events == EventHandler::READ) {
645 ASSERT_EQ(bytesRemaining, events[5].length);
651 :
TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
654 assert(events == EventHandler::READ);
655 ssize_t bytesRead =
readFromFD(fd_, readLength_);
656 log.emplace_back(events, bytesRead, 0);
669 TEST(EventBaseTest, ReadPartial) {
674 size_t readLength = 100;
681 {10, EventHandler::WRITE, (3 * readLength) + (readLength / 2), 0},
697 for (
int n = 0; n < 3; ++n) {
700 start, handler.
log[n].timestamp, milliseconds(events[0].milliseconds));
707 start, handler.
log[3].timestamp, milliseconds(events[0].milliseconds));
715 :
TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
718 assert(events == EventHandler::WRITE);
719 ssize_t bytesWritten =
writeToFD(fd_, writeLength_);
720 log.emplace_back(events, 0, bytesWritten);
733 TEST(EventBaseTest, WritePartial) {
741 size_t writeLength = 100;
747 {10, EventHandler::READ, 0, 0},
764 ASSERT_EQ(events[0].result, initialBytesWritten);
767 for (
int n = 0; n < numChecked; ++n) {
770 start, handler.
log[n].timestamp, milliseconds(events[0].milliseconds));
779 TEST(EventBaseTest, DestroyHandler) {
785 void timeoutExpired()
noexcept override {
810 DestroyHandler dh(&eb, handler);
811 dh.scheduleTimeout(25);
831 TEST(EventBaseTest, RunAfterDelay) {
855 TEST(EventBaseTest, RunAfterDelayDestruction) {
905 TEST(EventBaseTest, BasicTimeouts) {
935 timestamps.emplace_back();
940 if (iterator_ != timeouts_.end()) {
943 scheduleTimeout(timeout);
957 TEST(EventBaseTest, ReuseTimeout) {
960 vector<uint32_t> timeouts;
961 timeouts.push_back(10);
962 timeouts.push_back(30);
963 timeouts.push_back(15);
975 milliseconds tolerance{6};
979 for (
size_t n = 0; n < timeouts.size(); ++n) {
980 total += timeouts[n];
989 TEST(EventBaseTest, RescheduleTimeout) {
1001 &AsyncTimeout::scheduleTimeout);
1021 TEST(EventBaseTest, CancelTimeout) {
1024 vector<uint32_t> timeouts;
1025 timeouts.push_back(10);
1026 timeouts.push_back(30);
1027 timeouts.push_back(25);
1046 TEST(EventBaseTest, DestroyTimeout) {
1052 void timeoutExpired()
noexcept override {
1065 DestroyTimeout dt(&eb, t1);
1066 dt.scheduleTimeout(10);
1098 TEST(EventBaseTest, ScheduledFnAt) {
1106 std::bind(&TimePoint::reset, ×tamp1), eb.
now() - milliseconds(5));
1108 std::bind(&TimePoint::reset, ×tamp1), eb.
now() + milliseconds(9));
1110 std::bind(&TimePoint::reset, ×tamp2), eb.
now() + milliseconds(19));
1112 std::bind(&TimePoint::reset, ×tamp3), eb.
now() + milliseconds(39));
1131 : opsPerThread(opsPerThread_), opsToGo(numThreads * opsPerThread) {}
1142 :
data(data_), thread(threadId),
value(value_) {}
1161 constexpr
uint32_t numThreads = 50;
1162 constexpr
uint32_t opsPerThread = 100;
1168 for (
auto& thread : threads) {
1174 threads.emplace_back([
i, &data] {
1189 std::bind(&EventBase::terminateLoopSoon, &data.
evb), 3000);
1201 std::chrono::duration_cast<milliseconds>(end.
getTime() - start.
getTime());
1203 VLOG(11) <<
"Time taken: " << timeTaken.count();
1206 int expectedValues[numThreads];
1207 for (
uint32_t n = 0; n < numThreads; ++n) {
1208 expectedValues[n] = 0;
1213 int threadID = it->first;
1214 int value = it->second;
1215 ASSERT_EQ(expectedValues[threadID], value);
1216 ++expectedValues[threadID];
1218 for (
uint32_t n = 0; n < numThreads; ++n) {
1219 ASSERT_EQ(expectedValues[n], opsPerThread);
1226 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1227 const size_t c = 256;
1228 vector<unique_ptr<atomic<size_t>>> atoms(c);
1229 for (
size_t i = 0;
i <
c; ++
i) {
1230 auto& atom = atoms.at(
i);
1231 atom = std::make_unique<atomic<size_t>>(0);
1234 for (
size_t i = 0;
i <
c; ++
i) {
1235 threads.emplace_back([&atoms,
i] {
1237 auto& atom = *atoms.at(
i);
1242 atom.compare_exchange_weak(
1243 x, 1, std::memory_order_release, std::memory_order_relaxed);
1246 atom.compare_exchange_weak(
1247 x, 2, std::memory_order_release, std::memory_order_relaxed);
1252 for (
size_t i = 0;
i <
c; ++
i) {
1253 auto& th = threads.at(
i);
1257 for (
auto& atom : atoms) {
1263 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1265 thread th(&EventBase::loopForever, &eb);
1270 auto mutated =
false;
1275 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1277 thread th(&EventBase::loopForever, &eb);
1283 auto mutated =
false;
1289 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1291 auto mutated =
false;
1305 std::function<
void()>
action = std::function<
void()>())
1306 : eventBase_(eventBase), count_(count), action_(
action) {}
1311 eventBase_->runInLoop(
this);
1312 }
else if (action_) {
1323 unsigned int count_;
1329 TEST(EventBaseTest, RepeatedRunInLoop) {
1344 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1359 TEST(EventBaseTest, RunInLoopStopLoop) {
1364 &eventBase, 10,
std::bind(&EventBase::terminateLoopSoon, &eventBase));
1387 TEST(EventBaseTest, messageAvailableException) {
1388 auto deadManWalking = [] {
1394 []() {
throw std::runtime_error(
"boom"); });
1399 EXPECT_DEATH(deadManWalking(),
".*");
1402 TEST(EventBaseTest, TryRunningAfterTerminate) {
1405 &eventBase, 1,
std::bind(&EventBase::terminateLoopSoon, &eventBase));
1415 TEST(EventBaseTest, CancelRunInLoop) {
1422 std::function<void()> cancelC1Action =
1423 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1424 std::function<void()> cancelC2Action =
1425 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1473 eventBase_(eventBase),
1474 loopInvocations_(0),
1475 maxLoopInvocations_(0),
1476 eventInvocations_(0),
1477 maxEventInvocations_(0) {}
1480 loopInvocations_ = 0;
1481 maxLoopInvocations_ = maxLoopInvocations;
1482 eventInvocations_ = 0;
1483 maxEventInvocations_ = maxEventInvocations;
1485 cancelLoopCallback();
1486 unregisterHandler();
1494 ++eventInvocations_;
1495 if (eventInvocations_ >= maxEventInvocations_) {
1499 eventBase_->runInLoop(
this);
1503 if (loopInvocations_ >= maxLoopInvocations_) {
1507 registerHandler(READ);
1511 return loopInvocations_;
1514 return eventInvocations_;
1534 TEST(EventBaseTest, LoopTermination) {
1540 int rc =
pipe(pipeFds);
1546 callback.
reset(10, 100);
1553 callback.
reset(100, 7);
1570 std::deque<std::size_t>& timeout)
1571 :
AsyncTimeout(base), timeouts_(0), timeout_(timeout) {
1580 if (timeout_.empty()) {
1583 std::size_t sleepTime = timeout_.front();
1584 timeout_.pop_front();
1612 std::deque<std::size_t> timeouts0(4, 8080);
1613 timeouts0.push_front(8000);
1614 timeouts0.push_back(14000);
1616 std::deque<std::size_t> timeouts(20, 20);
1617 std::unique_ptr<IdleTimeTimeoutSeries> tos;
1618 bool hostOverloaded =
false;
1624 eventBase.
loopOnce(EVLOOP_NONBLOCK);
1629 int latencyCallbacks = 0;
1632 if (latencyCallbacks != 1) {
1633 FAIL() <<
"Unexpected latency callback";
1639 int64_t usElapsed = duration_cast<microseconds>(
1643 hostOverloaded =
true;
1649 tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
1657 if (hostOverloaded) {
1658 SKIP() <<
"host too heavily loaded to execute test";
1674 bool runInLoop =
false;
1675 bool runThisLoop =
false;
1680 eb.
runInLoop([&]() { runInLoop =
true; });
1681 eb.
runInLoop([&]() { runThisLoop =
true; },
true);
1692 TEST(EventBaseTest, EventBaseThreadLoop) {
1702 TEST(EventBaseTest, EventBaseThreadName) {
1707 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12) 1709 pthread_getname_np(pthread_self(), name, 16);
1714 TEST(EventBaseTest, RunBeforeLoop) {
1722 TEST(EventBaseTest, RunBeforeLoopWait) {
1742 TEST(EventBaseTest, StopBeforeLoop) {
1755 std::thread
t([&] { evb.
loop(); });
1765 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1776 TEST(EventBaseTest, LoopKeepAlive) {
1781 std::this_thread::sleep_for(
1782 std::chrono::milliseconds(100));
1784 [&done, loopKeepAlive =
std::move(loopKeepAlive)] { done =
true; });
1794 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1802 std::this_thread::sleep_for(
1803 std::chrono::milliseconds(100));
1805 [&done, loopKeepAlive =
std::move(loopKeepAlive)] { done =
true; });
1816 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1817 std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
1821 std::thread evThread([&] {
1828 auto* ev = evb.get();
1830 ev->runInEventBaseThreadAndWait(
1832 ASSERT_FALSE(done) <<
"Loop finished before we asked it to";
1833 ev->terminateLoopSoon();
1835 std::this_thread::sleep_for(std::chrono::milliseconds(30));
1837 ev->runInEventBaseThread([keepAlive =
std::move(keepAlive)] {});
1844 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1845 auto evb = std::make_unique<EventBase>();
1849 std::thread
t([&done,
1851 evbPtr = evb.get()]()
mutable {
1852 std::this_thread::sleep_for(
1853 std::chrono::milliseconds(100));
1854 evbPtr->runInEventBaseThread(
1855 [&done, loopKeepAlive =
std::move(loopKeepAlive)] { done =
true; });
1865 TEST(EventBaseTest, LoopKeepAliveAtomic) {
1866 auto evb = std::make_unique<EventBase>();
1869 static constexpr
size_t kNumTasks = 100;
1871 std::vector<std::thread> ts;
1872 std::vector<std::unique_ptr<Baton<>>> batons;
1880 ts.emplace_back([evbPtr = evb.get(), batonPtr = batons[
i].get(), &done] {
1881 std::vector<Executor::KeepAlive<EventBase>> keepAlives;
1882 for (
size_t j = 0; j < kNumTasks; ++j) {
1888 std::this_thread::sleep_for(std::chrono::seconds(1));
1890 for (
auto& keepAlive : keepAlives) {
1891 evbPtr->runInEventBaseThread(
1892 [&done, keepAlive =
std::move(keepAlive)]() { ++done; });
1897 for (
auto& baton : batons) {
1903 EXPECT_EQ(kNumThreads * kNumTasks, done);
1905 for (
auto&
t : ts) {
1910 TEST(EventBaseTest, LoopKeepAliveCast) {
1915 TEST(EventBaseTest, DrivableExecutorTest) {
1919 bool finished =
false;
1923 std::this_thread::sleep_for(std::chrono::microseconds(10));
1944 TEST(EventBaseTest, IOExecutorTest) {
1954 std::weak_ptr<RequestContext> rctx_weak_ptr;
1958 rctx_weak_ptr = RequestContext::saveContext();
1967 EXPECT_EQ(rctx_weak_ptr.expired(),
true);
1972 TEST(EventBaseTest, CancelLoopCallbackRequestContextTest) {
1978 std::weak_ptr<RequestContext> rctx_weak_ptr;
1982 rctx_weak_ptr = RequestContext::saveContext();
1991 EXPECT_EQ(rctx_weak_ptr.expired(),
true);
#define EXPECT_LE(val1, val2)
#define ASSERT_GE(val1, val2)
std::chrono::steady_clock::time_point getTime() const
#define ASSERT_GT(val1, val2)
uint32_t getLoopInvocations() const
std::atomic< int64_t > sum(0)
void runInThreadTestFunc(RunInThreadArg *arg)
std::deque< std::size_t > & timeout_
void write(const T &in, folly::io::Appender &appender)
RunInThreadData(int numThreads, int opsPerThread_)
#define ASSERT_EQ(val1, val2)
~IdleTimeTimeoutSeries() override
unsigned int getCount() const
#define ASSERT_LT(val1, val2)
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
void handlerReady(uint16_t events) noexceptoverride
uint32_t getEventInvocations() const
#define EXPECT_EQ(val1, val2)
uint32_t maxEventInvocations_
constexpr detail::Map< Move > move
std::chrono::steady_clock::time_point now()
bool runImmediatelyOrRunInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
bool tryRunAfterDelay(Func cob, uint32_t milliseconds, InternalEnum internal=InternalEnum::NORMAL)
void timeoutExpired() noexceptoverride
#define ASSERT_LE(val1, val2)
double getAvgLoopTime() const
auto begin(TestAdlIterable &instance)
#define T_CHECK_TIME_LT(start, end, expectedMS,...)
vector< uint32_t > timeouts_
static size_t const kNumThreads
IdleTimeTimeoutSeries(EventBase *base, std::deque< std::size_t > &timeout)
void handlerReady(uint16_t events) noexceptoverride
—— Concurrent Priority Queue Implementation ——
PartialWriteHandler(EventBase *eventBase, int fd, size_t writeLength)
CountedLoopCallback(EventBase *eventBase, unsigned int count, std::function< void()> action=std::function< void()>())
requires E e noexcept(noexcept(s.error(std::move(e))))
void schedule(Func &&a)
Alias for add() (for Rx consistency)
#define EXPECT_GE(val1, val2)
void scheduleEvents(EventBase *eventBase, int fd, ScheduledEvent *events)
vector< uint32_t >::const_iterator iterator_
void scheduleAt(Func &&fn, TimePoint const &timeout) override
void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob)
vector< TimePoint > timestamps
size_t writeUntilFull(int fd)
PipeHandler(EventBase *eventBase, int fd)
void handlerReady(uint16_t) noexceptoverride
std::vector< std::thread::id > threads
ReschedulingTimeout(EventBase *evb, const vector< uint32_t > &timeouts)
void handler(int, siginfo_t *, void *)
virtual TimePoint now()
Get this executor's notion of time. Must be threadsafe.
ssize_t writeToFD(int fd, size_t length)
void runInLoop(LoopCallback *callback, bool thisIteration=false)
uint32_t eventInvocations_
void cancelLoopCallback()
bool loopOnce(int flags=0)
TEST(EventBaseTest, ReadEvent)
size_t read(T &out, folly::io::Cursor &cursor)
auto end(TestAdlIterable &instance)
EventBase * getEventBase() override
Implements the IOExecutor interface.
TestHandler(EventBase *eventBase, int fd)
bool runInEventBaseThread(void(*fn)(T *), T *arg)
uint32_t loopInvocations_
void timeoutExpired() noexceptoverride
static const char *const value
uint32_t maxLoopInvocations_
std::enable_if<!std::is_array< T >::value, std::unique_ptr< T > >::type make_unique(Args &&...args)
void runLoopCallback() noexceptoverride
void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations)
size_t readUntilEmpty(int fd)
#define EXPECT_TRUE(condition)
std::enable_if< std::is_same< Unit, B >::value, void >::type setValue()
void setLoadAvgMsec(std::chrono::milliseconds ms)
RunInThreadArg(RunInThreadData *data_, int threadId, int value_)
deque< pair< int, int > > values
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
TerminateTestCallback(EventBase *eventBase, int fd)
void handlerReady(uint16_t events) noexceptoverride
#define EXPECT_NE(val1, val2)
void setName(const std::string &name)
bool scheduleTimeout(uint32_t milliseconds)
void runAfterDelay(Func cob, uint32_t milliseconds, InternalEnum internal=InternalEnum::NORMAL)
void runLoopCallback() noexceptoverride
PartialReadHandler(EventBase *eventBase, int fd, size_t readLength)
Executor::KeepAlive< ExecutorT > getKeepAliveToken(ExecutorT *executor)
#define ASSERT_FALSE(condition)
ssize_t readFromFD(int fd, size_t length)
TestTimeout(EventBase *eventBase)
void handlerReady(uint16_t) noexceptoverride
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
bool registerHandler(uint16_t events)
#define ASSERT_TRUE(condition)
void runBeforeLoop(LoopCallback *callback)
std::function< void()> action_
int close(NetworkSocket s)
static constexpr uint64_t data[1]
void drive() override
Implements the DrivableExecutor interface.
#define T_CHECK_TIMEOUT(start, end, expectedMS,...)
void pipe(CPUExecutor cpu, IOExecutor io)
void checkReadUntilEmpty(int fd, size_t expectedLength)
void resetLoadAvg(double value=0.0)
static unordered_set< string > us
EventRecord(uint16_t events_, size_t bytesRead_, size_t bytesWritten_)
void timeoutExpired() noexceptoverride