proxygen
QPACKScheme.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
10 #pragma once
11 
15 
16 namespace proxygen { namespace compress {
17 
19  public:
20  explicit QPACKScheme(CompressionSimulator* sim, uint32_t tableSize,
21  uint32_t maxBlocking)
22  : CompressionScheme(sim) {
27  client_.setMaxVulnerable(maxBlocking);
28  server_.setMaxBlocking(maxBlocking);
29  }
30 
32  CHECK_EQ(server_.getQueuedBytes(), 0);
33  }
34 
35  struct QPACKAck : public CompressionScheme::Ack {
36  explicit QPACKAck(uint16_t n,
37  uint16_t an,
38  std::unique_ptr<folly::IOBuf> hAck,
39  std::unique_ptr<folly::IOBuf> cAck) :
40  seqn(n),
41  ackSeqn(an),
42  headerAck(std::move(hAck)),
43  controlAck(std::move(cAck)) {
44  }
47  std::unique_ptr<folly::IOBuf> headerAck;
48  std::unique_ptr<folly::IOBuf> controlAck;
49  };
50 
51  std::unique_ptr<Ack> getAck(uint16_t seqn) override {
52  VLOG(4) << "Sending ack for seqn=" << seqn;
53  auto res = std::make_unique<QPACKAck>(seqn, sendAck_++,
56  return std::move(res);
57  }
58  void recvAck(std::unique_ptr<Ack> ack) override {
59  CHECK(ack);
60  auto qpackAck = dynamic_cast<QPACKAck*>(ack.get());
61  CHECK_NOTNULL(qpackAck);
62  VLOG(4) << "Received ack for seqn=" << qpackAck->seqn;
63  CHECK(qpackAck->headerAck);
64  if (qpackAck->controlAck) {
65  qpackAck->headerAck->prependChain(std::move(qpackAck->controlAck));
66  }
67  // The decoder stream must be processed in order
68  acks_.emplace(qpackAck->ackSeqn, std::move(qpackAck->headerAck));
69  do {
70  auto it = acks_.begin();
71  if (it->first != recvAck_) {
72  break;
73  }
74  CHECK_EQ(client_.decodeDecoderStream(std::move(it->second)),
76  recvAck_++;
77  acks_.erase(it);
78  } while (!acks_.empty());
79  }
80 
81  std::pair<FrameFlags, std::unique_ptr<folly::IOBuf>> encode(
82  bool /*newPacket*/,
83  std::vector<compress::Header> allHeaders,
84  SimStats& stats) override {
85  index++;
86  auto result = client_.encode(allHeaders, index);
87  uint16_t len = 0;
88  folly::IOBufQueue queue;
89  static const uint32_t growth = 1400; // chosen arbitrarily
90  folly::io::QueueAppender cursor(&queue, growth);
91  if (result.control) {
92  VLOG(5) << "Writing encodeControlIndex_=" << encodeControlIndex_;
93  len = result.control->computeChainDataLength();
94  cursor.writeBE<uint16_t>(len);
96  cursor.insert(std::move(result.control));
97  // Don't count the framing against the compression ratio, for now
98  // stats.compressed += 3 * sizeof(uint16_t);
99  } else {
100  cursor.writeBE<uint16_t>(0);
101  }
102  if (result.stream) {
103  len = result.stream->computeChainDataLength();
104  }
105  cursor.writeBE<uint16_t>(index);
106  cursor.writeBE<uint16_t>(len);
107  cursor.insert(std::move(result.stream));
110  // OOO is allowed if there has not been an eviction
111  FrameFlags flags(false, false);
112  return {flags, queue.move()};
113  }
114 
116  std::unique_ptr<folly::IOBuf> encodedReq,
117  SimStats& stats,
118  SimStreamingCallback& callback) override {
119  folly::io::Cursor cursor(encodedReq.get());
120  auto toTrim = sizeof(uint16_t) * 3;
121  auto len = cursor.readBE<uint16_t>();
122  if (len > 0) {
123  // check decode result
124  auto controlIndex = cursor.readBE<uint16_t>();
125  toTrim += sizeof(uint16_t);
126  std::unique_ptr<folly::IOBuf> control;
127  cursor.clone(control, len);
128  if (controlIndex == decodeControlIndex_) {
129  // next expected control block, decode
130  VLOG(5) << "decode controlIndex=" << controlIndex;
133  while (!controlQueue_.empty() &&
134  controlQueue_.begin()->first == decodeControlIndex_) {
135  // drain the queue
136  VLOG(5) << "decode controlIndex=" << controlQueue_.begin()->first;
137  auto it = controlQueue_.begin();
140  controlQueue_.erase(it);
141  }
142  } else {
143  // out of order control block, queue it
144  controlQueue_.emplace(controlIndex, std::move(control));
145  }
146  toTrim += len;
147  }
148  auto seqn = cursor.readBE<uint16_t>();
149  callback.seqn = seqn;
150  VLOG(1) << "Decoding request=" << callback.requestIndex
151  << " header seqn=" << seqn
152  << " allowOOO=" << uint32_t(flags.allowOOO);
153  len = cursor.readBE<uint16_t>();
154  folly::IOBufQueue queue;
155  queue.append(std::move(encodedReq));
156  queue.trimStart(toTrim);
157  server_.decodeStreaming(seqn, queue.move(), len, &callback);
158  callback.maybeMarkHolDelay();
161  }
162  }
163 
164  uint32_t getHolBlockCount() const override {
165  return server_.getHolBlockCount();
166  }
167 
170  std::map<uint16_t, std::unique_ptr<folly::IOBuf>> controlQueue_;
173  std::map<uint16_t, std::unique_ptr<folly::IOBuf>> acks_;
176 };
177 
178 }} // namespace proxygen::compress
std::unique_ptr< Ack > getAck(uint16_t seqn) override
Definition: QPACKScheme.h:51
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
flags
Definition: http_parser.h:127
std::map< uint16_t, std::unique_ptr< folly::IOBuf > > acks_
Definition: QPACKScheme.h:173
std::unique_ptr< folly::IOBuf > controlAck
Definition: QPACKScheme.h:48
QPACKAck(uint16_t n, uint16_t an, std::unique_ptr< folly::IOBuf > hAck, std::unique_ptr< folly::IOBuf > cAck)
Definition: QPACKScheme.h:36
uint32_t getHolBlockCount() const override
Definition: QPACKScheme.h:164
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::map< uint16_t, std::unique_ptr< folly::IOBuf > > controlQueue_
Definition: QPACKScheme.h:170
void decode(FrameFlags flags, std::unique_ptr< folly::IOBuf > encodedReq, SimStats &stats, SimStreamingCallback &callback) override
Definition: QPACKScheme.h:115
STL namespace.
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
std::pair< FrameFlags, std::unique_ptr< folly::IOBuf > > encode(bool, std::vector< compress::Header > allHeaders, SimStats &stats) override
Definition: QPACKScheme.h:81
void setMaxVulnerable(uint32_t maxVulnerable)
Definition: QPACKCodec.h:121
QPACKEncoder::EncodeResult encode(std::vector< compress::Header > &headers, uint64_t id) noexcept
Definition: QPACKCodec.cpp:47
void recvAck(std::unique_ptr< Ack > ack) override
Definition: QPACKScheme.h:58
void insert(std::unique_ptr< folly::IOBuf > buf)
Definition: Cursor.h:1169
std::unique_ptr< folly::IOBuf > headerAck
Definition: QPACKScheme.h:47
HPACK::DecodeError decodeEncoderStream(std::unique_ptr< folly::IOBuf > buf)
Definition: QPACKCodec.h:45
void setDecoderHeaderTableMaxSize(uint32_t size)
Definition: QPACKCodec.h:62
std::unique_ptr< folly::IOBuf > encodeHeaderAck(uint64_t streamId)
Definition: QPACKCodec.h:82
static const NoPathIndexingStrategy * getInstance()
HPACK::DecodeError decodeDecoderStream(std::unique_ptr< folly::IOBuf > buf)
Definition: QPACKCodec.h:67
void setMaxBlocking(uint32_t maxBlocking)
Definition: QPACKCodec.h:125
void setHeaderIndexingStrategy(const HeaderIndexingStrategy *indexingStrat)
Definition: QPACKCodec.h:106
uint64_t getQueuedBytes() const
Definition: QPACKCodec.h:117
std::unique_ptr< folly::IOBuf > encodeTableStateSync()
Definition: QPACKCodec.h:78
void setEncoderHeaderTableSize(uint32_t size)
Definition: QPACKCodec.h:58
QPACKScheme(CompressionSimulator *sim, uint32_t tableSize, uint32_t maxBlocking)
Definition: QPACKScheme.h:20
const HTTPHeaderSize & getEncodedSize()
Definition: HeaderCodec.h:62
uint64_t getHolBlockCount() const
Definition: QPACKCodec.h:113
void decodeStreaming(uint64_t streamId, std::unique_ptr< folly::IOBuf > block, uint32_t length, HPACK::StreamingCallback *streamingCb) noexcept
Definition: QPACKCodec.cpp:56
void writeBE(T value)
Definition: Cursor.h:744