72 LOG(ERROR) << ex.what();
74 this->bytesWritten = nBytesWritten;
94 maxBufferSz(_maxBufferSz) {}
97 for (std::vector<Buffer>::iterator it = buffers.begin();
102 currentBuffer.free();
106 if (!currentBuffer.buffer) {
107 currentBuffer.allocate(maxBufferSz);
109 *bufReturn = currentBuffer.buffer;
110 *lenReturn = currentBuffer.length;
114 currentBuffer.length = len;
115 buffers.push_back(currentBuffer);
116 currentBuffer.reset();
117 if (dataAvailableCallback) {
118 dataAvailableCallback();
131 void verifyData(
const char* expected,
size_t expectedLen)
const {
133 for (
size_t idx = 0; idx < buffers.size(); ++idx) {
134 const auto& buf = buffers[idx];
135 size_t cmpLen =
std::min(buf.length, expectedLen - offset);
136 CHECK_EQ(memcmp(buf.buffer, expected + offset, cmpLen), 0);
137 CHECK_EQ(cmpLen, buf.length);
140 CHECK_EQ(offset, expectedLen);
145 for (
const auto& buf : buffers) {
161 assert(
buffer ==
nullptr);
162 this->
buffer =
static_cast<char*
>(malloc(len));
191 bufferCleared_ =
true;
199 return bufferCleared_;
203 bool buffered_{
false};
204 bool bufferCleared_{
false};
217 queriedFlags_(false),
218 queriedData_(false) {}
223 queriedFlags_ =
false;
224 queriedData_ =
false;
230 queriedFlags_ =
true;
234 assert(flags == writeFlags_);
244 assert(
flags == writeFlags_);
246 assert(
data !=
nullptr);
254 assert(
flags == writeFlags_);
271 explicit TestServer(
bool enableTFO =
false,
int bufSize = -1) : fd_(-1) {
273 fd_ =
fsp::socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
277 "failed to create test server socket",
280 if (fcntl(fd_, F_SETFL, O_NONBLOCK) != 0) {
283 "failed to put test server socket in " 293 struct addrinfo hints, *res;
294 memset(&hints, 0,
sizeof(hints));
295 hints.ai_family = AF_INET;
296 hints.ai_socktype = SOCK_STREAM;
297 hints.ai_flags = AI_PASSIVE;
299 if (getaddrinfo(
nullptr,
"0", &hints, &res)) {
302 "Attempted to bind address to socket with " 312 setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufSize,
sizeof(bufSize));
313 setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufSize,
sizeof(bufSize));
316 if (
bind(fd_, res->ai_addr, res->ai_addrlen)) {
319 "failed to bind to async server socket for port 10",
323 if (
listen(fd_, 10) != 0) {
326 "failed to listen on test server socket",
330 address_.setFromLocalAddress(fd_);
333 address_.setFromIpPort(
"127.0.0.1", address_.getPort());
352 int ret =
poll(&pfd, 1, timeout);
356 "test server accept() timed out");
357 }
else if (ret < 0) {
360 "test server accept() poll failed",
364 int acceptedFd =
fsp::accept(fd_,
nullptr,
nullptr);
365 if (acceptedFd < 0) {
368 "test server accept() failed",
375 std::shared_ptr<BlockingSocket>
accept(
int timeout = 50) {
376 int fd = acceptFD(timeout);
377 return std::make_shared<BlockingSocket>(fd);
383 int fd = acceptFD(timeout);
393 std::shared_ptr<BlockingSocket> acceptedSocket =
accept();
395 std::unique_ptr<uint8_t[]> readbuf(
new uint8_t[len]);
396 acceptedSocket->readAll(readbuf.get(), len);
397 CHECK_EQ(memcmp(buf, readbuf.get(), len), 0);
399 uint32_t bytesRead = acceptedSocket->read(readbuf.get(), len);
400 CHECK_EQ(bytesRead, 0);
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
std::vector< uint8_t > buffer(kBufferSize+16)
void connectSuccess() noexceptoverride
void writeSuccess() noexceptoverride
void getAncillaryData(folly::WriteFlags flags, void *data) noexceptoverride
void verifyData(const char *expected, size_t expectedLen) const
folly::AsyncSocketException exception
std::function< void()> VoidCallback
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
ReadCallback(size_t _maxBufferSz=4096)
std::shared_ptr< BlockingSocket > accept(int timeout=50)
folly::AsyncSocketException exception
void readErr(const folly::AsyncSocketException &ex) noexceptoverride
void connectErr(const folly::AsyncSocketException &ex) noexceptoverride
void getReadBuffer(void **bufReturn, size_t *lenReturn) override
—— Concurrent Priority Queue Implementation ——
TestServer(bool enableTFO=false, int bufSize=-1)
VoidCallback dataAvailableCallback
requires E e noexcept(noexcept(s.error(std::move(e))))
VoidCallback errorCallback
VoidCallback successCallback
VoidCallback errorCallback
void readEOF() noexceptoverride
VoidCallback successCallback
bool hasBufferCleared() const
int getFlagsImpl(folly::WriteFlags flags, int) noexceptoverride
folly::SocketAddress address_
void allocate(size_t len)
folly::WriteFlags writeFlags_
NetworkSocket socket(int af, int type, int protocol)
int listen(NetworkSocket s, int backlog)
int acceptFD(int timeout=50)
TestSendMsgParamsCallback(int flags, uint32_t dataSize, void *data)
int tfo_enable(int, size_t)
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb)
const folly::SocketAddress & getAddress() const
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
void readDataAvailable(size_t len) noexceptoverride
void onEgressBuffered() override
void writeErr(size_t nBytesWritten, const folly::AsyncSocketException &ex) noexceptoverride
int close(NetworkSocket s)
static constexpr uint64_t data[1]
folly::AsyncSocketException exception
void verifyConnection(const char *buf, size_t len)
std::vector< Buffer > buffers
Buffer(char *buf, size_t len)
void onEgressBufferCleared() override
uint32_t getAncillaryDataSize(folly::WriteFlags flags) noexceptoverride
NetworkSocket accept(NetworkSocket s, sockaddr *addr, socklen_t *addrlen)
std::atomic< size_t > bytesWritten