28 #include <sys/eventfd.h> 31 using namespace folly;
35 vector<thread>
threads(nthreads);
37 threads[
i] = thread(cb,
i);
54 _handlerReady(events);
75 write(efd, &val,
sizeof(val));
80 read(efd, &val,
sizeof(val));
86 const size_t writes = 4;
87 size_t readsRemaining = writes;
96 if (--readsRemaining == 0) {
107 const size_t writes = 200;
108 const size_t nproducers = 20;
109 size_t readsRemaining = writes;
120 if (--readsRemaining == 0) {
128 for (
size_t i = 0;
i < writes / nproducers; ++
i) {
129 this_thread::sleep_for(std::chrono::milliseconds(1));
140 const size_t writes = 200;
141 const size_t nproducers = 8;
142 const size_t nconsumers = 20;
143 atomic<size_t> writesRemaining(writes);
144 atomic<size_t> readsRemaining(writes);
151 size_t thReadsRemaining = writes / nconsumers;
158 if (!queue.readIfNotEmpty(val)) {
163 if (--thReadsRemaining == 0) {
172 for (
size_t i = 0;
i < writes / nproducers; ++
i) {
173 this_thread::sleep_for(std::chrono::milliseconds(1));
174 queue.blockingWrite(
nullptr);
200 void runClient(std::function<
void(
int fd)> clientOps) {
201 clientThread = std::thread([serverPortFuture = serverReady.get_future(),
202 clientOps]()
mutable {
203 int clientFd =
socket(AF_INET, SOCK_STREAM, 0);
207 struct hostent* he{
nullptr};
208 struct sockaddr_in server;
210 std::array<const char, 10> hostname = {
"localhost"};
211 he = gethostbyname(hostname.data());
214 memcpy(&server.sin_addr, he->h_addr_list[0], he->h_length);
215 server.sin_family = AF_INET;
218 server.sin_port = serverPortFuture.get();
219 LOG(
INFO) <<
"Server is ready";
222 ::
connect(clientFd, (
struct sockaddr*)&server,
sizeof(server)) == 0);
223 LOG(
INFO) <<
"Server connection available";
235 int listenfd =
socket(AF_INET, SOCK_STREAM, 0);
239 PCHECK(listenfd != -1) <<
"unable to open socket";
241 struct sockaddr_in sin;
242 sin.sin_port = htons(0);
243 sin.sin_addr.s_addr = INADDR_ANY;
244 sin.sin_family = AF_INET;
246 PCHECK(
bind(listenfd, (
struct sockaddr*)&sin,
sizeof(sin)) >= 0)
247 <<
"Can't bind to port";
250 struct sockaddr_in findSockName;
251 socklen_t sz =
sizeof(findSockName);
252 getsockname(listenfd, (
struct sockaddr*)&findSockName, &sz);
253 serverReady.set_value(findSockName.sin_port);
255 struct sockaddr_in cli_addr;
256 socklen_t clilen =
sizeof(cli_addr);
257 serverFd =
accept(listenfd, (
struct sockaddr*)&cli_addr, &clilen);
258 PCHECK(serverFd >= 0) <<
"can't accept";
261 void SetUp()
override {}
263 void TearDown()
override {
269 std::thread clientThread;
270 std::promise<decltype(sockaddr_in::sin_port)> serverReady;
277 TEST_F(EventHandlerOobTest, EPOLLPRI) {
278 auto clientOps = [](
int fd) {
280 int n =
send(fd, buffer, strlen(buffer) + 1, MSG_OOB);
281 LOG(
INFO) <<
"Client send finished";
285 runClient(clientOps);
292 EXPECT_TRUE(EventHandler::EventFlags::PRI & events);
293 std::array<char, 255>
buffer;
294 int n =
read(fd_, buffer.data(), buffer.size());
302 n =
recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
308 } sockHandler(&eb, serverFd);
310 sockHandler.registerHandler(EventHandler::EventFlags::PRI);
311 LOG(
INFO) <<
"Registered Handler";
318 TEST_F(EventHandlerOobTest, OOB_AND_NORMAL_DATA) {
319 auto clientOps = [](
int sockfd) {
322 std::array<char, 2>
buffer = {
"X"};
323 int n =
send(sockfd, buffer.data(), 1, MSG_OOB);
328 std::array<char, 7>
buffer = {
"banana"};
329 int n =
send(sockfd, buffer.data(), buffer.size(), 0);
334 runClient(clientOps);
341 std::array<char, 255>
buffer;
342 if (events & EventHandler::EventFlags::PRI) {
343 int n =
recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
346 registerHandler(EventHandler::EventFlags::READ);
350 if (events & EventHandler::EventFlags::READ) {
351 int n =
recv(fd_, buffer.data(), buffer.size(), 0);
354 eb_->terminateLoopSoon();
362 } sockHandler(&eb, serverFd);
363 sockHandler.registerHandler(
364 EventHandler::EventFlags::PRI | EventHandler::EventFlags::READ);
365 LOG(
INFO) <<
"Registered Handler";
372 TEST_F(EventHandlerOobTest, SWALLOW_OOB) {
373 auto clientOps = [](
int sockfd) {
375 std::array<char, 2>
buffer = {
"X"};
376 int n =
send(sockfd, buffer.data(), 1, MSG_OOB);
381 std::array<char, 7>
buffer = {
"banana"};
382 int n =
send(sockfd, buffer.data(), buffer.size(), 0);
387 runClient(clientOps);
394 std::array<char, 255>
buffer;
395 ASSERT_TRUE(events & EventHandler::EventFlags::READ);
396 int n =
recv(fd_, buffer.data(), buffer.size(), 0);
403 } sockHandler(&eb, serverFd);
404 sockHandler.registerHandler(EventHandler::EventFlags::READ);
405 LOG(
INFO) <<
"Registered Handler";
internal::GtMatcher< Rhs > Gt(Rhs x)
std::vector< uint8_t > buffer(kBufferSize+16)
void efd_write(uint64_t val)
void write(const T &in, folly::io::Appender &appender)
int connect(NetworkSocket s, const sockaddr *name, socklen_t namelen)
#define eventfd(initval, flags)
#define EXPECT_EQ(val1, val2)
—— Concurrent Priority Queue Implementation ——
requires E e noexcept(noexcept(s.error(std::move(e))))
std::vector< std::thread::id > threads
int getsockname(NetworkSocket s, sockaddr *name, socklen_t *namelen)
EventHandlerMock(EventBase *eb, int fd)
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
size_t read(T &out, folly::io::Cursor &cursor)
void runInThreadsAndWait(size_t nthreads, function< void(size_t)> cb)
ssize_t send(NetworkSocket s, const void *buf, size_t len, int flags)
NetworkSocket socket(int af, int type, int protocol)
int listen(NetworkSocket s, int backlog)
TEST_F(EventHandlerTest, simple)
#define EXPECT_TRUE(condition)
void handlerReady(uint16_t events) noexceptoverride
#define ASSERT_THAT(value, matcher)
#define MOCK_METHOD1(m,...)
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
ssize_t recv(NetworkSocket s, void *buf, size_t len, int flags)
bool registerHandler(uint16_t events)
#define ASSERT_TRUE(condition)
int close(NetworkSocket s)
NetworkSocket accept(NetworkSocket s, sockaddr *addr, socklen_t *addrlen)