proxygen
ProxyHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
10 #include "ProxyHandler.h"
11 
15 #include <proxygen/lib/utils/URL.h>
18 
19 #include "ProxyStats.h"
20 
21 using namespace proxygen;
22 using std::string;
23 using std::unique_ptr;
24 
25 DEFINE_int32(proxy_connect_timeout, 1000,
26  "connect timeout in milliseconds");
27 
namespace {
// Sizes, in bytes, handed to body_.preallocate() by getReadBuffer(): the
// minimum amount of writable space wanted, and the size to use when a new
// buffer has to be allocated.
constexpr uint32_t kMinReadSize = 1460;
constexpr uint32_t kMaxReadSize = 4000;

// Bit flags OR-ed into sockStatus_ to track which halves of the upstream
// socket have been shut down. CLOSED is the state with both bits set.
constexpr uint8_t READS_SHUTDOWN = 1;
constexpr uint8_t WRITES_SHUTDOWN = 2;
constexpr uint8_t CLOSED = READS_SHUTDOWN | WRITES_SHUTDOWN;
} // namespace
36 
37 namespace ProxyService {
38 
39 ProxyHandler::ProxyHandler(ProxyStats* stats, folly::HHWheelTimer* timer):
40  stats_(stats),
41  connector_{this, timer},
42  serverHandler_(*this) {
43 }
44 
46  VLOG(4) << "deleting ProxyHandler";
47 }
48 
49 void ProxyHandler::onRequest(std::unique_ptr<HTTPMessage> headers) noexcept {
50  // This HTTP proxy does not obey the rules in the spec, such as stripping
51  // hop-by-hop headers. Example only!
52 
54  request_ = std::move(headers);
55  proxygen::URL url(request_->getURL());
56 
58  try {
59  // Note, this does a synchronous DNS lookup which is bad in event driven
60  // code
61  addr.setFromHostPort(url.getHost(), url.getPort());
62  } catch (...) {
64  .status(503, "Bad Gateway")
65  .body(folly::to<string>("Could not parse server from URL: ",
66  request_->getURL()))
67  .sendWithEOM();
68  return;
69  }
70 
71 
73  LOG(INFO) << "Trying to connect to " << addr;
75  if (request_->getMethod() == HTTPMethod::CONNECT) {
77  upstreamSock_->connect(this, addr, FLAGS_proxy_connect_timeout);
78  } else {
79  // A more sophisticated proxy would have a connection pool here
81  {{SOL_SOCKET, SO_REUSEADDR}, 1}};
84  std::chrono::milliseconds(FLAGS_proxy_connect_timeout),
85  opts);
86  }
87 }
88 
89 void ProxyHandler::onBody(std::unique_ptr<folly::IOBuf> body) noexcept {
90  if (txn_) {
91  LOG(INFO) << "Forwarding " <<
92  ((body) ? body->computeChainDataLength() : 0) << " body bytes to server";
93  txn_->sendBody(std::move(body));
94  } else if (upstreamSock_) {
95  upstreamEgressPaused_ = true;
96  upstreamSock_->writeChain(this, std::move(body));
100  }
101  } else {
102  LOG(WARNING) << "Dropping " <<
103  ((body) ? body->computeChainDataLength() : 0) << " body bytes to server";
104  }
105 }
106 
108  if (txn_) {
109  LOG(INFO) << "Forwarding client EOM to server";
110  txn_->sendEOM();
111  } else if (upstreamSock_) {
112  LOG(INFO) << "Closing upgraded socket";
113  sockStatus_ |= WRITES_SHUTDOWN;
114  upstreamSock_->shutdownWrite();
115  } else {
116  LOG(INFO) << "Dropping client EOM to server";
117  }
118 }
119 
121  LOG(INFO) << "Established " << *session;
122  session_ = std::make_unique<SessionWrapper>(session);
123  txn_ = session->newTransaction(&serverHandler_);
124  LOG(INFO) << "Forwarding client request: " << request_->getURL()
125  << " to server";
126  txn_->sendHeaders(*request_);
128 }
129 
131  LOG(ERROR) << "Failed to connect: " << folly::exceptionStr(ex);
132  if (!clientTerminated_) {
134  .status(503, "Bad Gateway")
135  .sendWithEOM();
136  } else {
137  abortDownstream();
139  }
140 }
141 
143  unique_ptr<HTTPMessage> msg) noexcept {
144  CHECK(!clientTerminated_);
145  LOG(INFO) << "Forwarding " << msg->getStatusCode() << " response to client";
146  downstream_->sendHeaders(*msg);
147 }
148 
149 void ProxyHandler::onServerBody(std::unique_ptr<folly::IOBuf> chain) noexcept {
150  CHECK(!clientTerminated_);
151  LOG(INFO) << "Forwarding " <<
152  ((chain) ? chain->computeChainDataLength() : 0) << " body bytes to client";
153  downstream_->sendBody(std::move(chain));
154 }
155 
157  if (!clientTerminated_) {
158  LOG(INFO) << "Forwarding server EOM to client";
159  downstream_->sendEOM();
160  }
161 }
162 
164  txn_ = nullptr;
166 }
167 
169  LOG(ERROR) << "Server error: " << error;
170  abortDownstream();
171 }
172 
174  if (!clientTerminated_) {
176  }
177 }
178 
180  if (!clientTerminated_) {
182  }
183 }
184 
186  clientTerminated_ = true;
188 }
189 
191  LOG(ERROR) << "Client error: " << proxygen::getErrorString(err);
192  clientTerminated_ = true;
193  if (txn_) {
194  LOG(ERROR) << "Aborting server txn: " << *txn_;
195  txn_->sendAbort();
196  } else if (upstreamSock_) {
197  upstreamSock_.reset();
198  }
200 }
201 
203  if (txn_) {
204  txn_->pauseIngress();
205  } else if (upstreamSock_) {
206  upstreamSock_->setReadCB(nullptr);
207  }
208 }
209 
211  if (txn_) {
212  txn_->resumeIngress();
213  } else if (upstreamSock_) {
214  upstreamSock_->setReadCB(this);
215  }
216 }
217 
219  if (!clientTerminated_) {
221  }
222 }
223 
225  if (clientTerminated_ && !txn_ &&
226  (!upstreamSock_ || (sockStatus_ == CLOSED && !upstreamEgressPaused_))) {
227  delete this;
228  return true;
229  }
230  return false;
231 }
232 
234  LOG(INFO) << "Connected to upstream " << upstreamSock_;
236  .status(200, "OK")
237  .send();
238  upstreamSock_->setReadCB(this);
240 }
241 
243  connectError(ex);
244 }
245 
246 void ProxyHandler::getReadBuffer(void** bufReturn, size_t* lenReturn) {
247  std::pair<void*,uint32_t> readSpace = body_.preallocate(kMinReadSize,
248  kMaxReadSize);
249  *bufReturn = readSpace.first;
250  *lenReturn = readSpace.second;
251 }
252 
254  body_.postallocate(len);
256 }
257 
259  sockStatus_ |= READS_SHUTDOWN;
260  onServerEOM();
261 }
262 
264  LOG(ERROR) << "Server read error: " << folly::exceptionStr(ex);
265  abortDownstream();
266  upstreamSock_.reset();
268 }
269 
271  upstreamEgressPaused_ = false;
273  downstreamIngressPaused_ = false;
275  }
277 }
278 
279 void ProxyHandler::writeErr(size_t /*bytesWritten*/,
281  LOG(ERROR) << "Server write error: " << folly::exceptionStr(ex);;
282  upstreamEgressPaused_ = false;
283  abortDownstream();
284  upstreamSock_.reset();
286 }
287 
288 
289 }
void onServerEgressPaused() noexcept
void onEgressResumed() noexcept override
static const uint32_t kMaxReadSize
ResponseBuilder & status(uint16_t code, const std::string &message)
HTTPTransaction * newTransaction(HTTPTransaction::Handler *handler) override
EventBase * getEventBase() const
virtual void sendEOM() noexcept=0
proxygen::HTTPTransaction * txn_
Definition: ProxyHandler.h:134
ServerTransactionHandler serverHandler_
Definition: ProxyHandler.h:132
ProxyStats *const stats_
Definition: ProxyHandler.h:130
void writeErr(size_t bytesWritten, const folly::AsyncSocketException &ex) noexcept override
std::shared_ptr< folly::AsyncSocket > upstreamSock_
Definition: ProxyHandler.h:140
fbstring exceptionStr(const std::exception &e)
void onBody(std::unique_ptr< folly::IOBuf > body) noexcept override
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
EventBase * getEventBase()
void getReadBuffer(void **bufReturn, size_t *lenReturn) override
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
std::unique_ptr< SessionWrapper > session_
Definition: ProxyHandler.h:133
ResponseBuilder & body(std::unique_ptr< folly::IOBuf > bodyIn)
requires E e noexcept(noexcept(s.error(std::move(e))))
void onError(proxygen::ProxygenError err) noexcept override
void onEOM() noexcept override
std::unique_ptr< proxygen::HTTPMessage > request_
Definition: ProxyHandler.h:137
requires And< SemiMovable< VN >... > &&SemiMovable< E > auto error(E e)
Definition: error.h:48
static const uint32_t kMinReadSize
static EventBaseManager * get()
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
Definition: IOBufQueue.h:356
virtual void sendAbort() noexcept=0
void onServerHeadersComplete(std::unique_ptr< proxygen::HTTPMessage > msg) noexcept
virtual void resumeIngress() noexcept=0
folly::IOBufQueue body_
Definition: ProxyHandler.h:142
virtual void sendHeaders(HTTPMessage &msg) noexcept=0
void onServerBody(std::unique_ptr< folly::IOBuf > chain) noexcept
std::map< OptionKey, int > OptionMap
Definition: AsyncSocket.h:376
void requestComplete() noexcept override
proxygen::HTTPConnector connector_
Definition: ProxyHandler.h:131
void onServerEgressResumed() noexcept
void readDataAvailable(size_t len) noexcept override
virtual void recordRequest()
Definition: ProxyStats.h:29
void connectSuccess() noexcept override
virtual void sendBody(std::unique_ptr< folly::IOBuf > body) noexcept=0
void onRequest(std::unique_ptr< proxygen::HTTPMessage > headers) noexcept override
virtual void sendBody(std::unique_ptr< folly::IOBuf > body)
void onServerError(const proxygen::HTTPException &error) noexcept
void connectError(const folly::AsyncSocketException &ex) override
void writeSuccess() noexcept override
DEFINE_int32(proxy_connect_timeout, 1000,"connect timeout in milliseconds")
void connectErr(const folly::AsyncSocketException &ex) noexcept override
static std::shared_ptr< AsyncSocket > newSocket(EventBase *evb)
Definition: AsyncSocket.h:281
void readEOF() noexcept override
const char * string
Definition: Conv.cpp:212
void connect(folly::EventBase *eventBase, const folly::SocketAddress &connectAddr, std::chrono::milliseconds timeoutMs=std::chrono::milliseconds(0), const folly::AsyncSocket::OptionMap &socketOptions=folly::AsyncSocket::emptyOptionMap, const folly::SocketAddress &bindAddr=folly::AsyncSocket::anyAddress())
void readErr(const folly::AsyncSocketException &ex) noexcept override
virtual void pauseIngress() noexcept=0
void postallocate(std::size_t n)
Definition: IOBufQueue.h:380
ResponseHandler * downstream_
void onEgressPaused() noexcept override
ThreadPoolListHook * addr
void detachServerTransaction() noexcept
const char * getErrorString(ProxygenError error)