proxygen
ZeroCopy.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBufQueue.h>
25 
26 namespace folly {
27 
29  public:
31  size_t* counter,
32  folly::EventBase* evb,
33  int numLoops,
34  size_t bufferSize,
35  bool zeroCopy)
36  : counter_(counter),
37  evb_(evb),
38  numLoops_(numLoops),
39  sock_(new folly::AsyncSocket(evb)),
40  callback_(this),
41  client_(true) {
42  setBufferSize(bufferSize);
43  setZeroCopy(zeroCopy);
44  }
45 
47  size_t* counter,
48  folly::EventBase* evb,
49  int fd,
50  int numLoops,
51  size_t bufferSize,
52  bool zeroCopy)
53  : counter_(counter),
54  evb_(evb),
55  numLoops_(numLoops),
56  sock_(new folly::AsyncSocket(evb, fd)),
57  callback_(this),
58  client_(false) {
59  setBufferSize(bufferSize);
60  setZeroCopy(zeroCopy);
61  // enable reads
62  if (sock_) {
63  sock_->setReadCB(&callback_);
64  }
65  }
66 
68  clearBuffers();
69  }
70 
71  void connect(const folly::SocketAddress& remote) {
72  if (sock_) {
73  sock_->connect(&callback_, remote);
74  }
75  }
76 
78  return sock_->isZeroCopyWriteInProgress();
79  }
80 
81  private:
82  void setZeroCopy(bool enable) {
83  zeroCopy_ = enable;
84  if (sock_) {
85  sock_->setZeroCopy(zeroCopy_);
86  }
87  }
88 
89  void setBufferSize(size_t bufferSize) {
90  clearBuffers();
91  bufferSize_ = bufferSize;
92 
93  readBuffer_ = new char[bufferSize_];
94  }
95 
98  public:
100 
101  void connectSuccess() noexcept override {
102  parent_->sock_->setReadCB(this);
103  parent_->onConnected();
104  }
105 
107  LOG(ERROR) << "Connect error: " << ex.what();
109  }
110 
111  void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
112  parent_->getReadBuffer(bufReturn, lenReturn);
113  }
114 
115  void readDataAvailable(size_t len) noexcept override {
117  }
118 
119  void readEOF() noexcept override {
121  }
122 
123  void readErr(const folly::AsyncSocketException& ex) noexcept override {
125  }
126 
127  private:
129  };
130 
131  void clearBuffers() {
132  if (readBuffer_) {
133  delete[] readBuffer_;
134  }
135  }
136 
137  void getReadBuffer(void** bufReturn, size_t* lenReturn) {
138  *bufReturn = readBuffer_ + readOffset_;
139  *lenReturn = bufferSize_ - readOffset_;
140  }
141 
142  void readDataAvailable(size_t len) noexcept {
143  readOffset_ += len;
144  if (readOffset_ == bufferSize_) {
145  readOffset_ = 0;
146  onDataReady();
147  }
148  }
149 
150  void onConnected() {
152  writeBuffer();
153  }
154 
155  void onDataReady() {
156  currLoop_++;
157  if (client_ && currLoop_ >= numLoops_) {
158  evb_->runInLoop(
159  [this] {
160  if (counter_ && (0 == --(*counter_))) {
162  }
163  },
164  false /*thisIteration*/);
165  return;
166  }
167  writeBuffer();
168  }
169 
171  if (client_) {
172  if (counter_ && (0 == --(*counter_))) {
174  }
175  }
176  }
177 
178  bool writeBuffer() {
179  // use calloc to make sure the memory is touched
180  // if the memory is just malloc'd, running the zeroCopyOn
181  // and the zeroCopyOff back to back on a system that does not support
182  // zerocopy leads to the second test being much slower
183  writeBuffer_ =
185 
186  if (sock_ && writeBuffer_) {
187  sock_->writeChain(
188  nullptr,
191  }
192 
193  return true;
194  }
195 
196  size_t* counter_{nullptr};
198  int numLoops_{0};
199  int currLoop_{0};
200  bool zeroCopy_{false};
201 
204 
205  size_t bufferSize_{0};
206  size_t readOffset_{0};
207  char* readBuffer_{nullptr};
208  std::unique_ptr<folly::IOBuf> writeBuffer_;
209 
210  bool client_;
211 };
212 
214  public:
216  folly::EventBase* evb,
217  int numLoops,
218  size_t bufferSize,
219  bool zeroCopy)
220  : evb_(evb),
221  numLoops_(numLoops),
222  bufferSize_(bufferSize),
223  zeroCopy_(zeroCopy) {}
224 
226  sock.addAcceptCallback(this, evb_);
227  }
228 
230  int fd,
231  const folly::SocketAddress& /* unused */) noexcept override {
232  auto client = std::make_shared<ZeroCopyTestAsyncSocket>(
233  nullptr, evb_, fd, numLoops_, bufferSize_, zeroCopy_);
234  clients_[client.get()] = client;
235  }
236 
237  void acceptError(const std::exception&) noexcept override {}
238 
239  private:
242  size_t bufferSize_;
243  bool zeroCopy_;
244  std::unique_ptr<ZeroCopyTestAsyncSocket> client_;
245  std::unordered_map<
247  std::shared_ptr<ZeroCopyTestAsyncSocket>>
249 };
250 
252  public:
253  explicit ZeroCopyTest(
254  size_t numClients,
255  int numLoops,
256  bool zeroCopy,
257  size_t bufferSize);
258  bool run();
259 
260  private:
261  void connectAll() {
262  SocketAddress addr = listenSock_->getAddress();
263  for (auto& client : clients_) {
264  client->connect(addr);
265  }
266  }
267 
268  size_t numClients_;
269  size_t counter_;
271  bool zeroCopy_;
272  size_t bufferSize_;
273 
275  std::vector<std::unique_ptr<ZeroCopyTestAsyncSocket>> clients_;
278 };
279 
280 } // namespace folly
void connect(const folly::SocketAddress &remote)
Definition: ZeroCopy.h:71
void connectSuccess() noexcept override
Definition: ZeroCopy.h:101
std::unordered_map< ZeroCopyTestAsyncSocket *, std::shared_ptr< ZeroCopyTestAsyncSocket > > clients_
Definition: ZeroCopy.h:248
bool isZeroCopyWriteInProgress() const
Definition: ZeroCopy.h:77
void setZeroCopy(bool enable)
Definition: ZeroCopy.h:82
void addCallbackToServerSocket(folly::AsyncServerSocket &sock)
Definition: ZeroCopy.h:225
void connectionAccepted(int fd, const folly::SocketAddress &) noexcept override
Definition: ZeroCopy.h:229
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
ZeroCopyTestServer server_
Definition: ZeroCopy.h:277
ZeroCopyTestServer(folly::EventBase *evb, int numLoops, size_t bufferSize, bool zeroCopy)
Definition: ZeroCopy.h:215
folly::EventBase * evb_
Definition: ZeroCopy.h:197
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
Callback(ZeroCopyTestAsyncSocket *parent)
Definition: ZeroCopy.h:99
void setBufferSize(size_t bufferSize)
Definition: ZeroCopy.h:89
void connectErr(const folly::AsyncSocketException &ex) noexcept override
Definition: ZeroCopy.h:106
void runInLoop(LoopCallback *callback, bool thisIteration=false)
Definition: EventBase.cpp:520
void readEOF() noexcept override
Definition: ZeroCopy.h:119
folly::EventBase * evb_
Definition: ZeroCopy.h:240
static void run(EventBaseManager *ebm, EventBase *eb, folly::Baton<> *stop, const StringPiece &name)
void readErr(const folly::AsyncSocketException &ex) noexcept override
Definition: ZeroCopy.h:123
void readDataAvailable(size_t len) noexcept
Definition: ZeroCopy.h:142
void terminateLoopSoon()
Definition: EventBase.cpp:493
void getReadBuffer(void **bufReturn, size_t *lenReturn) override
Definition: ZeroCopy.h:111
folly::AsyncSocket::UniquePtr sock_
Definition: ZeroCopy.h:202
std::unique_ptr< AsyncServerSocket, Destructor > UniquePtr
std::unique_ptr< folly::IOBuf > writeBuffer_
Definition: ZeroCopy.h:208
std::vector< std::unique_ptr< ZeroCopyTestAsyncSocket > > clients_
Definition: ZeroCopy.h:275
void getReadBuffer(void **bufReturn, size_t *lenReturn)
Definition: ZeroCopy.h:137
static std::unique_ptr< IOBuf > takeOwnership(void *buf, std::size_t capacity, FreeFunction freeFn=nullptr, void *userData=nullptr, bool freeOnError=true)
Definition: IOBuf.h:304
std::atomic< int > counter
void acceptError(const std::exception &) noexcept override
Definition: ZeroCopy.h:237
ZeroCopyTestAsyncSocket * parent_
Definition: ZeroCopy.h:128
void onDataFinish(folly::exception_wrapper)
Definition: ZeroCopy.h:170
void readDataAvailable(size_t len) noexcept override
Definition: ZeroCopy.h:115
ZeroCopyTestAsyncSocket(size_t *counter, folly::EventBase *evb, int numLoops, size_t bufferSize, bool zeroCopy)
Definition: ZeroCopy.h:30
ThreadPoolListHook * addr
std::unique_ptr< AsyncSocket, Destructor > UniquePtr
Definition: AsyncSocket.h:83
EventBase evb_
Definition: ZeroCopy.h:274
folly::Function< void()> parent
Definition: AtFork.cpp:34
std::unique_ptr< ZeroCopyTestAsyncSocket > client_
Definition: ZeroCopy.h:244
ZeroCopyTestAsyncSocket(size_t *counter, folly::EventBase *evb, int fd, int numLoops, size_t bufferSize, bool zeroCopy)
Definition: ZeroCopy.h:46
virtual void addAcceptCallback(AcceptCallback *callback, EventBase *eventBase, uint32_t maxAtOnce=kDefaultCallbackAcceptAtOnce)
folly::AsyncServerSocket::UniquePtr listenSock_
Definition: ZeroCopy.h:276