proxygen
QPACKDecoder.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
12 
13 using folly::IOBuf;
14 using folly::io::Cursor;
15 using std::unique_ptr;
17 
namespace {
// First constructor argument for every HPACKEncodeBuffer built in this file
// (presumably the buffer growth increment, in bytes -- confirm against
// HPACKEncodeBuffer).
constexpr uint32_t kGrowth{100};
} // namespace
21 
22 namespace proxygen {
23 
24 // Blocking implementation - may queue
27  std::unique_ptr<folly::IOBuf> block,
28  uint32_t totalBytes,
29  HPACK::StreamingCallback* streamingCb) {
30  Cursor cursor(block.get());
31  HPACKDecodeBuffer dbuf(cursor, totalBytes, maxUncompressed_);
33  uint32_t largestReference = handleBaseIndex(dbuf);
34  if (largestReference > table_.getBaseIndex()) {
35  VLOG(5) << "largestReference=" << largestReference << " > baseIndex=" <<
36  table_.getBaseIndex() << ", queuing";
37  if (queue_.size() >= maxBlocking_) {
38  VLOG(2) << "QPACK queue is full size=" << queue_.size()
39  << " maxBlocking_=" << maxBlocking_;
41  completeDecode(HeaderCodec::Type::QPACK, streamingCb, 0, 0);
42  } else {
44  q.append(std::move(block));
45  q.trimStart(dbuf.consumedBytes());
46  enqueueHeaderBlock(streamID, largestReference, baseIndex_,
47  dbuf.consumedBytes(), q.move(),
48  totalBytes - dbuf.consumedBytes(), streamingCb);
49  }
50  } else {
51  decodeStreamingImpl(largestReference, 0, dbuf, streamingCb);
52  }
53 }
54 
56  uint64_t largestReference;
57  err_ = dbuf.decodeInteger(largestReference);
59  LOG(ERROR) << "Decode error decoding largest reference err_=" << err_;
60  return 0;
61  }
62  VLOG(5) << "Decoded largestReference=" << largestReference;
63  uint64_t delta = 0;
64  if (dbuf.empty()) {
65  LOG(ERROR) << "Invalid prefix, no delta-base";
67  return 0;
68  }
69  bool neg = dbuf.peek() & HPACK::Q_DELTA_BASE_NEG;
70  err_ = dbuf.decodeInteger(HPACK::Q_DELTA_BASE.prefixLength, delta);
72  LOG(ERROR) << "Decode error decoding delta base=" << err_;
73  return 0;
74  }
75  if (neg) {
76  if (delta > largestReference) {
77  LOG(ERROR) << "Invalid delta=" << delta << " largestReference="
78  << largestReference;
80  return 0;
81  }
82  baseIndex_ = largestReference - delta;
83  } else {
84  baseIndex_ = largestReference + delta;
85  }
86  VLOG(5) << "Decoded baseIndex_=" << baseIndex_;
87  return largestReference;
88 }
89 
91  uint32_t largestReference,
92  uint32_t consumed, HPACKDecodeBuffer& dbuf,
93  HPACK::StreamingCallback* streamingCb) {
94  uint32_t emittedSize = 0;
95 
96  while (!hasError() && !dbuf.empty()) {
97  emittedSize += decodeHeaderQ(dbuf, streamingCb);
98  if (emittedSize > maxUncompressed_) {
99  LOG(ERROR) << "exceeded uncompressed size limit of "
100  << maxUncompressed_ << " bytes";
102  break;
103  }
104  emittedSize += 2;
105  }
106 
107  if (!hasError()) {
108  // This is a little premature, since the ack doesn't get generated here.
109  // lastAcked_ is only read in encodeTableStateSync, so all completed header
110  // blocks must call encodeHeaderAck BEFORE calling encodeTableStateSync.
111  lastAcked_ = std::max(lastAcked_, largestReference);
112  }
114  consumed + dbuf.consumedBytes(), emittedSize);
115 }
116 
118  HPACKDecodeBuffer& dbuf,
119  HPACK::StreamingCallback* streamingCb) {
120  uint8_t byte = dbuf.peek();
121  if (byte & HPACK::Q_INDEXED.code) {
122  return decodeIndexedHeaderQ(
123  dbuf, HPACK::Q_INDEXED.prefixLength, false, streamingCb, nullptr);
124  } else if (byte & HPACK::Q_LITERAL_NAME_REF.code) {
125  return decodeLiteralHeaderQ(
126  dbuf, false, true, HPACK::Q_LITERAL_NAME_REF.prefixLength, false,
127  streamingCb);
128  } else if (byte & HPACK::Q_LITERAL.code) {
129  return decodeLiteralHeaderQ(
130  dbuf, false, false, HPACK::Q_LITERAL.prefixLength, false, streamingCb);
131  } else if (byte & HPACK::Q_INDEXED_POST.code) {
132  return decodeIndexedHeaderQ(
133  dbuf, HPACK::Q_INDEXED_POST.prefixLength, true, streamingCb, nullptr);
134  } else { // Q_LITERAL_NAME_REF_POST
135  return decodeLiteralHeaderQ(
136  dbuf, false, true, HPACK::Q_LITERAL_NAME_REF_POST.prefixLength, true,
137  streamingCb);
138  }
139 }
140 
142  std::unique_ptr<folly::IOBuf> buf) {
143  ingress_.append(std::move(buf));
144  Cursor cursor(ingress_.front());
146  VLOG(6) << "Decoding control block";
147  baseIndex_ = 0;
149  while (!hasError() && !dbuf.empty()) {
153  drainQueue();
155  }
156  }
157  ingress_.trimStart(dbuf.consumedBytes());
158  if (hasError()) {
159  return err_;
160  } else {
161  drainQueue();
163  }
164 }
165 
167  uint8_t byte = dbuf.peek();
169  if (partial_.state == Partial::VALUE ||
171  // If partial state is VALUE, it might have been a NO_NAME_REF instruction,
172  // but we've already parsed the name, so it doesn't matter
174  dbuf, true, true, HPACK::Q_INSERT_NAME_REF.prefixLength, false,
175  nullptr);
176  } else if (byte & HPACK::Q_INSERT_NO_NAME_REF.code) {
178  dbuf, true, false, HPACK::Q_INSERT_NO_NAME_REF.prefixLength, false,
179  nullptr);
180  } else if (byte & HPACK::Q_TABLE_SIZE_UPDATE.code) {
182  } else { // must be Q_DUPLICATE=000
183  headers_t emitted;
185  dbuf, HPACK::Q_DUPLICATE.prefixLength, false, nullptr, &emitted);
186  if (!hasError()) {
187  CHECK(!emitted.empty());
188  table_.add(std::move(emitted[0]));
189  }
190  }
191 }
192 
194  HPACKDecodeBuffer& dbuf,
195  bool indexing,
196  bool nameIndexed,
197  uint8_t prefixLength,
198  bool aboveBase,
199  HPACK::StreamingCallback* streamingCb) {
200  bool allowPartial = (streamingCb == nullptr);
201  Partial localPartial;
202  Partial* partial = (allowPartial) ? &partial_ : &localPartial;
203  if (partial->state == Partial::NAME) {
204  if (nameIndexed) {
205  uint64_t nameIndex = 0;
206  bool isStaticName = !aboveBase && (dbuf.peek() & (1 << prefixLength));
207  err_ = dbuf.decodeInteger(prefixLength, nameIndex);
208  if (allowPartial && err_ == HPACK::DecodeError::BUFFER_UNDERFLOW) {
209  return 0;
210  }
212  LOG(ERROR) << "Decode error decoding index err_=" << err_;
213  return 0;
214  }
215  if (!isStaticName) {
216  nameIndex++;
217  }
218  // validate the index
219  if (!isValid(isStaticName, nameIndex, aboveBase)) {
220  LOG(ERROR) << "received invalid index: " << nameIndex;
222  return 0;
223  }
224  partial->header.name = getHeader(
225  isStaticName, nameIndex, baseIndex_, aboveBase).name;
226  } else {
227  folly::fbstring headerName;
228  err_ = dbuf.decodeLiteral(prefixLength, headerName);
229  if (allowPartial && err_ == HPACK::DecodeError::BUFFER_UNDERFLOW) {
230  return 0;
231  }
233  LOG(ERROR) << "Error decoding header name err_=" << err_;
234  return 0;
235  }
236  partial->header.name = headerName;
237  }
238  partial->state = Partial::VALUE;
239  partial->consumed = dbuf.consumedBytes();
240  }
241  // value
242  err_ = dbuf.decodeLiteral(partial->header.value);
243  if (allowPartial && err_ == HPACK::DecodeError::BUFFER_UNDERFLOW) {
244  return 0;
245  }
247  LOG(ERROR) << "Error decoding header value name=" << partial->header.name
248  << " err_=" << err_;
249  return 0;
250  }
251  partial->state = Partial::NAME;
252 
253  uint32_t emittedSize = emit(partial->header, streamingCb, nullptr);
254 
255  if (indexing) {
256  table_.add(std::move(partial->header));
257  }
258 
259  return emittedSize;
260 }
261 
263  HPACKDecodeBuffer& dbuf,
264  uint32_t prefixLength,
265  bool aboveBase,
266  HPACK::StreamingCallback* streamingCb,
267  headers_t* emitted) {
268  uint64_t index;
269  bool isStatic = !aboveBase && (dbuf.peek() & (1 << prefixLength));
270  err_ = dbuf.decodeInteger(prefixLength, index);
272  LOG(ERROR) << "Decode error decoding index err_=" << err_;
273  return 0;
274  }
275  if (!isStatic) {
276  index++;
277  }
278  // validate the index
279  if (index == 0 || !isValid(isStatic, index, aboveBase)) {
280  LOG(ERROR) << "received invalid index: " << index;
282  return 0;
283  }
284 
285  auto& header = getHeader(isStatic, index, baseIndex_, aboveBase);
286  return emit(header, streamingCb, emitted);
287 }
288 
289 bool QPACKDecoder::isValid(bool isStatic, uint32_t index, bool aboveBase) {
290  if (isStatic) {
291  return getStaticTable().isValid(index);
292  } else {
293  uint32_t baseIndex = baseIndex_;
294  if (aboveBase) {
295  baseIndex = baseIndex + index;
296  index = 1;
297  }
298  return table_.isValid(index, baseIndex);
299  }
300 }
301 
302 std::unique_ptr<folly::IOBuf> QPACKDecoder::encodeTableStateSync() {
304  if (toAck > 0) {
305  VLOG(6) << "encodeTableStateSync toAck=" << toAck;
306  HPACKEncodeBuffer ackEncoder(kGrowth, false);
307  ackEncoder.encodeInteger(toAck, HPACK::Q_TABLE_STATE_SYNC);
309  return ackEncoder.release();
310  } else {
311  return nullptr;
312  }
313 }
314 
315 std::unique_ptr<folly::IOBuf> QPACKDecoder::encodeHeaderAck(
316  uint64_t streamId) const {
317  HPACKEncodeBuffer ackEncoder(kGrowth, false);
318  VLOG(6) << "encodeHeaderAck id=" << streamId;
319  ackEncoder.encodeInteger(streamId, HPACK::Q_HEADER_ACK);
320  return ackEncoder.release();
321 }
322 
323 std::unique_ptr<folly::IOBuf> QPACKDecoder::encodeCancelStream(
324  uint64_t streamId) {
325  // Remove this stream from the queue
326  auto it = queue_.begin();
327  while (it != queue_.end()) {
328  if (it->second.streamID == streamId) {
329  it = queue_.erase(it);
330  } else {
331  it++;
332  }
333  }
334  HPACKEncodeBuffer ackEncoder(kGrowth, false);
335  ackEncoder.encodeInteger(streamId, HPACK::Q_CANCEL_STREAM);
336  return ackEncoder.release();
337 }
338 
341  uint32_t largestReference,
342  uint32_t baseIndex,
343  uint32_t consumed,
344  std::unique_ptr<folly::IOBuf> block,
345  size_t length,
346  HPACK::StreamingCallback* streamingCb) {
347  // TODO: this queue is currently unbounded and has no timeouts
348  CHECK_GT(largestReference, table_.getBaseIndex());
349  queue_.emplace(
350  std::piecewise_construct,
351  std::forward_as_tuple(largestReference),
352  std::forward_as_tuple(streamID, baseIndex, length, consumed,
353  std::move(block), streamingCb));
354  holBlockCount_++;
355  VLOG(5) << "queued block=" << largestReference << " len=" << length;
356  queuedBytes_ += length;
357 }
358 
359 bool QPACKDecoder::decodeBlock(uint32_t largestReference,
360  const PendingBlock& pending) {
361  if (pending.length > 0) {
362  VLOG(5) << "decodeBlock len=" << pending.length;
363  folly::io::Cursor cursor(pending.block.get());
364  HPACKDecodeBuffer dbuf(cursor, pending.length, maxUncompressed_);
365  DCHECK_LE(pending.length, queuedBytes_);
366  queuedBytes_ -= pending.length;
367  baseIndex_ = pending.baseIndex;
368  folly::DestructorCheck::Safety safety(*this);
369  decodeStreamingImpl(largestReference, pending.consumed, dbuf, pending.cb);
370  // The callback way destroy this, if so stop queue processing
371  if (safety.destroyed()) {
372  return true;
373  }
374  }
375  return false;
376 }
377 
379  auto it = queue_.begin();
380  while (!queue_.empty() && it->first <= table_.getBaseIndex() &&
381  !hasError()) {
382  if (decodeBlock(it->first, it->second)) {
383  return;
384  }
385  queue_.erase(it);
386  it = queue_.begin();
387  }
388 }
389 
390 }
const Instruction Q_TABLE_STATE_SYNC
const Instruction Q_INSERT_NO_NAME_REF
const folly::IOBuf * front() const
Definition: IOBufQueue.h:476
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
QPACKHeaderTable table_
Definition: QPACKContext.h:54
size_t chainLength() const
Definition: IOBufQueue.h:492
std::unique_ptr< folly::IOBuf > encodeTableStateSync()
std::unique_ptr< folly::IOBuf > release()
HPACK::DecodeError decodeEncoderStream(std::unique_ptr< folly::IOBuf > buf)
uint32_t decodeLiteralHeaderQ(HPACKDecodeBuffer &dbuf, bool indexing, bool nameIndexed, uint8_t prefixLength, bool aboveBase, HPACK::StreamingCallback *streamingCb)
LogLevel max
Definition: LogLevel.cpp:31
std::vector< HPACKHeader > headers_t
void enqueueHeaderBlock(uint64_t streamId, uint32_t largestReference, uint32_t baseIndex, uint32_t consumed, std::unique_ptr< folly::IOBuf > block, size_t length, HPACK::StreamingCallback *streamingCb)
folly::IOBufQueue ingress_
Definition: QPACKDecoder.h:136
std::multimap< uint32_t, PendingBlock > queue_
Definition: QPACKDecoder.h:126
const Instruction Q_DUPLICATE
const Instruction Q_INDEXED_POST
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void handleTableSizeUpdate(HPACKDecodeBuffer &dbuf, HeaderTable &table)
uint32_t decodeHeaderQ(HPACKDecodeBuffer &dbuf, HPACK::StreamingCallback *streamingCb)
const Instruction Q_LITERAL_NAME_REF_POST
void decodeEncoderStreamInstruction(HPACKDecodeBuffer &dbuf)
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
const Instruction Q_LITERAL
uint32_t getBaseIndex() const
bool isValid(uint32_t index) const
uint32_t consumedBytes() const
uint32_t decodeIndexedHeaderQ(HPACKDecodeBuffer &dbuf, uint32_t prefixLength, bool aboveBase, HPACK::StreamingCallback *streamingCb, headers_t *emitted)
bool add(HPACKHeader header) override
const HPACKHeader & getHeader(bool isStatic, uint32_t index, uint32_t base, bool aboveBase)
const Instruction Q_TABLE_SIZE_UPDATE
auto partial(F &&f, Args &&...args) -> detail::partial::Partial< typename std::decay< F >::type, std::tuple< typename std::decay< Args >::type... >>
Definition: Partial.h:119
void decodeStreaming(uint64_t streamId, std::unique_ptr< folly::IOBuf > block, uint32_t totalBytes, HPACK::StreamingCallback *streamingCb)
uint32_t emit(const HPACKHeader &header, HPACK::StreamingCallback *streamingCb, headers_t *emitted)
HPACK::DecodeError decodeLiteral(folly::fbstring &literal)
const Instruction Q_HEADER_ACK
const Instruction Q_INSERT_NAME_REF
HPACKHeaderName name
Definition: HPACKHeader.h:82
uint32_t encodeInteger(uint64_t value, uint8_t instruction, uint8_t nbit)
bool decodeBlock(uint32_t largestReference, const PendingBlock &pending)
bool isValid(bool isStatic, uint32_t index, bool aboveBase)
const Instruction Q_DELTA_BASE
bool isValid(uint32_t index, uint32_t base=0) const
folly::fbstring value
Definition: HPACKHeader.h:83
void completeDecode(HeaderCodec::Type type, HPACK::StreamingCallback *streamingCb, uint32_t compressedSize, uint32_t emittedSize)
const StaticHeaderTable & getStaticTable() const
Definition: QPACKContext.h:50
std::unique_ptr< folly::IOBuf > block
Definition: QPACKDecoder.h:111
void decodeStreamingImpl(uint32_t largestReference, uint32_t consumed, HPACKDecodeBuffer &dbuf, HPACK::StreamingCallback *streamingCb)
std::unique_ptr< folly::IOBuf > encodeHeaderAck(uint64_t streamId) const
void trimStart(size_t amount)
Definition: IOBufQueue.cpp:255
enum proxygen::QPACKDecoder::Partial::@105 NAME
HPACK::DecodeError decodeInteger(uint8_t nbit, uint64_t &integer)
uint32_t handleBaseIndex(HPACKDecodeBuffer &dbuf)
const uint8_t Q_DELTA_BASE_NEG
HPACK::StreamingCallback * cb
Definition: QPACKDecoder.h:112
uint32_t streamID
Definition: SPDYCodec.cpp:131
const Instruction Q_INDEXED
std::unique_ptr< folly::IOBuf > encodeCancelStream(uint64_t streamId)
const Instruction Q_LITERAL_NAME_REF
const Instruction Q_CANCEL_STREAM