40 class UDPAcceptor :
public AsyncUDPServerSocket::Callback {
43 :
evb_(evb), n_(n), changePortForWrites_(changePortForWrites) {}
50 std::shared_ptr<folly::AsyncUDPSocket>
socket,
52 std::unique_ptr<folly::IOBuf>
data,
55 lastMsg_ =
data->clone()->moveToFbString().toStdString();
57 auto len =
data->computeChainDataLength();
58 VLOG(4) <<
"Worker " << n_ <<
" read " << len <<
" bytes " 59 <<
"(trun:" << truncated <<
") from " << client.describe() <<
" - " 68 if (changePortForWrites_) {
69 writeSocket = std::make_shared<folly::AsyncUDPSocket>(
evb_);
70 writeSocket->setReuseAddr(
false);
74 }
catch (
const std::exception& ex) {
75 VLOG(4) <<
"Failed to send PONG " << ex.what();
83 bool changePortForWrites_{
true};
92 :
evb_(evb), addr_(addr), evbs_(n) {}
97 socket_ = std::make_unique<AsyncUDPServerSocket>(
evb_, 1500);
100 socket_->bind(addr_);
101 VLOG(4) <<
"Server listening on " << socket_->address().describe();
102 }
catch (
const std::exception& ex) {
103 LOG(FATAL) << ex.what();
// Pre-size both vectors to one slot per event base in evbs_. Presumably this
// keeps the &acceptors_[i] pointers handed to addListener() below from being
// invalidated by later emplace_back() calls -- confirm against the full loop.
106 acceptors_.reserve(evbs_.size());
107 threads_.reserve(evbs_.size());
// For every event base: build its acceptor, start its loop on a new thread,
// and register the acceptor with the server socket. `i` is declared and
// advanced on lines that are not visible in this view.
111 for (
auto& evb : evbs_) {
// The acceptor receives a pointer to this event base, the index `i`, and the
// server-wide changePortForWrites_ flag.
112 acceptors_.emplace_back(&evb, i, changePortForWrites_);
// The thread body runs this event base's loop via loopForever().
114 std::thread
t([&]() { evb.loopForever(); });
// waitUntilRunning() is called before the listener is registered below.
116 evb.waitUntilRunning();
// Register acceptor `i` as a listener on the server socket, paired with `evb`.
118 socket_->addListener(&evb, &acceptors_[i]);
135 for (
auto& evb : evbs_) {
136 evb.terminateLoopSoon();
139 for (
auto&
t : threads_) {
154 changePortForWrites_ = changePortForWrites;
161 std::unique_ptr<AsyncUDPServerSocket>
socket_;
162 std::vector<std::thread> threads_;
163 std::vector<folly::EventBase> evbs_;
164 std::vector<UDPAcceptor> acceptors_;
165 bool changePortForWrites_{
true};
182 VLOG(2) <<
"Client bound to " <<
socket_->address().describe();
183 }
catch (
const std::exception& ex) {
184 LOG(FATAL) << ex.what();
196 int ret =
socket_->connect(*connectAddr_);
201 VLOG(2) <<
"Client connected to address=" << *connectAddr_;
223 virtual void writePing(std::unique_ptr<folly::IOBuf> buf) {
236 VLOG(4) <<
"Read " << len <<
" bytes (trun:" << truncated <<
") from " 237 << client.describe() <<
" - " <<
std::string(buf_, len);
238 VLOG(4) << n_ <<
" left";
246 VLOG(4) << ex.what();
253 CHECK(
false) <<
"We unregister reads before closing";
257 VLOG(4) <<
"Timeout expired";
270 connectAddr_ = connectAddr;
278 std::unique_ptr<AsyncUDPSocket>
socket_;
// Override of writePing(): describes `buf` with an iovec array and fills in a
// msghdr (`msg`) that points at it. `vec` and `msg` are declared on lines that
// are not visible in this view, as is the call that consumes `msg`.
295 void writePing(std::unique_ptr<folly::IOBuf> buf)
override {
// Ask the IOBuf to fill `vec`; the second argument is the element capacity of
// `vec`.
297 size_t iovec_len = buf->
fillIov(vec,
sizeof(vec) /
sizeof(vec[0]));
// Point the first iovec at buf's own data()/length().
// NOTE(review): the lines between fillIov() and here are elided, so whether
// this assignment is conditional cannot be told from this view.
300 vec[0].iov_base =
const_cast<uint8_t*
>(buf->
data());
301 vec[0].iov_len = buf->
length();
// No destination address is set on the message -- presumably the socket is
// expected to be connected already; confirm against the caller.
306 msg.msg_name =
nullptr;
// Attach the iovec array and its element count.
308 msg.msg_iov =
const_cast<struct iovec*
>(
vec);
309 msg.msg_iovlen = iovec_len;
// No ancillary (control) data is attached.
310 msg.msg_control =
nullptr;
311 msg.msg_controllen = 0;
// Enter this branch only when errno is neither EAGAIN nor EWOULDBLOCK.
// The two macros may have different values on some platforms; joined with
// `||` the test is then true for every errno value, so a would-block result
// would never be filtered out.
316 if (errno != EAGAIN && errno != EWOULDBLOCK) {
328 server = std::make_unique<UDPServer>(
333 std::make_unique<std::thread>([
this]() { sevb.loopForever(); });
336 sevb.waitUntilRunning();
341 sevb.runInEventBaseThreadAndWait([&]() { server->start(); });
342 LOG(INFO) <<
"Server listening=" << server->address();
347 sevb.runInEventBaseThread([&]() {
349 sevb.terminateLoopSoon();
353 serverThread->join();
356 std::unique_ptr<UDPClient> performPingPongTest(
358 bool useConnectedWrite);
369 bool useConnectedWrite) {
370 if (useConnectedWrite) {
372 client = std::make_unique<ConnectedWriteUDPClient>(&cevb);
373 client->setShouldConnect(*connectedAddress);
375 client = std::make_unique<UDPClient>(&cevb);
376 if (connectedAddress) {
377 client->setShouldConnect(*connectedAddress);
381 auto clientThread = std::thread([
this]() { cevb.loopForever(); });
384 cevb.waitUntilRunning();
387 cevb.runInEventBaseThread([&]() { client->start(server->address(), 100); });
396 auto pingClient = performPingPongTest(
folly::none,
false);
402 server->setChangePortForWrites(
false);
404 auto pingClient = performPingPongTest(server->address(),
false);
410 server->setChangePortForWrites(
true);
412 auto pingClient = performPingPongTest(server->address(),
false);
418 server->setChangePortForWrites(
false);
421 server->address().getIPAddress(), server->address().getPort() + 1);
422 auto pingClient = performPingPongTest(connectAddr,
false);
428 server->setChangePortForWrites(
false);
430 auto pingClient = performPingPongTest(server->address(),
true);
436 server->setChangePortForWrites(
true);
438 auto pingClient = performPingPongTest(server->address(),
true);
447 server->pauseAccepting();
448 auto pausedClient = performPingPongTest(
folly::none,
false);
452 server->resumeAccepting();
453 auto pingClient = performPingPongTest(
folly::none,
false);
478 errMessageError_(ex);
488 getReadBuffer_(buf, len);
498 onDataAvailable_(client, len, truncated);
533 socket_->setErrMessageCallback(&err);
535 bool errRecvd =
false;
536 #ifdef FOLLY_HAVE_MSG_ERRQUEUE 538 .WillOnce(
Invoke([
this, &errRecvd](
auto& cmsg) {
539 if ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
540 (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
541 const struct sock_extended_err* serr =
542 reinterpret_cast<const struct sock_extended_err*>(
// Compare ee_origin against each ICMP origin explicitly. A bare
// SO_EE_ORIGIN_ICMP6 operand is a non-zero constant, which made the whole
// expression true regardless of where the error actually originated.
545 (serr->ee_origin == SO_EE_ORIGIN_ICMP ||
serr->ee_origin == SO_EE_ORIGIN_ICMP6);
546 LOG(ERROR) <<
"errno " << errnoStr(serr->ee_errno);
550 #endif // FOLLY_HAVE_MSG_ERRQUEUE 558 socket_->setErrMessageCallback(&err);
559 socket_->setErrMessageCallback(
nullptr);
570 socket_->setErrMessageCallback(&err);
572 bool errRecvd =
false;
587 bool errRecvd =
false;
590 .WillRepeatedly(
Invoke([&readBuf](
void** buf,
size_t* len) {
592 *buf = readSpace.first;
593 *len = readSpace.second;
595 ON_CALL(readCb, onReadError_(
_)).WillByDefault(
Invoke([&errRecvd](
auto& ex) {
596 LOG(ERROR) << ex.what();
610 socket.
bind(address);
633 auto writeSocket = std::make_shared<folly::AsyncUDPSocket>(&
evb_);
635 writeSocket->bind(address);
636 std::array<uint8_t, 1024>
data;
637 std::atomic<int> packetsRecvd{0};
639 .WillRepeatedly(
Invoke([&](
void** buf,
size_t* len) {
648 while (packetsRecvd != 1) {
658 while (packetsRecvd != 2 ||
660 now + std::chrono::milliseconds(10)) {
661 std::this_thread::sleep_for(std::chrono::milliseconds(1));
void onDataAvailable(std::shared_ptr< folly::AsyncUDPSocket > socket, const folly::SocketAddress &client, std::unique_ptr< folly::IOBuf > data, bool truncated) noexceptoverride
#define ASSERT_GT(val1, val2)
AsyncUDPSocket & getSocket()
size_t readBuf(Buf &buf, folly::io::Cursor &cursor)
size_t writePing(IOBufQueue &queue, uint64_t opaqueData, bool ack) noexcept
int connect(NetworkSocket s, const sockaddr *name, socklen_t namelen)
ssize_t sendmsg(NetworkSocket socket, const msghdr *message, int flags)
#define ASSERT_EQ(val1, val2)
TEST_F(TestInfoTest, Names)
bool runInEventBaseThreadAndWait(void(*fn)(T *), T *arg)
void writePing(std::unique_ptr< folly::IOBuf > buf) override
void errMessageError(const folly::AsyncSocketException &ex) noexceptoverride
void errMessage(const cmsghdr &cmsg) noexceptoverride
#define EXPECT_EQ(val1, val2)
void onListenStarted() noexceptoverride
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
virtual void writePing(std::unique_ptr< folly::IOBuf > buf)
std::unique_ptr< UDPServer > server
void onDataAvailable(const folly::SocketAddress &client, size_t len, bool truncated) noexceptoverride
const uint8_t * data() const
void onListenStopped() noexceptoverride
UDPServer(EventBase *evb, folly::SocketAddress addr, int n)
requires E e noexcept(noexcept(s.error(std::move(e))))
std::unique_ptr< UDPClient > performPingPongTest(folly::Optional< folly::SocketAddress > connectedAddress, bool useConnectedWrite)
void start(const folly::SocketAddress &server, int n)
folly::SocketAddress addr_
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
#define MOCK_METHOD3(m,...)
UDPAcceptor(EventBase *evb, int n, bool changePortForWrites)
size_t fillIov(struct iovec *iov, size_t len) const
TestAsyncUDPSocket(EventBase *evb)
bool isInEventBaseThread() const
folly::SocketAddress lastClient_
bool loopOnce(int flags=0)
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
void onReadClosed() noexceptoverride
AsyncServerSocket::UniquePtr socket_
std::size_t length() const
void timeoutExpired() noexceptoverride
FOLLY_CPP14_CONSTEXPR bool hasValue() const noexcept
bool runInEventBaseThread(void(*fn)(T *), T *arg)
void onReadError(const folly::AsyncSocketException &ex) noexceptoverride
NetworkSocket socket(int af, int type, int protocol)
virtual void bind(const folly::SocketAddress &address)
void scheduleTimeoutFn(F fn, std::chrono::milliseconds timeout)
void onReadClosed() noexceptoverride
fbstring errnoStr(int err)
#define EXPECT_TRUE(condition)
void setShouldConnect(const folly::SocketAddress &connectAddr)
#define ON_CALL(obj, call)
std::unique_ptr< std::thread > serverThread
#define MOCK_METHOD1(m,...)
MockErrMessageCallback err
void setChangePortForWrites(bool changePortForWrites)
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
UDPClient(EventBase *evb)
void onReadError(const folly::AsyncSocketException &ex) noexceptoverride
void onDataAvailable(const folly::SocketAddress &client, size_t len, bool truncated) noexceptoverride
#define EXPECT_FALSE(condition)
void getReadBuffer(void **buf, size_t *len) noexceptoverride
void sendPong(std::shared_ptr< folly::AsyncUDPSocket > socket) noexcept
virtual bool isBound() const
void getReadBuffer(void **buf, size_t *len) noexceptoverride
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
#define MOCK_METHOD2(m,...)
std::shared_ptr< AsyncUDPSocket > socket_
static constexpr uint64_t data[1]
ThreadPoolListHook * addr
folly::SocketAddress address() const
ConnectedWriteUDPClient(EventBase *evb)
std::unique_ptr< UDPClient > client
MockUDPReadCallback readCb
#define MOCK_METHOD0(m,...)