proxygen
QPACKInterop.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
10 #include <folly/File.h>
11 #include <folly/FileUtil.h>
12 #include <folly/init/Init.h>
13 #include <folly/io/Cursor.h>
19 #include <fstream>
20 
21 using namespace proxygen;
22 using namespace proxygen::compress;
23 using namespace folly;
24 using namespace folly::io;
25 
// Command-line flags (gflags). FLAGS_har selects HAR mode in main(); when it
// is empty the tool runs in QIF mode instead.
// Output path: framed encoder output in encode mode, QIF text in QIF decode.
26 DEFINE_string(output, "compress.out", "Output file for encoding");
// Input path handed to both CompressedReader and QIFReader.
27 DEFINE_string(input, "compress.in", "Input file for decoding");
28 DEFINE_string(har, "", "HAR file to compress or compare");
// Any value other than "encode" or "decode" is rejected with a usage error.
29 DEFINE_string(mode, "encode", "<encode|decode>");
30 DEFINE_bool(ack, true, "Encoder assumes immediate ack of all frames");
// Applied to both the encoder (encodeBlocks) and the decoder (main).
31 DEFINE_int32(table_size, 4096, "Dynamic table size");
// Used as the encoder's max-vulnerable and the decoder's max-blocking limit.
32 DEFINE_int32(max_blocking, 100, "Max blocking streams");
// Selects HTTPArchive::fromPublicFile and is forwarded to SimStreamingCallback.
33 DEFINE_bool(public, false, "Public HAR file");
34 
35 namespace {
36 
37 void writeFrame(folly::io::QueueAppender& appender,
38  uint64_t streamId,
39  std::unique_ptr<folly::IOBuf> buf) {
40  appender.writeBE<uint64_t>(streamId);
41  appender.writeBE<uint32_t>(buf->computeChainDataLength());
42  appender.insert(std::move(buf));
43 }
44 
45 void encodeBlocks(QPACKCodec& decoder,
46  std::vector<std::vector<compress::Header>>& blocks) {
47  uint64_t streamId = 1;
48  QPACKCodec encoder;
49  encoder.setMaxVulnerable(FLAGS_max_blocking);
50  encoder.setEncoderHeaderTableSize(FLAGS_table_size);
51  folly::File outputF(FLAGS_output, O_CREAT | O_RDWR | O_TRUNC);
52  IOBufQueue outbuf;
53  QueueAppender appender(&outbuf, 1000);
54  uint64_t bytesIn = 0;
55  uint64_t bytesOut = 0;
56  for (auto& block : blocks) {
57  auto result = encoder.encode(block, streamId);
58  // always write stream before control to test decoder blocking
59  if (result.stream) {
60  decoder.decodeStreaming(streamId, result.stream->clone(),
61  result.stream->computeChainDataLength(),
62  nullptr);
63  writeFrame(appender, streamId, std::move(result.stream));
64  if (FLAGS_ack) {
65  encoder.decodeDecoderStream(decoder.encodeHeaderAck(streamId));
66  }
67  }
68  if (result.control) {
69  decoder.decodeEncoderStream(result.control->clone());
70  writeFrame(appender, 0, std::move(result.control));
71  if (FLAGS_ack) {
72  // There can be TSS when the decoder is non-blocking
73  auto res = decoder.encodeTableStateSync();
74  if (res) {
75  encoder.decodeDecoderStream(std::move(res));
76  }
77  }
78  }
79  bytesIn += encoder.getEncodedSize().uncompressed;
80  auto out = outbuf.move();
81  auto iov = out->getIov();
82  bytesOut += writevFull(outputF.fd(), iov.data(), iov.size());
83  streamId++;
84  }
85  LOG(INFO) << "Encoded " << (streamId - 1) << " streams. Bytes in="
86  << bytesIn << " Bytes out=" << bytesOut << " Ratio="
87  << int32_t(100 * (1 - (bytesOut / double(bytesIn))));
88 }
89 
90 void encodeHar(QPACKCodec& decoder, const proxygen::HTTPArchive& har) {
91  std::vector<std::vector<compress::Header>> blocks;
92  std::vector<std::vector<std::string>> cookies{har.requests.size()};
93  uint32_t i = 0;
94  for (auto& req : har.requests) {
95  blocks.emplace_back(prepareMessageForCompression(req, cookies[i++]));
96  }
97  encodeBlocks(decoder, blocks);
98 }
99 
100 class Reader {
101  std::string filename;
102 
103  public:
104  explicit Reader(const std::string& fname)
105  : filename(fname) {}
106  virtual ~Reader() {}
107 
108  virtual ssize_t read() {
109  ssize_t rc = -1;
111  folly::File inputF(filename, O_RDONLY);
112  do {
113  auto pre = inbuf.preallocate(4096, 4096);
114  rc = readNoInt(inputF.fd(), pre.first, pre.second);
115  if (rc < 0) {
116  LOG(ERROR) << "Read failed on " << FLAGS_input;
117  return 1;
118  }
119  inbuf.postallocate(rc);
120  onIngress(inbuf);
121  } while (rc != 0);
122  if (!inbuf.empty()) {
123  LOG(ERROR) << "Premature end of file";
124  return 1;
125  }
126 
127  return rc;
128  }
129 
130  virtual void onIngress(folly::IOBufQueue& inbuf) = 0;
131 };
132 
133 class CompressedReader : public Reader {
134  enum { HEADER, DATA } state{HEADER};
135  uint64_t streamId{0};
136  uint32_t length{0};
137  std::function<void(uint64_t, uint32_t,
138  std::unique_ptr<folly::IOBuf>)> callback;
139 
140  public:
141  explicit CompressedReader(
142  std::function<void(uint64_t, uint32_t,
143  std::unique_ptr<folly::IOBuf>)> cb)
144  : Reader(FLAGS_input),
145  callback(cb) {}
146 
147  void onIngress(folly::IOBufQueue& inbuf) override {
148  while (true) {
149  if (state == HEADER) {
150  if (inbuf.chainLength() < (sizeof(uint64_t) + sizeof(uint32_t))) {
151  return;
152  }
153  Cursor c(inbuf.front());
154  streamId = c.readBE<uint64_t>();
155  length = c.readBE<uint32_t>();
156  inbuf.trimStart(sizeof(uint64_t) + sizeof(uint32_t));
157  state = DATA;
158  }
159  if (state == DATA) {
160  if (inbuf.chainLength() < length) {
161  return;
162  }
163  auto buf = inbuf.split(length);
164  callback(streamId, length, std::move(buf));
165  state = HEADER;
166  }
167  }
168  }
169 };
170 
171 int decodeAndVerify(QPACKCodec& decoder, const proxygen::HTTPArchive& har) {
172  std::map<uint64_t, SimStreamingCallback> streams;
173  CompressedReader creader(
174  [&] (uint64_t streamId, uint32_t length,
175  std::unique_ptr<folly::IOBuf> buf) {
176  if (streamId == 0) {
177  CHECK_EQ(decoder.decodeEncoderStream(std::move(buf)),
179  } else {
180  auto res = streams.emplace(std::piecewise_construct,
181  std::forward_as_tuple(streamId),
182  std::forward_as_tuple(streamId, nullptr,
183  FLAGS_public));
184  decoder.decodeStreaming(
185  streamId, std::move(buf), length, &res.first->second);
186  }
187  });
188  if (creader.read()) {
189  return 1;
190  }
191  size_t i = 0;
192  for (const auto& req : streams) {
193  if (req.second.error != HPACK::DecodeError::NONE) {
194  LOG(ERROR) << "request=" << req.first
195  << " failed to decode error=" << req.second.error;
196  return 1;
197  }
198  if (!(req.second.msg == har.requests[i])) {
199  LOG(ERROR) << "requests are not equal, got=" << req.second.msg
200  << " expected=" << har.requests[i];
201  }
202  i++;
203  }
204  LOG(INFO) << "Verified " << i << " streams.";
205  return 0;
206 }
207 
208 class QIFCallback : public HPACK::StreamingCallback {
209  public:
210  QIFCallback(uint64_t id_, std::ofstream& of_) :
211  id(id_),
212  of(of_) {}
213 
214  void onHeader(const folly::fbstring& name,
215  const folly::fbstring& value) override {
216  if (first) {
217  of << "# stream " << id << std::endl;
218  first = false;
219  }
220  of << name << "\t" << value << std::endl;
221  }
222  void onHeadersComplete(HTTPHeaderSize /*decodedSize*/) override {
223  of << std::endl;
224  complete = true;
225  }
226  void onDecodeError(HPACK::DecodeError decodeError) override {
227  LOG(FATAL) << "Decode error with stream=" << id << " err=" << decodeError;
228  }
229 
230  uint64_t id{0};
231  std::ofstream& of;
232  bool first{true};
233  bool complete{false};
234 };
235 
236 int decodeToQIF(QPACKCodec& decoder) {
237  std::ofstream of(FLAGS_output, std::ofstream::trunc);
238  std::map<uint64_t, QIFCallback> streams;
239  uint64_t encoderStreamBytes = 0;
240  CompressedReader creader(
241  [&] (uint64_t streamId, uint32_t length,
242  std::unique_ptr<folly::IOBuf> buf) {
243  if (streamId == 0) {
244  CHECK_EQ(decoder.decodeEncoderStream(std::move(buf)),
246  encoderStreamBytes += length;
247  } else {
248  auto res = streams.emplace(std::piecewise_construct,
249  std::forward_as_tuple(streamId),
250  std::forward_as_tuple(streamId, of));
251  decoder.decodeStreaming(
252  streamId, std::move(buf), length, &res.first->second);
253  }
254  });
255  if (creader.read()) {
256  return 1;
257  }
258 
259  for (const auto& stream : streams) {
260  CHECK(stream.second.complete) << "Stream " << stream.first <<
261  " didn't complete";
262  }
263  LOG(INFO) << "encoderStreamBytes=" << encoderStreamBytes;
264  return 0;
265 }
266 
267 int interopHAR(QPACKCodec& decoder) {
268  std::unique_ptr<HTTPArchive> har = (FLAGS_public) ?
269  HTTPArchive::fromPublicFile(FLAGS_har) :
270  HTTPArchive::fromFile(FLAGS_har);
271  if (!har) {
272  LOG(ERROR) << "Failed to read har file='" << FLAGS_har << "'";
273  return 1;
274  }
275  if (FLAGS_mode == "encode") {
276  encodeHar(decoder, *har);
277  } else if (FLAGS_mode == "decode") {
278  return decodeAndVerify(decoder, *har);
279  } else {
280  LOG(ERROR) << "Usage" << std::endl;
281  return 1;
282  }
283  return 0;
284 }
285 
286 struct QIFReader : public Reader {
287 
288  std::vector<std::string> strings;
289  std::vector<std::vector<Header>> blocks{1};
290  enum { LINESTART, COMMENT, NAME, VALUE, EOL } state_{LINESTART};
291  bool seenR{false};
292 
293  QIFReader()
294  : Reader(FLAGS_input) {
295  strings.reserve(32768);
296  }
297 
298  ssize_t read() override {
299  ssize_t rc = Reader::read();
300  if (rc != 0) {
301  return rc;
302  }
303  CHECK(blocks.back().empty());
304  blocks.pop_back();
305  return 0;
306  }
307 
308  static bool iseol(uint8_t ch) {
309  return ch == '\r' || ch == '\n';
310  }
311 
312  void onIngress(folly::IOBufQueue& input) override {
313  Cursor c(input.front());
314  while (!c.isAtEnd()) {
315  switch (state_) {
316  case LINESTART:
317  {
318  seenR = false;
319  auto p = c.peek();
320  switch (p.first[0]) {
321  case '#':
322  state_ = COMMENT;
323  break;
324  case '\r':
325  case '\n':
326  if (!blocks.back().empty()) {
327  blocks.emplace_back();
328  }
329  state_ = EOL;
330  break;
331  default:
332  state_ = NAME;
333  strings.emplace_back();
334  }
335  break;
336  }
337  case COMMENT:
338  c.skipWhile([] (uint8_t ch) {
339  return !iseol(ch);
340  });
341  if (!c.isAtEnd()) {
342  state_ = EOL;
343  }
344  break;
345  case EOL:
346  {
347  auto p = c.peek();
348  if (p.first[0] == '\n') {
349  c.skip(1);
350  state_ = LINESTART;
351  } else if (seenR) { // \r followed by anything but \n -> mac newline
352  state_ = LINESTART;
353  } else if (p.first[0] == '\r') {
354  c.skip(1);
355  seenR = true;
356  }
357  break;
358  }
359  case NAME:
360  strings.back() += c.readWhile([] (uint8_t ch) {
361  return ch != '\t';
362  });
363  if (!c.isAtEnd()) {
364  c.skip(1);
365  state_ = VALUE;
366  strings.emplace_back();
367  }
368  break;
369  case VALUE:
370  strings.back() += c.readWhile([] (uint8_t ch) {
371  return !iseol(ch);
372  });
373  if (!c.isAtEnd()) {
374  CHECK_GE(strings.size(), 2);
375  blocks.back().emplace_back(
377  *(strings.rbegin() + 1), *strings.rbegin()));
378  state_ = EOL;
379  }
380  break;
381  }
382  }
383  input.move();
384  }
385 
386 };
387 
388 int interopQIF(QPACKCodec& decoder) {
389  if (FLAGS_mode == "encode") {
390  QIFReader reader;
391  if (reader.read() != 0) {
392  LOG(ERROR) << "Failed to read QIF file='" << FLAGS_input << "'";
393  return 1;
394  }
395  encodeBlocks(decoder, reader.blocks);
396  } else if (FLAGS_mode == "decode") {
397  decodeToQIF(decoder);
398  } else {
399  LOG(ERROR) << "Usage" << std::endl;
400  return 1;
401  }
402 
403  return 0;
404 }
405 
406 }
407 
408 int main(int argc, char** argv) {
409  folly::init(&argc, &argv, true);
410  QPACKCodec decoder;
411  decoder.setMaxBlocking(FLAGS_max_blocking);
412  decoder.setDecoderHeaderTableMaxSize(FLAGS_table_size);
413  if (!FLAGS_har.empty()) {
414  return interopHAR(decoder);
415  } else {
416  return interopQIF(decoder);
417  }
418 }
std::unique_ptr< folly::IOBuf > split(size_t n)
Definition: IOBufQueue.h:420
const folly::IOBuf * front() const
Definition: IOBufQueue.h:476
size_t chainLength() const
Definition: IOBufQueue.h:492
DEFINE_string(output,"compress.out","Output file for encoding")
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
ssize_t readNoInt(int fd, void *buf, size_t count)
Definition: FileUtil.cpp:102
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
void setMaxVulnerable(uint32_t maxVulnerable)
Definition: QPACKCodec.h:121
static std::unique_ptr< HTTPArchive > fromPublicFile(const std::string &fname)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::vector< compress::Header > prepareMessageForCompression(const HTTPMessage &msg, std::vector< string > &cookies)
QPACKEncoder::EncodeResult encode(std::vector< compress::Header > &headers, uint64_t id) noexcept
Definition: QPACKCodec.cpp:47
auto ch
void insert(std::unique_ptr< folly::IOBuf > buf)
Definition: Cursor.h:1169
ssize_t writevFull(int fd, iovec *iov, int count)
Definition: FileUtil.cpp:150
folly::Optional< PskKeyExchangeMode > mode
void init(int *argc, char ***argv, bool removeFlags)
Definition: Init.cpp:34
const char * name
Definition: http_parser.c:437
char ** argv
DEFINE_bool(ack, true,"Encoder assumes immediate ack of all frames")
static Options cacheChainLength()
Definition: IOBufQueue.h:83
DEFINE_int32(table_size, 4096,"Dynamic table size")
size_t read(T &out, folly::io::Cursor &cursor)
Definition: Types-inl.h:258
static std::unique_ptr< HTTPArchive > fromFile(const std::string &filename)
HPACK::DecodeError decodeEncoderStream(std::unique_ptr< folly::IOBuf > buf)
Definition: QPACKCodec.h:45
void setDecoderHeaderTableMaxSize(uint32_t size)
Definition: QPACKCodec.h:62
std::unique_ptr< folly::IOBuf > encodeHeaderAck(uint64_t streamId)
Definition: QPACKCodec.h:82
int main(int argc, char **argv)
static const char *const value
Definition: Conv.cpp:50
HPACK::DecodeError decodeDecoderStream(std::unique_ptr< folly::IOBuf > buf)
Definition: QPACKCodec.h:67
void setMaxBlocking(uint32_t maxBlocking)
Definition: QPACKCodec.h:125
std::size_t computeChainDataLength() const
Definition: IOBuf.cpp:501
std::unique_ptr< folly::IOBuf > encodeTableStateSync()
Definition: QPACKCodec.h:78
const char * string
Definition: Conv.cpp:212
void setEncoderHeaderTableSize(uint32_t size)
Definition: QPACKCodec.h:58
void trimStart(size_t amount)
Definition: IOBufQueue.cpp:255
std::vector< HTTPMessage > requests
Definition: HTTPArchive.h:24
string NAME
Definition: tokenize.py:56
static vector< fbstring > strings
char c
const HTTPHeaderSize & getEncodedSize()
Definition: HeaderCodec.h:62
state
Definition: http_parser.c:272
static Header makeHeaderForTest(const std::string &n, const std::string &v)
Definition: Header.h:41
constexpr detail::First first
Definition: Base-inl.h:2553
void decodeStreaming(uint64_t streamId, std::unique_ptr< folly::IOBuf > block, uint32_t length, HPACK::StreamingCallback *streamingCb) noexcept
Definition: QPACKCodec.cpp:56
void writeBE(T value)
Definition: Cursor.h:744