proxygen
FlowControlFilter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
11 
12 namespace proxygen {
13 
14 namespace {
15 HTTPException getException(const std::string& msg) {
16  HTTPException ex(HTTPException::Direction::INGRESS_AND_EGRESS, msg);
17  ex.setCodecStatusCode(ErrorCode::FLOW_CONTROL_ERROR);
18  return ex;
19 }
20 
21 }
22 
26  uint32_t recvCapacity):
27  notify_(callback),
28  recvWindow_(codec->getDefaultWindowSize()),
29  sendWindow_(codec->getDefaultWindowSize()),
30  error_(false),
31  sendsBlocked_(false) {
32  if (recvCapacity > 0) {
33  if (recvCapacity < codec->getDefaultWindowSize()) {
34  VLOG(4) << "Ignoring low conn-level recv window size of " << recvCapacity;
35  } else if (recvCapacity > codec->getDefaultWindowSize()) {
36  auto delta = recvCapacity - codec->getDefaultWindowSize();
37  VLOG(4) << "Incrementing default conn-level recv window by " << delta;
38  CHECK(recvWindow_.setCapacity(recvCapacity));
39  codec->generateWindowUpdate(writeBuf, 0, delta);
40  }
41  }
42 }
43 
45  uint32_t capacity) {
46  if (capacity < recvWindow_.getCapacity()) {
47  VLOG(4) << "Ignoring low conn-level recv window size of " << capacity;
48  return;
49  }
50  int32_t delta = capacity - recvWindow_.getCapacity();
51  if (delta < 0) {
52  // For now, we're disallowing shrinking the window, since it can lead
53  // to FLOW_CONTROL_ERRORs if there is data in flight.
54  VLOG(4) << "Refusing to shrink the recv window";
55  return;
56  }
57  VLOG(4) << "Incrementing default conn-level recv window by " << delta;
58  if (!recvWindow_.setCapacity(capacity)) {
59  VLOG(2) << "Failed setting conn-level recv window capacity to " << capacity;
60  return;
61  }
62  toAck_ += delta;
63  if (toAck_ > 0) {
64  call_->generateWindowUpdate(writeBuf, 0, delta);
65  toAck_ = 0;
66  }
67 }
68 
70  uint32_t delta) {
71  toAck_ += delta;
72  bool willAck = (toAck_ > 0 &&
74  VLOG(4) << "processed " << delta << " toAck_=" << toAck_
75  << " bytes, will ack=" << willAck;
76  if (willAck) {
77  CHECK(recvWindow_.free(toAck_));
78  call_->generateWindowUpdate(writeBuf, 0, toAck_);
79  toAck_ = 0;
80  return true;
81  }
82  return false;
83 }
84 
87 }
88 
90  if (error_) {
91  return false;
92  }
93  return call_->isReusable();
94 }
95 
97  std::unique_ptr<folly::IOBuf> chain,
98  uint16_t padding) {
99  uint64_t amount = chain->computeChainDataLength();
100  if (!recvWindow_.reserve(amount + padding)) {
101  error_ = true;
102  HTTPException ex = getException(
103  folly::to<std::string>(
104  "Failed to reserve receive window, window size=",
105  recvWindow_.getSize(), ", amount=", amount));
106  callback_->onError(0, ex, false);
107  } else {
108  if (VLOG_IS_ON(4) && recvWindow_.getSize() == 0) {
109  VLOG(4) << "recvWindow full";
110  }
111  toAck_ += padding;
112  CHECK(recvWindow_.free(padding));
113  callback_->onBody(stream, std::move(chain), padding);
114  }
115 }
116 
118  if (!stream) {
119  bool success = sendWindow_.free(amount);
120  VLOG(4) << "Remote side ack'd " << amount << " bytes, sendWindow=" <<
122  if (!success) {
123  LOG(WARNING) << "Remote side sent connection-level WINDOW_UPDATE "
124  << "that could not be applied. Aborting session.";
125  // If something went wrong applying the flow control change, abort
126  // the entire session.
127  error_ = true;
128  HTTPException ex = getException(
129  folly::to<std::string>(
130  "Failed to update send window, outstanding=",
131  sendWindow_.getOutstanding(), ", amount=", amount));
132  callback_->onError(stream, ex, false);
133  }
135  VLOG(4) << "Send window opened";
136  sendsBlocked_ = false;
138  }
139  // Don't forward.
140  } else {
141  callback_->onWindowUpdate(stream, amount);
142  }
143 }
144 
146  StreamID stream,
147  std::unique_ptr<folly::IOBuf> chain,
148  folly::Optional<uint8_t> padding,
149  bool eom) {
150  uint8_t padLen = padding ? *padding : 0;
151  bool success = sendWindow_.reserve(
152  chain->computeChainDataLength() + padLen);
153  VLOG(5) << "Sending " << chain->computeChainDataLength()
154  << " bytes, sendWindow=" << sendWindow_.getSize();
155 
156  // In the future, maybe make this DCHECK
157  CHECK(success) << "Session-level send window underflowed! "
158  << "Too much data sent without WINDOW_UPDATES!";
159 
160  if (sendWindow_.getNonNegativeSize() == 0) {
161  // Need to inform when the send window is no longer full
162  VLOG(4) << "Send window closed";
163  sendsBlocked_ = true;
165  }
166 
167  return call_->generateBody(writeBuf, stream, std::move(chain), padding,
168  eom);
169 }
170 
172  StreamID stream,
173  uint32_t delta) {
174  CHECK(stream) << " someone tried to manually manipulate a conn-level window";
175  return call_->generateWindowUpdate(writeBuf, stream, delta);
176 }
177 
178 }
uint32_t getCapacity() const
Definition: Window.cpp:32
void onBody(StreamID stream, std::unique_ptr< folly::IOBuf > chain, uint16_t padding) override
void setReceiveWindowSize(folly::IOBufQueue &writeBuf, uint32_t capacity)
size_t generateWindowUpdate(folly::IOBufQueue &writeBuf, StreamID stream, uint32_t delta) override
uint32_t getOutstanding() const
Definition: Window.cpp:37
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
CodecFactory codec
void onWindowUpdate(StreamID stream, uint32_t amount) override
int32_t getSize() const
Definition: Window.cpp:23
bool reserve(uint32_t amount, bool strict=true)
Definition: Window.cpp:41
void writeBuf(const Buf &buf, folly::io::Appender &out)
virtual void onConnectionSendWindowClosed()=0
virtual uint32_t getDefaultWindowSize() const
Definition: HTTPCodec.h:680
uint32_t getDefaultWindowSize() const override
FlowControlFilter(Callback &callback, folly::IOBufQueue &writeBuf, HTTPCodec *codec, uint32_t recvCapacity=0)
std::size_t computeChainDataLength() const
Definition: IOBuf.cpp:501
const char * string
Definition: Conv.cpp:212
bool setCapacity(uint32_t capacity)
Definition: Window.cpp:84
bool isReusable() const override
uint32_t getAvailableSend() const
virtual size_t generateWindowUpdate(folly::IOBufQueue &, StreamID, uint32_t)
Definition: HTTPCodec.h:610
bool free(uint32_t amount)
Definition: Window.cpp:63
size_t generateBody(folly::IOBufQueue &writeBuf, StreamID stream, std::unique_ptr< folly::IOBuf > chain, folly::Optional< uint8_t > padding, bool eom) override
bool ingressBytesProcessed(folly::IOBufQueue &writeBuf, uint32_t delta)
uint32_t getNonNegativeSize() const
Definition: Window.cpp:27