proxygen
AsyncSocketTest.h
Go to the documentation of this file.
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
21 
22 #include <memory>
23 
25 
// Callable that takes no arguments and returns nothing.
using VoidCallback = std::function<void()>;
27 
29  public:
32  exception(folly::AsyncSocketException::UNKNOWN, "none") {}
33 
34  void connectSuccess() noexcept override {
36  if (successCallback) {
38  }
39  }
40 
41  void connectErr(const folly::AsyncSocketException& ex) noexcept override {
43  exception = ex;
44  if (errorCallback) {
45  errorCallback();
46  }
47  }
48 
53 };
54 
56  public:
59  bytesWritten(0),
60  exception(folly::AsyncSocketException::UNKNOWN, "none") {}
61 
62  void writeSuccess() noexcept override {
64  if (successCallback) {
66  }
67  }
68 
69  void writeErr(
70  size_t nBytesWritten,
71  const folly::AsyncSocketException& ex) noexcept override {
72  LOG(ERROR) << ex.what();
74  this->bytesWritten = nBytesWritten;
75  exception = ex;
76  if (errorCallback) {
77  errorCallback();
78  }
79  }
80 
82  std::atomic<size_t> bytesWritten;
86 };
87 
89  public:
90  explicit ReadCallback(size_t _maxBufferSz = 4096)
92  exception(folly::AsyncSocketException::UNKNOWN, "none"),
93  buffers(),
94  maxBufferSz(_maxBufferSz) {}
95 
96  ~ReadCallback() override {
97  for (std::vector<Buffer>::iterator it = buffers.begin();
98  it != buffers.end();
99  ++it) {
100  it->free();
101  }
102  currentBuffer.free();
103  }
104 
105  void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
106  if (!currentBuffer.buffer) {
107  currentBuffer.allocate(maxBufferSz);
108  }
109  *bufReturn = currentBuffer.buffer;
110  *lenReturn = currentBuffer.length;
111  }
112 
113  void readDataAvailable(size_t len) noexcept override {
114  currentBuffer.length = len;
115  buffers.push_back(currentBuffer);
116  currentBuffer.reset();
117  if (dataAvailableCallback) {
118  dataAvailableCallback();
119  }
120  }
121 
122  void readEOF() noexcept override {
124  }
125 
126  void readErr(const folly::AsyncSocketException& ex) noexcept override {
128  exception = ex;
129  }
130 
131  void verifyData(const char* expected, size_t expectedLen) const {
132  size_t offset = 0;
133  for (size_t idx = 0; idx < buffers.size(); ++idx) {
134  const auto& buf = buffers[idx];
135  size_t cmpLen = std::min(buf.length, expectedLen - offset);
136  CHECK_EQ(memcmp(buf.buffer, expected + offset, cmpLen), 0);
137  CHECK_EQ(cmpLen, buf.length);
138  offset += cmpLen;
139  }
140  CHECK_EQ(offset, expectedLen);
141  }
142 
143  size_t dataRead() const {
144  size_t ret = 0;
145  for (const auto& buf : buffers) {
146  ret += buf.length;
147  }
148  return ret;
149  }
150 
// Plain holder for a malloc()-allocated byte region and its size.
//
// The class has no destructor and copies are shallow (the pointer is copied,
// not the bytes), so nothing is released automatically: whoever ends up
// holding the pointer must call free() on exactly one copy.
class Buffer {
 public:
  // Creates an empty holder (null pointer, zero length).
  Buffer() : buffer(nullptr), length(0) {}

  // Wraps an existing region of len bytes starting at buf.
  Buffer(char* buf, size_t len) : buffer(buf), length(len) {}

  // Forgets the region without releasing it.
  void reset() {
    buffer = nullptr;
    length = 0;
  }

  // Allocates a region of len bytes. Must only be called while no region is
  // held. If the allocation fails the holder stays empty (null pointer, zero
  // length), so a null pointer is never paired with a non-zero length.
  void allocate(size_t len) {
    assert(buffer == nullptr);
    this->buffer = static_cast<char*>(malloc(len));
    this->length = (this->buffer != nullptr) ? len : 0;
  }

  // Releases the region (harmless when the pointer is null) and empties the
  // holder.
  void free() {
    ::free(buffer);
    reset();
  }

  char* buffer;
  size_t length;
};
173 
176  std::vector<Buffer> buffers;
179  const size_t maxBufferSz;
180 };
181 
183  public:
184  BufferCallback() : buffered_(false), bufferCleared_(false) {}
185 
186  void onEgressBuffered() override {
187  buffered_ = true;
188  }
189 
190  void onEgressBufferCleared() override {
191  bufferCleared_ = true;
192  }
193 
194  bool hasBuffered() const {
195  return buffered_;
196  }
197 
198  bool hasBufferCleared() const {
199  return bufferCleared_;
200  }
201 
202  private:
203  bool buffered_{false};
204  bool bufferCleared_{false};
205 };
206 
// Empty type; it declares no members.
class ReadVerifier {
};
208 
211  public:
213  : flags_(flags),
214  writeFlags_(folly::WriteFlags::NONE),
215  dataSize_(dataSize),
216  data_(data),
217  queriedFlags_(false),
218  queriedData_(false) {}
219 
220  void reset(int flags) {
221  flags_ = flags;
222  writeFlags_ = folly::WriteFlags::NONE;
223  queriedFlags_ = false;
224  queriedData_ = false;
225  }
226 
229  int /*defaultFlags*/) noexcept override {
230  queriedFlags_ = true;
231  if (writeFlags_ == folly::WriteFlags::NONE) {
232  writeFlags_ = flags;
233  } else {
234  assert(flags == writeFlags_);
235  }
236  return flags_;
237  }
238 
240  queriedData_ = true;
241  if (writeFlags_ == folly::WriteFlags::NONE) {
242  writeFlags_ = flags;
243  } else {
244  assert(flags == writeFlags_);
245  }
246  assert(data != nullptr);
247  memcpy(data, data_, dataSize_);
248  }
249 
251  if (writeFlags_ == folly::WriteFlags::NONE) {
252  writeFlags_ = flags;
253  } else {
254  assert(flags == writeFlags_);
255  }
256  return dataSize_;
257  }
258 
259  int flags_;
262  void* data_;
265 };
266 
267 class TestServer {
268  public:
269  // Create a TestServer.
270  // This immediately starts listening on an ephemeral port.
271  explicit TestServer(bool enableTFO = false, int bufSize = -1) : fd_(-1) {
272  namespace fsp = folly::portability::sockets;
273  fd_ = fsp::socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
274  if (fd_ < 0) {
277  "failed to create test server socket",
278  errno);
279  }
280  if (fcntl(fd_, F_SETFL, O_NONBLOCK) != 0) {
283  "failed to put test server socket in "
284  "non-blocking mode",
285  errno);
286  }
287  if (enableTFO) {
288 #if FOLLY_ALLOW_TFO
289  folly::detail::tfo_enable(fd_, 100);
290 #endif
291  }
292 
293  struct addrinfo hints, *res;
294  memset(&hints, 0, sizeof(hints));
295  hints.ai_family = AF_INET;
296  hints.ai_socktype = SOCK_STREAM;
297  hints.ai_flags = AI_PASSIVE;
298 
299  if (getaddrinfo(nullptr, "0", &hints, &res)) {
302  "Attempted to bind address to socket with "
303  "bad getaddrinfo",
304  errno);
305  }
306 
307  SCOPE_EXIT {
308  freeaddrinfo(res);
309  };
310 
311  if (bufSize > 0) {
312  setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufSize, sizeof(bufSize));
313  setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufSize, sizeof(bufSize));
314  }
315 
316  if (bind(fd_, res->ai_addr, res->ai_addrlen)) {
319  "failed to bind to async server socket for port 10",
320  errno);
321  }
322 
323  if (listen(fd_, 10) != 0) {
326  "failed to listen on test server socket",
327  errno);
328  }
329 
330  address_.setFromLocalAddress(fd_);
331  // The local address will contain 0.0.0.0.
332  // Change it to 127.0.0.1, so it can be used to connect to the server
333  address_.setFromIpPort("127.0.0.1", address_.getPort());
334  }
335 
337  if (fd_ != -1) {
338  close(fd_);
339  }
340  }
341 
342  // Get the address for connecting to the server
344  return address_;
345  }
346 
347  int acceptFD(int timeout = 50) {
348  namespace fsp = folly::portability::sockets;
349  struct pollfd pfd;
350  pfd.fd = fd_;
351  pfd.events = POLLIN;
352  int ret = poll(&pfd, 1, timeout);
353  if (ret == 0) {
356  "test server accept() timed out");
357  } else if (ret < 0) {
360  "test server accept() poll failed",
361  errno);
362  }
363 
364  int acceptedFd = fsp::accept(fd_, nullptr, nullptr);
365  if (acceptedFd < 0) {
368  "test server accept() failed",
369  errno);
370  }
371 
372  return acceptedFd;
373  }
374 
375  std::shared_ptr<BlockingSocket> accept(int timeout = 50) {
376  int fd = acceptFD(timeout);
377  return std::make_shared<BlockingSocket>(fd);
378  }
379 
380  std::shared_ptr<folly::AsyncSocket> acceptAsync(
381  folly::EventBase* evb,
382  int timeout = 50) {
383  int fd = acceptFD(timeout);
384  return folly::AsyncSocket::newSocket(evb, fd);
385  }
386 
391  void verifyConnection(const char* buf, size_t len) {
392  // accept a connection
393  std::shared_ptr<BlockingSocket> acceptedSocket = accept();
394  // read the data and compare it to the specified buffer
395  std::unique_ptr<uint8_t[]> readbuf(new uint8_t[len]);
396  acceptedSocket->readAll(readbuf.get(), len);
397  CHECK_EQ(memcmp(buf, readbuf.get(), len), 0);
398  // make sure we get EOF next
399  uint32_t bytesRead = acceptedSocket->read(readbuf.get(), len);
400  CHECK_EQ(bytesRead, 0);
401  }
402 
403  private:
404  int fd_;
406 };
std::shared_ptr< folly::AsyncSocket > acceptAsync(folly::EventBase *evb, int timeout=50)
std::vector< uint8_t > buffer(kBufferSize+16)
void connectSuccess() noexceptoverride
void writeSuccess() noexceptoverride
void getAncillaryData(folly::WriteFlags flags, void *data) noexceptoverride
flags
Definition: http_parser.h:127
void verifyData(const char *expected, size_t expectedLen) const
folly::AsyncSocketException exception
std::function< void()> VoidCallback
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
ReadCallback(size_t _maxBufferSz=4096)
std::shared_ptr< BlockingSocket > accept(int timeout=50)
Buffer currentBuffer
folly::AsyncSocketException exception
void readErr(const folly::AsyncSocketException &ex) noexceptoverride
void connectErr(const folly::AsyncSocketException &ex) noexceptoverride
void getReadBuffer(void **bufReturn, size_t *lenReturn) override
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
bool hasBuffered() const
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
TestServer(bool enableTFO=false, int bufSize=-1)
VoidCallback dataAvailableCallback
requires E e noexcept(noexcept(s.error(std::move(e))))
VoidCallback errorCallback
#define nullptr
Definition: http_parser.c:41
const size_t maxBufferSz
~ReadCallback() override
string UNKNOWN
Definition: tokenize.py:53
VoidCallback successCallback
VoidCallback errorCallback
void readEOF() noexceptoverride
VoidCallback successCallback
LogLevel min
Definition: LogLevel.cpp:30
StateEnum state
bool hasBufferCleared() const
int getFlagsImpl(folly::WriteFlags flags, int) noexceptoverride
StateEnum
folly::SocketAddress address_
void allocate(size_t len)
folly::WriteFlags writeFlags_
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
int listen(NetworkSocket s, int backlog)
Definition: NetOps.cpp:137
void free()
int acceptFD(int timeout=50)
StateEnum state
TestSendMsgParamsCallback(int flags, uint32_t dataSize, void *data)
int tfo_enable(int, size_t)
StateEnum state
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb)
Definition: AsyncSocket.h:281
size_t dataRead() const
const folly::SocketAddress & getAddress() const
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
Definition: NetOps.cpp:141
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
void readDataAvailable(size_t len) noexceptoverride
void onEgressBuffered() override
void writeErr(size_t nBytesWritten, const folly::AsyncSocketException &ex) noexceptoverride
int close(NetworkSocket s)
Definition: NetOps.cpp:90
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
folly::AsyncSocketException exception
void verifyConnection(const char *buf, size_t len)
std::vector< Buffer > buffers
Buffer(char *buf, size_t len)
void onEgressBufferCleared() override
StringPiece data_
state
Definition: http_parser.c:272
uint32_t getAncillaryDataSize(folly::WriteFlags flags) noexceptoverride
NetworkSocket accept(NetworkSocket s, sockaddr *addr, socklen_t *addrlen)
Definition: NetOps.cpp:71
std::atomic< size_t > bytesWritten