proxygen
QMINScheme.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
10 #pragma once
11 
12 #include <assert.h>
13 #include <folly/String.h>
18 #include <sys/queue.h>
19 
20 #ifdef HAVE_REAL_QMIN
21 #include "qmin_common.h"
22 #include "qmin_dec.h"
23 #include "qmin_enc.h"
24 #else
25 /* Stub implementation for when you don't have QMIN */
26 extern "C" {
31 };
32 enum {
35 };
40 };
/* Output hook for control messages: qco_write is invoked with qco_ctx as
 * its first argument, followed by the bytes to emit and their count.
 */
struct qmin_ctl_out {
  void (*qco_write)(void *qco_ctx, const void *, size_t);
  void *qco_ctx;
};

struct qmin_enc;

/* Stub: no encoder is available, so construction always yields NULL. */
static struct qmin_enc *qmin_enc_new(int side,
                                     unsigned max_capacity,
                                     const struct qmin_ctl_out *ctl_out,
                                     const char *idstr) {
  (void)side;
  (void)max_capacity;
  (void)ctl_out;
  (void)idstr;
  return NULL;
}

/* Stub: consumes nothing and always reports failure (-1). */
static ssize_t qmin_enc_cmds_in(struct qmin_enc *enc,
                                const void *buf,
                                size_t bufsz) {
  (void)enc;
  (void)buf;
  (void)bufsz;
  return -1;
}
57 static enum qmin_encode_status qmin_enc_encode(struct qmin_enc * /*enc*/,
58  unsigned /*stream_id*/,
59  const char * /*name*/,
60  unsigned /*name_len*/,
61  const char * /*value*/,
62  unsigned /*value_len*/,
63  enum qmin_index_type /*ix_type*/,
64  unsigned char * /*dst*/,
65  size_t /*dst_sz*/,
66  size_t * /*n_written*/) {
67  return QES_ERR;
68 }
/* Stub: always reports failure (-1). */
static int qmin_enc_end_stream_headers(struct qmin_enc *enc) {
  (void)enc;
  return -1;
}

/* Stub: returns a freshly strdup()'ed empty string (the caller frees it).
 * *size is left untouched.
 */
static char *qmin_enc_to_str(struct qmin_enc *enc, size_t *size) {
  (void)enc;
  (void)size;
  return strdup("");
}

/* Stub: nothing to release. */
static void qmin_enc_destroy(struct qmin_enc *enc) {
  (void)enc;
}
struct qmin_dec;

/* Stub: no decoder is available, so construction always yields NULL. */
static struct qmin_dec *qmin_dec_new(int side,
                                     unsigned max_capacity,
                                     const struct qmin_ctl_out *ctl_out,
                                     const char *idstr) {
  (void)side;
  (void)max_capacity;
  (void)ctl_out;
  (void)idstr;
  return NULL;
}

/* Stub: consumes nothing and always reports failure (-1). */
static ssize_t qmin_dec_cmds_in(struct qmin_dec *dec,
                                const void *buf,
                                size_t bufsz) {
  (void)dec;
  (void)buf;
  (void)bufsz;
  return -1;
}

/* Stub: decodes nothing, leaves every output untouched, returns -1. */
static ssize_t qmin_dec_decode(struct qmin_dec *dec,
                               const void *void_src,
                               size_t src_sz,
                               char *dst,
                               size_t dst_sz,
                               unsigned *name_len,
                               unsigned *val_len) {
  (void)dec;
  (void)void_src;
  (void)src_sz;
  (void)dst;
  (void)dst_sz;
  (void)name_len;
  (void)val_len;
  return -1;
}

/* Stub: always reports failure (-1). */
static int qmin_dec_stream_done(struct qmin_dec *dec, unsigned stream_id) {
  (void)dec;
  (void)stream_id;
  return -1;
}

/* Stub: nothing to release. */
static void qmin_dec_destroy(struct qmin_dec *dec) {
  (void)dec;
}
104 }
105 #endif
106 
/* Counter used to give each QMINScheme instance its own ID string. */
static unsigned s_seq;

/* One contiguous piece of a control stream, linked into a tail queue. */
struct stream_chunk {
  TAILQ_ENTRY(stream_chunk) sc_next; /* queue linkage */
  size_t sc_off;                     /* offset of this chunk in the stream */
  size_t sc_sz;                      /* number of payload bytes in sc_buf */
  unsigned char sc_buf[0];           /* payload, allocated past the struct */
};

TAILQ_HEAD(stream_chunks_head, stream_chunk);

/* A control stream: its queued chunks and the next offset to be read. */
struct stream {
  struct stream_chunks_head sm_chunks;
  size_t sm_read_off;
};
122 
123 namespace proxygen { namespace compress {
125  public:
126  static struct stream_chunk *stream_chunk_new(size_t off,
127  const void *buf,
128  size_t bufsz) {
129  struct stream_chunk *chunk =
130  (struct stream_chunk *)malloc(sizeof(*chunk) + bufsz);
131  assert(chunk);
132  chunk->sc_off = off;
133  chunk->sc_sz = bufsz;
134  memcpy(chunk->sc_buf, buf, bufsz);
135  return chunk;
136  }
137 
138  static void insert_chunk(struct stream *stream,
139  struct stream_chunk *new_chunk) {
140  struct stream_chunk *chunk;
141  TAILQ_FOREACH(chunk, &stream->sm_chunks, sc_next)
142  if (chunk->sc_off > new_chunk->sc_off) {
143  TAILQ_INSERT_BEFORE(chunk, new_chunk, sc_next);
144  return;
145  }
146  TAILQ_INSERT_TAIL(&stream->sm_chunks, new_chunk, sc_next);
147  }
148 
149  static struct stream_chunk *maybe_pop_chunk(struct stream *stream) {
150  struct stream_chunk *chunk = TAILQ_FIRST(&stream->sm_chunks);
151  if (chunk && chunk->sc_off == stream->sm_read_off) {
152  TAILQ_REMOVE(&stream->sm_chunks, chunk, sc_next);
153  stream->sm_read_off += chunk->sc_sz;
154  return chunk;
155  } else
156  return NULL;
157  }
158 
159  explicit QMINScheme(CompressionSimulator *sim, uint32_t /*tableSize*/)
160  : CompressionScheme(sim) {
161  // TODO: set table size?
162  qms_ctl[0].out.qco_write = write_enc2dec;
163  qms_ctl[0].write_off = 0;
164  qms_ctl[0].sz = 0;
165  qms_ctl[1].out.qco_write = write_dec2enc;
166  qms_ctl[1].write_off = 0;
167  qms_ctl[1].sz = 0;
168 
169  qms_idstr = (char *)malloc(8);
170  sprintf(qms_idstr, "%u", s_seq++);
171 
172  qms_enc = qmin_enc_new(QSIDE_CLIENT, 4 * 1024, &qms_ctl[0].out, qms_idstr);
173  qms_dec = qmin_dec_new(QSIDE_SERVER, 4 * 1024, &qms_ctl[1].out, qms_idstr);
174 
175  qms_streams = (struct stream *)calloc(2, sizeof(qms_streams[0]));
176  TAILQ_INIT(&qms_streams[0].sm_chunks);
177  TAILQ_INIT(&qms_streams[1].sm_chunks);
178 
179  qms_next_stream_id_to_encode = 1;
180  }
181 
183  free(qms_streams);
184  qmin_enc_destroy(qms_enc);
185  qmin_dec_destroy(qms_dec);
186  free(qms_idstr);
187  }
188 
189  /* QMIN Ack carries QMM_STREAM_DONE and QMM_ACK_FLUSH messages from decoder
190  * to the encoder.
191  */
192  struct QMINAck : public CompressionScheme::Ack {
193  explicit QMINAck(size_t off, const void *buf, size_t bufsz) {
194  qma_off = off;
195  qma_sz = bufsz;
196  memcpy(qma_buf, buf, bufsz);
197  }
198 
199  size_t qma_off;
200  size_t qma_sz;
201  unsigned char qma_buf[0x1000];
202  };
203 
204  std::unique_ptr<Ack> getAck(uint16_t /*seqn*/) override {
205  if (qms_ctl[1].sz) {
206  auto ack = std::make_unique<QMINAck>(
207  qms_ctl[1].write_off, qms_ctl[1].buf, qms_ctl[1].sz);
208  VLOG(4) << "sent ACK for instance " << qms_idstr
209  << " off: " << qms_ctl[1].write_off << "; sz: " << qms_ctl[1].sz;
210  qms_ctl[1].write_off += qms_ctl[1].sz;
211  qms_ctl[1].sz = 0;
212  return std::move(ack);
213  } else {
214  assert(0);
215  return nullptr;
216  }
217  }
218 
219  void recvAck(std::unique_ptr<Ack> generic_ack) override {
220  struct stream_chunk *chunk;
221 
222  CHECK(generic_ack);
223  auto ack = dynamic_cast<QMINAck *>(generic_ack.get());
224  CHECK_NOTNULL(ack);
225 
226  VLOG(4) << "received ACK for instance " << qms_idstr
227  << " off: " << ack->qma_off << "; sz: " << ack->qma_sz;
228 
229  chunk = stream_chunk_new(ack->qma_off, ack->qma_buf, ack->qma_sz);
230  insert_chunk(&qms_streams[0], chunk);
231 
232  while ((chunk = maybe_pop_chunk(&qms_streams[0]))) {
233  ssize_t nread;
234  nread = qmin_enc_cmds_in(qms_enc, chunk->sc_buf, chunk->sc_sz);
235  if (nread < 0 || (size_t)nread != chunk->sc_sz) {
236  VLOG(1) << "error: qmin_enc_cmds_in failed";
237  assert(0);
238  }
239  free(chunk);
240  }
241  }
242 
243  std::pair<FrameFlags, std::unique_ptr<folly::IOBuf>> encode(
244  bool /*newPacket*/,
245  std::vector<compress::Header> allHeaders,
246  SimStats &stats) override {
247  const size_t max_ctl = 0x1000;
248  const size_t max_comp = 0x1000;
249  unsigned char outbuf[max_ctl + max_comp];
250  unsigned char *const comp = outbuf + max_ctl;
251  size_t nw, comp_sz;
252  enum qmin_encode_status qes;
254 
255  qms_ctl[0].out.qco_ctx = this;
256  comp_sz = 0;
257 
258  for (const auto header : allHeaders) {
259  std::string name{header.name->c_str()};
260  std::transform(name.begin(), name.end(), name.begin(), ::tolower);
261  qes = qmin_enc_encode(qms_enc,
262  qms_next_stream_id_to_encode,
263  name.c_str(),
264  name.length(),
265  header.value->c_str(),
266  header.value->length(),
267  QIT_YES,
268  comp + comp_sz,
269  max_comp - comp_sz,
270  &nw);
271  switch (qes) {
272  case QES_OK:
273  /* 2 is a magic number added to the uncompressed size by the other
274  * encoder. We follow suit to make the numbers match.
275  */
276  stats.uncompressed += name.length() + header.value->length() + 2;
277  stats.compressed += nw;
278  comp_sz += nw;
279  break;
280  case QES_NOBUFS:
281  VLOG(1) << "compressed header does not fit into temporary "
282  "output buffer";
283  return {flags, nullptr};
284  case QES_ERR:
285  VLOG(1) << "error: " << strerror(errno);
286  assert(0);
287  return {flags, nullptr};
288  }
289  }
290 
291  {
292  size_t sz;
293  char *state = qmin_enc_to_str(qms_enc, &sz);
294  VLOG(4) << "encoder state: " << state;
295  free(state);
296  }
297 
298  if (0 != qmin_enc_end_stream_headers(qms_enc)) {
299  VLOG(1) << "error: qmin_enc_end_stream_headers failed";
300  assert(0);
301  }
302 
303  /* Prepend control message and its size: */
304  size_t ctl_msg_sz = qms_ctl[0].sz;
305  qms_ctl[0].sz = 0;
306  size_t ctl_msg_sz_with_off;
307  if (ctl_msg_sz) {
308  memcpy(outbuf + max_ctl - ctl_msg_sz, qms_ctl[0].buf, ctl_msg_sz);
309  memcpy(outbuf + max_ctl - ctl_msg_sz - sizeof(qms_ctl[0].write_off),
310  &qms_ctl[0].write_off,
311  sizeof(qms_ctl[0].write_off));
312  qms_ctl[0].write_off += ctl_msg_sz;
313  ctl_msg_sz_with_off = ctl_msg_sz + sizeof(qms_ctl[0].write_off);
314  } else
315  ctl_msg_sz_with_off = 0;
316  memcpy(outbuf + max_ctl - ctl_msg_sz_with_off - sizeof(ctl_msg_sz),
317  &ctl_msg_sz,
318  sizeof(ctl_msg_sz));
319 
320  stats.compressed += ctl_msg_sz;
321 
322  /* Prepend Stream ID: */
323  memcpy(outbuf + max_ctl - ctl_msg_sz_with_off - sizeof(ctl_msg_sz) -
324  sizeof(uint32_t),
325  &qms_next_stream_id_to_encode,
326  sizeof(qms_next_stream_id_to_encode));
327 
328  qms_next_stream_id_to_encode += 2;
329  flags.allowOOO = true;
330  return {
331  flags,
332  folly::IOBuf::copyBuffer(outbuf + max_ctl - ctl_msg_sz_with_off -
333  sizeof(ctl_msg_sz) - sizeof(uint32_t),
334  comp_sz + ctl_msg_sz_with_off +
335  sizeof(ctl_msg_sz) + sizeof(uint32_t))};
336  }
337 
339  std::unique_ptr<folly::IOBuf> encodedReq,
340  SimStats &,
341  SimStreamingCallback &callback) override {
342  folly::io::Cursor cursor(encodedReq.get());
343  const unsigned char *buf;
344  ssize_t nread;
345  size_t ctl_sz, stream_off;
346  char outbuf[0x1000];
347  unsigned name_len, val_len;
348  unsigned decoded_size = 0;
349  uint32_t stream_id;
350 
351  qms_ctl[1].out.qco_ctx = this;
352 
353  /* Read Stream ID: */
354  buf = cursor.data();
355  memcpy(&stream_id, buf, sizeof(uint32_t));
356  encodedReq->trimStart(sizeof(uint32_t));
357 
358  /* Read size of control messages */
359  buf = cursor.data();
360  memcpy(&ctl_sz, buf, sizeof(ctl_sz));
361  encodedReq->trimStart(sizeof(ctl_sz));
362 
363  /* Feed control messages to the decoder: */
364  if (ctl_sz) {
365  struct stream_chunk *chunk;
366 
367  /* Read stream offset: */
368  buf = cursor.data();
369  memcpy(&stream_off, buf, sizeof(stream_off));
370  encodedReq->trimStart(sizeof(stream_off));
371 
372  buf = cursor.data();
373  chunk = stream_chunk_new(stream_off, buf, ctl_sz);
374  encodedReq->trimStart(ctl_sz);
375 
376  insert_chunk(&qms_streams[1], chunk);
377 
378  while ((chunk = maybe_pop_chunk(&qms_streams[1]))) {
379  nread = qmin_dec_cmds_in(qms_dec, chunk->sc_buf, chunk->sc_sz);
380  if (nread < 0 || (size_t)nread != chunk->sc_sz) {
381  VLOG(1) << "error: qmin_dec_cmds_in failed";
382  assert(0);
383  }
384  free(chunk);
385  }
386  }
387 
388  buf = cursor.data();
389  const unsigned char *const end = buf + cursor.length();
390 
391  while (buf < end) {
392  nread = qmin_dec_decode(
393  qms_dec, buf, end - buf, outbuf, sizeof(outbuf), &name_len, &val_len);
394  if (nread < 0) {
395  VLOG(1) << "error: decoder failed!";
396  assert(0);
397  return;
398  }
399  assert(nread);
400  buf += nread;
401  decoded_size += name_len + val_len;
402  std::string name{outbuf, name_len};
403  std::string value{outbuf + name_len, val_len};
404  callback.onHeader(name, value);
405  }
406 
407  if (0 != qmin_dec_stream_done(qms_dec, stream_id)) {
408  assert(0);
409  VLOG(1) << "error: qmin_dec_stream_done failed";
410  }
411 
413  sz.compressed = encodedReq->computeChainDataLength();
414  sz.uncompressed = decoded_size;
415  callback.onHeadersComplete(sz);
416  }
417 
418  uint32_t getHolBlockCount() const override {
419  return 0;
420  }
421 
422  void runLoopCallback() noexcept override {
423  CompressionScheme::runLoopCallback();
424  }
425 
426  void write_ctl_msg(const void *buf, size_t sz, unsigned idx) {
427  size_t avail = sizeof(qms_ctl[idx].buf) - qms_ctl[idx].sz;
428  assert(avail >= sz);
429  if (avail < sz) {
430  VLOG(1) << "Truncating control message from " << sz << " to " << avail
431  << "bytes";
432  sz = avail;
433  }
434  memcpy(qms_ctl[idx].buf + qms_ctl[idx].sz, buf, sz);
435  qms_ctl[idx].sz += sz;
436  VLOG(4) << "Wrote " << sz << " bytes to control channel";
437  }
438 
439  static void write_enc2dec(void *ctx, const void *buf, size_t sz) {
440  QMINScheme *const qms = (QMINScheme *)ctx;
441  qms->write_ctl_msg(buf, sz, 0);
442  }
443 
444  static void write_dec2enc(void *ctx, const void *buf, size_t sz) {
445  QMINScheme *const qms = (QMINScheme *)ctx;
446  qms->write_ctl_msg(buf, sz, 1);
447  }
448 
449  char *qms_idstr;
450 
451  struct qmin_enc *qms_enc;
452  struct qmin_dec *qms_dec;
453 
454  /* Each call to `encode' is interpreted as a header block for a new
455  * stream.
456  */
458 
459  /* 0: decoder-to-encoder; 1: encoder-to-decoder */
461 
462  struct {
463  struct qmin_ctl_out out;
464  size_t write_off;
465  size_t sz;
466  unsigned char buf[0x1000];
467  } qms_ctl[2]; /* 0: enc-to-dec; 1: dec-to-enc */
468 };
469 }} // namespace proxygen::compress
static unsigned s_seq
Definition: QMINScheme.h:107
void decode(FrameFlags, std::unique_ptr< folly::IOBuf > encodedReq, SimStats &, SimStreamingCallback &callback) override
Definition: QMINScheme.h:338
flags
Definition: http_parser.h:127
void onHeader(const folly::fbstring &name, const folly::fbstring &value) override
struct stream_chunks_head sm_chunks
Definition: QMINScheme.h:119
static int qmin_enc_end_stream_headers(struct qmin_enc *)
Definition: QMINScheme.h:69
void recvAck(std::unique_ptr< Ack > generic_ack) override
Definition: QMINScheme.h:219
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
static void qmin_dec_destroy(struct qmin_dec *)
Definition: QMINScheme.h:102
std::pair< FrameFlags, std::unique_ptr< folly::IOBuf > > encode(bool, std::vector< compress::Header > allHeaders, SimStats &stats) override
Definition: QMINScheme.h:243
void write_ctl_msg(const void *buf, size_t sz, unsigned idx)
Definition: QMINScheme.h:426
static void insert_chunk(struct stream *stream, struct stream_chunk *new_chunk)
Definition: QMINScheme.h:138
requires E e noexcept(noexcept(s.error(std::move(e))))
void * qco_ctx
Definition: QMINScheme.h:43
PUSHMI_INLINE_VAR constexpr detail::transform_fn transform
Definition: transform.h:158
static void qmin_enc_destroy(struct qmin_enc *)
Definition: QMINScheme.h:75
struct qmin_dec * qms_dec
Definition: QMINScheme.h:452
static ssize_t qmin_enc_cmds_in(struct qmin_enc *, const void *, size_t)
Definition: QMINScheme.h:52
const char * name
Definition: http_parser.c:437
static ssize_t qmin_dec_decode(struct qmin_dec *, const void *, size_t, char *, size_t, unsigned *, unsigned *)
Definition: QMINScheme.h:89
size_t sc_sz
Definition: QMINScheme.h:114
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
struct qmin_enc * qms_enc
Definition: QMINScheme.h:451
static char * qmin_enc_to_str(struct qmin_enc *, size_t *)
Definition: QMINScheme.h:72
void runLoopCallback() noexcept override
Definition: QMINScheme.h:422
std::unique_ptr< Ack > getAck(uint16_t) override
Definition: QMINScheme.h:204
qmin_index_type
Definition: QMINScheme.h:27
static const char *const value
Definition: Conv.cpp:50
void(* qco_write)(void *qco_ctx, const void *, size_t)
Definition: QMINScheme.h:42
void free()
size_t sm_read_off
Definition: QMINScheme.h:120
void onHeadersComplete(HTTPHeaderSize) override
static void write_enc2dec(void *ctx, const void *buf, size_t sz)
Definition: QMINScheme.h:439
std::size_t computeChainDataLength() const
Definition: IOBuf.cpp:501
static int qmin_dec_stream_done(struct qmin_dec *, unsigned)
Definition: QMINScheme.h:98
uint32_t getHolBlockCount() const override
Definition: QMINScheme.h:418
size_t sc_off
Definition: QMINScheme.h:113
static struct stream_chunk * maybe_pop_chunk(struct stream *stream)
Definition: QMINScheme.h:149
TAILQ_HEAD(stream_chunks_head, stream_chunk)
const char * string
Definition: Conv.cpp:212
static enum qmin_encode_status qmin_enc_encode(struct qmin_enc *, unsigned, const char *, unsigned, const char *, unsigned, enum qmin_index_type, unsigned char *, size_t, size_t *)
Definition: QMINScheme.h:57
qmin_encode_status
Definition: QMINScheme.h:36
QMINAck(size_t off, const void *buf, size_t bufsz)
Definition: QMINScheme.h:193
QMINScheme(CompressionSimulator *sim, uint32_t)
Definition: QMINScheme.h:159
unsigned char sc_buf[0]
Definition: QMINScheme.h:115
void trimStart(std::size_t amount)
Definition: IOBuf.h:703
static struct qmin_enc * qmin_enc_new(int, unsigned, const struct qmin_ctl_out *, const char *)
Definition: QMINScheme.h:46
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
static void write_dec2enc(void *ctx, const void *buf, size_t sz)
Definition: QMINScheme.h:444
static struct stream_chunk * stream_chunk_new(size_t off, const void *buf, size_t bufsz)
Definition: QMINScheme.h:126
static struct qmin_dec * qmin_dec_new(int, unsigned, const struct qmin_ctl_out *, const char *)
Definition: QMINScheme.h:78
state
Definition: http_parser.c:272
static ssize_t qmin_dec_cmds_in(struct qmin_dec *, const void *, size_t)
Definition: QMINScheme.h:84