proxygen
HPACKQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
10 #pragma once
11 
14 
15 #include <deque>
16 #include <memory>
17 #include <tuple>
18 
19 namespace proxygen {
20 
22  public:
24  : codec_(codec) {}
25 
// Accepts one header block tagged with sequence number `seqn` and either
// decodes it right away or stores it in queue_ until its turn comes.
//
//   seqn        - sequence number of this block; compared against nextSeqn_.
//   block       - the encoded bytes; ownership is taken.
//   length      - byte count handed to decodeBlock() and added to queuedBytes_
//                 when the block is queued.
//   streamingCb - callback forwarded to decodeBlock(); stored with the block
//                 when queued.
//   oooOk       - when true and there is a gap, the block is decoded at once
//                 and only an empty placeholder is queued.
//
// Whenever decodeBlock() returns true this function returns immediately and
// touches no further members (see the comment on decodeBlock()).
//
// NOTE(review): this listing skips its own lines 31, 47 and 64 (see the
// embedded numbering), so the statements on those lines are not visible
// here. The cross-reference list at the end of the page mentions
// onDecodeError and holBlockCount_; presumably the two early-return branches
// report an error through streamingCb and the non-ooo branch bumps
// holBlockCount_ -- confirm against the real header.
26  void enqueueHeaderBlock(uint32_t seqn, std::unique_ptr<folly::IOBuf> block,
27  size_t length,
28  HPACK::StreamingCallback* streamingCb,
29  bool oooOk) {
    // seqn is older than the next expected one: bail out without decoding.
30  if (seqn < nextSeqn_) {
32  return;
33  }
34  if (nextSeqn_ == seqn) {
35  // common case, decode immediately
36  if (decodeBlock(seqn, std::move(block), length, streamingCb,
37  false /* in order */)) {
38  return;
39  }
    // nextSeqn_ has advanced; decode any queued blocks that are now in order.
40  drainQueue();
41  } else {
42  // there's a gap, have to queue
    // Walk the queue to find the first entry with a larger seqn; inserting
    // in front of it keeps queue_ ordered by ascending sequence number.
43  auto it = queue_.begin();
44  while (it != queue_.end()) {
45  auto qSeqn = std::get<0>(*it);
    // An entry with this seqn is already queued: bail out without queuing.
46  if (seqn == qSeqn) {
48  return;
49  } else if (seqn < qSeqn) {
50  break;
51  }
52  it++;
53  }
54  if (oooOk) {
55  // out-of-order allowed. Decode the block, but make an empty entry in
56  // the queue
    // ooo=true means decodeBlock() leaves nextSeqn_ untouched.
57  if (decodeBlock(seqn, std::move(block), length, streamingCb,
58  true /* ooo */)) {
59  return;
60  }
    // Turn the entry into a placeholder: zero length makes decodeBlock()
    // skip the decode when drainQueue() reaches it, and it adds nothing to
    // queuedBytes_. `block` was already moved into decodeBlock() above.
61  length = 0;
62  streamingCb = nullptr;
63  } else {
65  }
66  VLOG(5) << "queued block=" << seqn << " len=" << length <<
67  " placeholder=" << int32_t(oooOk);
68  queuedBytes_ += length;
69  queue_.emplace(it, seqn, std::move(block), length, streamingCb);
70  }
71  }
72 
74  return holBlockCount_;
75  }
76 
78  return queuedBytes_;
79  }
80 
81  private:
82 
83  // Returns true if this object was destroyed by its callback. Callers
84  // should check the result and immediately return.
//
// Decodes one block through codec_ and, for in-order blocks, advances
// nextSeqn_.
//
//   seqn   - sequence number of the block; only used for logging here.
//   block  - encoded bytes, read through a folly::io::Cursor.
//   length - byte count passed to decodeStreaming(); when 0 the decode step
//            is skipped entirely (this is how queued placeholders behave).
//   cb     - callback passed through to decodeStreaming().
//   ooo    - true when the block is decoded out of order; nextSeqn_ is then
//            left unchanged.
//
// Returns false in every case other than the destroyed case above.
//
// NOTE(review): seqn is int32_t here while enqueueHeaderBlock() and the
// queue entries use uint32_t, so large sequence numbers would be logged as
// negative -- confirm whether uint32_t was intended.
// NOTE(review): the cross-reference list shows decodeStreaming() taking a
// uint32_t length, whereas length is size_t here -- confirm that narrowing
// is acceptable.
85  bool decodeBlock(int32_t seqn, std::unique_ptr<folly::IOBuf> block,
86  size_t length, HPACK::StreamingCallback* cb,
87  bool ooo) {
88  if (length > 0) {
89  VLOG(5) << "decodeBlock for block=" << seqn << " len=" << length;
90  folly::io::Cursor c(block.get());
    // The guard is created before the decode call so that destroyed() can
    // be queried afterwards without reading any member of this object.
91  folly::DestructorCheck::Safety safety(*this);
92  codec_.decodeStreaming(c, length, cb);
93  if (safety.destroyed()) {
94  return true;
95  }
96  }
    // Only reached while this object is still alive.
97  if (!ooo) {
98  nextSeqn_++;
99  }
100  return false;
101  }
102 
// Decodes queued blocks for as long as the entry at the front of queue_
// carries exactly the sequence number nextSeqn_. Each successful in-order
// decodeBlock() call increments nextSeqn_, so the loop continues through
// consecutive entries and stops at the first gap or when the queue is empty.
// Placeholder entries (length 0) are not decoded again; they only advance
// nextSeqn_ and are popped.
//
// If decodeBlock() returns true the function returns at once without
// touching queuedBytes_ or queue_ (see the comment on decodeBlock()).
103  void drainQueue() {
104  while (!queue_.empty() && nextSeqn_ == std::get<0>(queue_.front())) {
105  auto& next = queue_.front();
    // Copy the length out first: it is needed for the byte accounting
    // after the decode call.
106  auto length = std::get<2>(next);
107  if (decodeBlock(std::get<0>(next), std::move(std::get<1>(next)),
108  length, std::get<3>(next),
109  false /* in order */)) {
110  return;
111  }
    // The entry was counted in queuedBytes_ when it was queued; undo that
    // before dropping it.
112  DCHECK_LE(length, queuedBytes_);
113  queuedBytes_ -= length;
114  queue_.pop_front();
115  }
116  }
117 
118  size_t nextSeqn_{0};
121  std::deque<std::tuple<uint32_t, std::unique_ptr<folly::IOBuf>, size_t,
124 };
125 
126 }
HPACKQueue(HPACKCodec &codec)
Definition: HPACKQueue.h:23
bool decodeBlock(int32_t seqn, std::unique_ptr< folly::IOBuf > block, size_t length, HPACK::StreamingCallback *cb, bool ooo)
Definition: HPACKQueue.h:85
void decodeStreaming(folly::io::Cursor &cursor, uint32_t length, HPACK::StreamingCallback *streamingCb) noexcept
Definition: HPACKCodec.cpp:67
uint64_t getQueuedBytes() const
Definition: HPACKQueue.h:77
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
CodecFactory codec
void enqueueHeaderBlock(uint32_t seqn, std::unique_ptr< folly::IOBuf > block, size_t length, HPACK::StreamingCallback *streamingCb, bool oooOk)
Definition: HPACKQueue.h:26
std::deque< std::tuple< uint32_t, std::unique_ptr< folly::IOBuf >, size_t, HPACK::StreamingCallback * > > queue_
Definition: HPACKQueue.h:122
uint64_t getHolBlockCount() const
Definition: HPACKQueue.h:73
uint64_t holBlockCount_
Definition: HPACKQueue.h:119
virtual void onDecodeError(HPACK::DecodeError decodeError)=0
char c
HPACKCodec & codec_
Definition: HPACKQueue.h:123
def next(obj)
Definition: ast.py:58