18 #include <sys/queue.h> 21 #include "qmin_common.h" 115 unsigned char sc_buf[0];
119 struct stream_chunks_head sm_chunks;
123 namespace proxygen {
namespace compress {
133 chunk->
sc_sz = bufsz;
134 memcpy(chunk->
sc_buf, buf, bufsz);
141 TAILQ_FOREACH(chunk, &stream->
sm_chunks, sc_next)
143 TAILQ_INSERT_BEFORE(chunk, new_chunk, sc_next);
146 TAILQ_INSERT_TAIL(&stream->
sm_chunks, new_chunk, sc_next);
152 TAILQ_REMOVE(&stream->
sm_chunks, chunk, sc_next);
162 qms_ctl[0].out.qco_write = write_enc2dec;
163 qms_ctl[0].write_off = 0;
165 qms_ctl[1].out.qco_write = write_dec2enc;
166 qms_ctl[1].write_off = 0;
169 qms_idstr = (
char *)malloc(8);
170 sprintf(qms_idstr,
"%u",
s_seq++);
175 qms_streams = (
struct stream *)calloc(2,
sizeof(qms_streams[0]));
177 TAILQ_INIT(&qms_streams[1].sm_chunks);
179 qms_next_stream_id_to_encode = 1;
193 explicit QMINAck(
size_t off,
const void *buf,
size_t bufsz) {
196 memcpy(qma_buf, buf, bufsz);
201 unsigned char qma_buf[0x1000];
206 auto ack = std::make_unique<QMINAck>(
207 qms_ctl[1].write_off, qms_ctl[1].buf, qms_ctl[1].sz);
208 VLOG(4) <<
"sent ACK for instance " << qms_idstr
209 <<
" off: " << qms_ctl[1].write_off <<
"; sz: " << qms_ctl[1].sz;
210 qms_ctl[1].write_off += qms_ctl[1].sz;
219 void recvAck(std::unique_ptr<Ack> generic_ack)
override {
223 auto ack =
dynamic_cast<QMINAck *
>(generic_ack.get());
226 VLOG(4) <<
"received ACK for instance " << qms_idstr
227 <<
" off: " << ack->qma_off <<
"; sz: " << ack->qma_sz;
229 chunk = stream_chunk_new(ack->qma_off, ack->qma_buf, ack->qma_sz);
230 insert_chunk(&qms_streams[0], chunk);
232 while ((chunk = maybe_pop_chunk(&qms_streams[0]))) {
235 if (nread < 0 || (
size_t)nread != chunk->
sc_sz) {
236 VLOG(1) <<
"error: qmin_enc_cmds_in failed";
243 std::pair<FrameFlags, std::unique_ptr<folly::IOBuf>>
encode(
245 std::vector<compress::Header> allHeaders,
247 const size_t max_ctl = 0x1000;
248 const size_t max_comp = 0x1000;
249 unsigned char outbuf[max_ctl + max_comp];
250 unsigned char *
const comp = outbuf + max_ctl;
255 qms_ctl[0].out.qco_ctx =
this;
258 for (
const auto header : allHeaders) {
262 qms_next_stream_id_to_encode,
265 header.value->c_str(),
266 header.value->length(),
281 VLOG(1) <<
"compressed header does not fit into temporary " 283 return {
flags,
nullptr};
285 VLOG(1) <<
"error: " << strerror(errno);
287 return {
flags,
nullptr};
294 VLOG(4) <<
"encoder state: " <<
state;
299 VLOG(1) <<
"error: qmin_enc_end_stream_headers failed";
304 size_t ctl_msg_sz = qms_ctl[0].sz;
306 size_t ctl_msg_sz_with_off;
308 memcpy(outbuf + max_ctl - ctl_msg_sz, qms_ctl[0].buf, ctl_msg_sz);
309 memcpy(outbuf + max_ctl - ctl_msg_sz -
sizeof(qms_ctl[0].write_off),
310 &qms_ctl[0].write_off,
311 sizeof(qms_ctl[0].write_off));
312 qms_ctl[0].write_off += ctl_msg_sz;
313 ctl_msg_sz_with_off = ctl_msg_sz +
sizeof(qms_ctl[0].write_off);
315 ctl_msg_sz_with_off = 0;
316 memcpy(outbuf + max_ctl - ctl_msg_sz_with_off -
sizeof(ctl_msg_sz),
323 memcpy(outbuf + max_ctl - ctl_msg_sz_with_off -
sizeof(ctl_msg_sz) -
325 &qms_next_stream_id_to_encode,
326 sizeof(qms_next_stream_id_to_encode));
328 qms_next_stream_id_to_encode += 2;
333 sizeof(ctl_msg_sz) -
sizeof(
uint32_t),
334 comp_sz + ctl_msg_sz_with_off +
335 sizeof(ctl_msg_sz) +
sizeof(
uint32_t))};
339 std::unique_ptr<folly::IOBuf> encodedReq,
343 const unsigned char *buf;
345 size_t ctl_sz, stream_off;
347 unsigned name_len, val_len;
348 unsigned decoded_size = 0;
351 qms_ctl[1].out.qco_ctx =
this;
355 memcpy(&stream_id, buf,
sizeof(
uint32_t));
360 memcpy(&ctl_sz, buf,
sizeof(ctl_sz));
369 memcpy(&stream_off, buf,
sizeof(stream_off));
370 encodedReq->
trimStart(
sizeof(stream_off));
373 chunk = stream_chunk_new(stream_off, buf, ctl_sz);
376 insert_chunk(&qms_streams[1], chunk);
378 while ((chunk = maybe_pop_chunk(&qms_streams[1]))) {
380 if (nread < 0 || (
size_t)nread != chunk->
sc_sz) {
381 VLOG(1) <<
"error: qmin_dec_cmds_in failed";
389 const unsigned char *
const end = buf + cursor.length();
393 qms_dec, buf, end - buf, outbuf,
sizeof(outbuf), &name_len, &val_len);
395 VLOG(1) <<
"error: decoder failed!";
401 decoded_size += name_len + val_len;
409 VLOG(1) <<
"error: qmin_dec_stream_done failed";
423 CompressionScheme::runLoopCallback();
427 size_t avail =
sizeof(qms_ctl[idx].buf) - qms_ctl[idx].sz;
430 VLOG(1) <<
"Truncating control message from " << sz <<
" to " << avail
434 memcpy(qms_ctl[idx].buf + qms_ctl[idx].sz, buf, sz);
435 qms_ctl[idx].sz += sz;
436 VLOG(4) <<
"Wrote " << sz <<
" bytes to control channel";
466 unsigned char buf[0x1000];
void decode(FrameFlags, std::unique_ptr< folly::IOBuf > encodedReq, SimStats &, SimStreamingCallback &callback) override
void onHeader(const folly::fbstring &name, const folly::fbstring &value) override
struct stream_chunks_head sm_chunks
unsigned qms_next_stream_id_to_encode
static int qmin_enc_end_stream_headers(struct qmin_enc *)
void recvAck(std::unique_ptr< Ack > generic_ack) override
constexpr detail::Map< Move > move
static void qmin_dec_destroy(struct qmin_dec *)
std::pair< FrameFlags, std::unique_ptr< folly::IOBuf > > encode(bool, std::vector< compress::Header > allHeaders, SimStats &stats) override
void write_ctl_msg(const void *buf, size_t sz, unsigned idx)
static void insert_chunk(struct stream *stream, struct stream_chunk *new_chunk)
requires E e noexcept(noexcept(s.error(std::move(e))))
PUSHMI_INLINE_VAR constexpr detail::transform_fn transform
static void qmin_enc_destroy(struct qmin_enc *)
struct qmin_dec * qms_dec
static ssize_t qmin_enc_cmds_in(struct qmin_enc *, const void *, size_t)
static ssize_t qmin_dec_decode(struct qmin_dec *, const void *, size_t, char *, size_t, unsigned *, unsigned *)
auto end(TestAdlIterable &instance)
struct qmin_enc * qms_enc
struct stream * qms_streams
static char * qmin_enc_to_str(struct qmin_enc *, size_t *)
void runLoopCallback() noexceptoverride
std::unique_ptr< Ack > getAck(uint16_t) override
static const char *const value
void(* qco_write)(void *qco_ctx, const void *, size_t)
void onHeadersComplete(HTTPHeaderSize) override
static void write_enc2dec(void *ctx, const void *buf, size_t sz)
std::size_t computeChainDataLength() const
static int qmin_dec_stream_done(struct qmin_dec *, unsigned)
uint32_t getHolBlockCount() const override
static struct stream_chunk * maybe_pop_chunk(struct stream *stream)
TAILQ_HEAD(stream_chunks_head, stream_chunk)
static enum qmin_encode_status qmin_enc_encode(struct qmin_enc *, unsigned, const char *, unsigned, const char *, unsigned, enum qmin_index_type, unsigned char *, size_t, size_t *)
QMINAck(size_t off, const void *buf, size_t bufsz)
QMINScheme(CompressionSimulator *sim, uint32_t)
void trimStart(std::size_t amount)
static struct qmin_enc * qmin_enc_new(int, unsigned, const struct qmin_ctl_out *, const char *)
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
static void write_dec2enc(void *ctx, const void *buf, size_t sz)
static struct stream_chunk * stream_chunk_new(size_t off, const void *buf, size_t bufsz)
static struct qmin_dec * qmin_dec_new(int, unsigned, const struct qmin_ctl_out *, const char *)
static ssize_t qmin_dec_cmds_in(struct qmin_dec *, const void *, size_t)