proxygen
AsyncFizzBase.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree.
7  */
8 
10 
11 #include <folly/Conv.h>
12 #include <folly/io/Cursor.h>
13 
14 namespace fizz {
15 
17 
21 static const uint32_t kMinReadSize = 1460;
22 static const uint32_t kMaxReadSize = 4000;
23 
28 static const uint32_t kMaxBufSize = 64 * 1024;
29 
31  : folly::WriteChainAsyncTransportWrapper<folly::AsyncTransportWrapper>(
32  std::move(transport)),
33  handshakeTimeout_(*this, transport_->getEventBase()) {}
34 
36  transport_->setReadCB(nullptr);
37 }
38 
40  transport_->closeNow();
41  transport_->setReadCB(nullptr);
43 }
44 
46  return readCallback_;
47 }
48 
50  readCallback_ = callback;
51 
52  if (readCallback_) {
53  if (appDataBuf_) {
54  deliverAppData(nullptr);
55  }
56 
57  if (!good()) {
60  "setReadCB() called with transport in bad state");
61  deliverError(ex);
62  } else {
63  // The read callback may have been unset earlier if our buffer was full.
65  }
66  }
67 }
68 
71  std::unique_ptr<folly::IOBuf>&& buf,
73  appBytesWritten_ += buf->computeChainDataLength();
74 
75  // TODO: break up buf into multiple records
76 
77  writeAppData(callback, std::move(buf), flags);
78 }
79 
81  return appBytesWritten_;
82 }
83 
85  return appBytesReceived_;
86 }
87 
89  transport_->setReadCB(this);
90 }
91 
92 void AsyncFizzBase::startHandshakeTimeout(std::chrono::milliseconds timeout) {
94 }
95 
98 }
99 
100 void AsyncFizzBase::deliverAppData(std::unique_ptr<folly::IOBuf> data) {
101  if (data) {
103  }
104 
105  if (appDataBuf_) {
106  if (data) {
107  appDataBuf_->prependChain(std::move(data));
108  }
109  data = std::move(appDataBuf_);
110  }
111 
112  if (readCallback_ && data) {
115  } else {
116  folly::io::Cursor cursor(data.get());
117  size_t available = 0;
118  while ((available = cursor.totalLength()) != 0 && readCallback_) {
119  void* buf = nullptr;
120  size_t buflen = 0;
121  try {
122  readCallback_->getReadBuffer(&buf, &buflen);
123  } catch (const AsyncSocketException& ase) {
124  return deliverError(ase);
125  } catch (const std::exception& e) {
128  folly::to<std::string>("getReadBuffer() threw ", e.what()));
129  return deliverError(ase);
130  } catch (...) {
133  "getReadBuffer() threw unknown exception");
134  return deliverError(ase);
135  }
136  if (buflen == 0 || buf == nullptr) {
139  "getReadBuffer() returned empty buffer");
140  return deliverError(ase);
141  }
142 
143  size_t bytesToRead = std::min(buflen, available);
144  cursor.pull(buf, bytesToRead);
145  readCallback_->readDataAvailable(bytesToRead);
146  }
147  if (available != 0) {
148  cursor.clone(appDataBuf_, available);
149  }
150  }
151  } else if (data) {
152  appDataBuf_ = std::move(data);
153  }
154 
155  checkBufLen();
156 }
157 
159  const AsyncSocketException& ex,
160  bool closeTransport) {
161  DelayedDestruction::DestructorGuard dg(this);
162 
163  if (readCallback_) {
164  auto readCallback = readCallback_;
165  readCallback_ = nullptr;
167  readCallback->readEOF();
168  } else {
169  readCallback->readErr(ex);
170  }
171  }
172  if (closeTransport) {
173  transport_->close();
174  }
175 }
176 
177 void AsyncFizzBase::getReadBuffer(void** bufReturn, size_t* lenReturn) {
178  std::pair<void*, uint32_t> readSpace =
179  transportReadBuf_.preallocate(kMinReadSize, kMaxReadSize);
180  *bufReturn = readSpace.first;
181  *lenReturn = readSpace.second;
182 }
183 
185  DelayedDestruction::DestructorGuard dg(this);
186 
189  checkBufLen();
190 }
191 
193  return true;
194 }
195 
197  std::unique_ptr<folly::IOBuf> data) noexcept {
198  DelayedDestruction::DestructorGuard dg(this);
199 
202  checkBufLen();
203 }
204 
207  transportError(eof);
208 }
209 
211  transportError(ex);
212 }
213 
215 
217  size_t /* bytesWritten */,
219  transportError(ex);
220 }
221 
223  if (!readCallback_ &&
224  (transportReadBuf_.chainLength() >= kMaxBufSize ||
225  (appDataBuf_ && appDataBuf_->computeChainDataLength() >= kMaxBufSize))) {
226  transport_->setReadCB(nullptr);
227  }
228 }
229 
232  AsyncSocketException::TIMED_OUT, "handshake timeout expired");
233  transportError(eof);
234 }
235 } // namespace fizz
virtual void readBufferAvailable(std::unique_ptr< IOBuf >) noexcept
virtual void readDataAvailable(size_t len) noexcept=0
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
virtual void deliverError(const folly::AsyncSocketException &ex, bool closeTransport=true)
folly::IOBufQueue transportReadBuf_
static const uint32_t kMaxReadSize
flags
Definition: http_parser.h:127
size_t chainLength() const
Definition: IOBufQueue.h:492
std::unique_ptr< folly::IOBuf > appDataBuf_
void destroy() override
ReadCallback * getReadCallback() const override
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
EventBase * getEventBase()
AsyncFizzBase(folly::AsyncTransportWrapper::UniquePtr transport)
STL namespace.
void readErr(const folly::AsyncSocketException &ex) noexceptoverride
virtual bool isBufferMovable() noexcept
virtual void startHandshakeTimeout(std::chrono::milliseconds)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
bool good() const override=0
void readEOF() noexceptoverride
static const uint32_t kMinReadSize
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
Definition: IOBufQueue.h:356
void setReadCB(ReadCallback *callback) override
static void destroy()
virtual void cancelHandshakeTimeout()
void writeErr(size_t bytesWritten, const folly::AsyncSocketException &ex) noexceptoverride
~AsyncFizzBase() override
std::unique_ptr< AsyncTransportWrapper, Destructor > UniquePtr
LogLevel min
Definition: LogLevel.cpp:30
void writeSuccess() noexceptoverride
virtual void getReadBuffer(void **bufReturn, size_t *lenReturn)=0
void readDataAvailable(size_t len) noexceptoverride
void handshakeTimeoutExpired() noexcept
Definition: Actions.h:16
HandshakeTimeout handshakeTimeout_
void writeChain(folly::AsyncTransportWrapper::WriteCallback *callback, std::unique_ptr< folly::IOBuf > &&buf, folly::WriteFlags flags=folly::WriteFlags::NONE) override
size_t getAppBytesWritten() const override
virtual void startTransportReads()
bool isBufferMovable() noexceptoverride
std::size_t computeChainDataLength() const
Definition: IOBuf.cpp:501
void getReadBuffer(void **bufReturn, size_t *lenReturn) override
AsyncFizzClient::UniquePtr transport_
bool scheduleTimeout(uint32_t milliseconds)
void readBufferAvailable(std::unique_ptr< folly::IOBuf > data) noexceptoverride
size_t getAppBytesReceived() const override
static const uint32_t kMaxBufSize
AsyncSocketExceptionType getType() const noexcept
virtual void transportError(const folly::AsyncSocketException &ex)=0
void postallocate(std::size_t n)
Definition: IOBufQueue.h:380
virtual void readEOF() noexcept=0
virtual void transportDataAvailable()=0
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
virtual void writeAppData(folly::AsyncTransportWrapper::WriteCallback *callback, std::unique_ptr< folly::IOBuf > &&buf, folly::WriteFlags flags=folly::WriteFlags::NONE)=0
ReadCallback * readCallback_
virtual void deliverAppData(std::unique_ptr< folly::IOBuf > buf)