proxygen
ZlibStreamCompressor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
11 
12 #include <folly/lang/Bits.h>
13 #include <folly/io/Cursor.h>
14 
15 using namespace folly;
16 using folly::IOBuf;
17 using std::unique_ptr;
18 
19 // IOBuf uses 24 bytes of data for bookeeping purposes, so requesting for 4073
20 // bytes of data will be rounded up to an allocation of 1 page.
21 DEFINE_int64(zlib_compressor_buffer_growth,
22  2024,
23  "The buffer growth size to use during IOBuf zlib deflation");
24 
25 namespace proxygen {
26 
27 namespace {
28 
29 std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length) {
30  CHECK_EQ(stream->avail_out, 0);
31 
32  auto buf = IOBuf::create(length);
33  buf->append(buf->capacity());
34 
35  stream->next_out = buf->writableData();
36  stream->avail_out = buf->length();
37 
38  return buf;
39 }
40 
41 int deflateHelper(z_stream* stream, IOBuf* out, int flush) {
42  if (stream->avail_out == 0) {
43  out->prependChain(
44  addOutputBuffer(stream, FLAGS_zlib_compressor_buffer_growth));
45  }
46 
47  return deflate(stream, flush);
48 }
49 }
50 
52 
54  << "Attempt to re-initialize compression stream";
55 
56  type_ = type;
57  level_ = level;
58  status_ = Z_OK;
59 
60  zlibStream_.zalloc = Z_NULL;
61  zlibStream_.zfree = Z_NULL;
62  zlibStream_.opaque = Z_NULL;
63  zlibStream_.total_in = 0;
64  zlibStream_.next_in = Z_NULL;
65  zlibStream_.avail_in = 0;
66  zlibStream_.avail_out = 0;
67  zlibStream_.next_out = Z_NULL;
68 
69  DCHECK(level_ == Z_DEFAULT_COMPRESSION ||
70  (level_ >= Z_NO_COMPRESSION && level_ <= Z_BEST_COMPRESSION))
71  << "Invalid Zlib compression level. level=" << level_;
72 
73  switch (type_) {
74  case ZlibCompressionType::GZIP:
75  status_ = deflateInit2(&zlibStream_,
76  level_,
77  Z_DEFLATED,
78  static_cast<int32_t>(type),
79  MAX_MEM_LEVEL,
80  Z_DEFAULT_STRATEGY);
81  break;
82  case ZlibCompressionType::DEFLATE:
83  status_ = deflateInit(&zlibStream_, level);
84  break;
85  default:
86  DCHECK(false) << "Unsupported zlib compression type.";
87  break;
88  }
89 
90  if (status_ != Z_OK) {
91  LOG(ERROR) << "error initializing zlib stream. r=" << status_;
92  }
93 }
94 
95 ZlibStreamCompressor::ZlibStreamCompressor(ZlibCompressionType type, int level)
96  : status_(Z_OK) {
97  init(type, level);
98 }
99 
102  status_ = deflateEnd(&zlibStream_);
103  }
104 }
105 
106 // Compress an IOBuf chain. Compress can be called multiple times and the
107 // Zlib stream will be synced after each call. trailer must be set to
108 // true on the final compression call.
109 std::unique_ptr<IOBuf> ZlibStreamCompressor::compress(const IOBuf* in,
110  bool trailer) {
111  auto bufferLength = FLAGS_zlib_compressor_buffer_growth;
112 
113  auto out = addOutputBuffer(&zlibStream_, bufferLength);
114 
115  for (auto& range : *in) {
116  uint64_t remaining = range.size();
117  uint64_t written = 0;
118  while (remaining) {
119  uint32_t step = remaining;
120  zlibStream_.next_in = const_cast<uint8_t*>(range.data() + written);
121  zlibStream_.avail_in = step;
122  remaining -= step;
123  written += step;
124 
125  while (zlibStream_.avail_in != 0) {
126  status_ = deflateHelper(&zlibStream_, out.get(), Z_NO_FLUSH);
127  if (status_ != Z_OK) {
128  DLOG(FATAL) << "Deflate failed: " << zlibStream_.msg;
129  return nullptr;
130  }
131  }
132  }
133  }
134 
135  if (trailer) {
136  do {
137  status_ = deflateHelper(&zlibStream_, out.get(), Z_FINISH);
138  } while (status_ == Z_OK);
139 
140  if (status_ != Z_STREAM_END) {
141  DLOG(FATAL) << "Deflate failed: " << zlibStream_.msg;
142  return nullptr;
143  }
144  } else {
145  do {
146  status_ = deflateHelper(&zlibStream_, out.get(), Z_SYNC_FLUSH);
147  } while (zlibStream_.avail_out == 0);
148 
149  if (status_ != Z_OK) {
150  DLOG(FATAL) << "Deflate failed: " << zlibStream_.msg;
151  return nullptr;
152  }
153  }
154 
155  out->prev()->trimEnd(zlibStream_.avail_out);
156 
157  zlibStream_.next_out = Z_NULL;
158  zlibStream_.avail_out = 0;
159 
160  return out;
161 }
162 }
void init(ZlibCompressionType type, int level)
static std::unique_ptr< IOBuf > create(std::size_t capacity)
Definition: IOBuf.cpp:229
PskType type
std::unique_ptr< folly::IOBuf > compress(const folly::IOBuf *in, bool trailer=true)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void init()
DEFINE_int64(threadtimeout_ms, 60000,"Idle time before ThreadPoolExecutor threads are joined")
static std::unique_ptr< IOBuf > addOutputBuffer(MutableByteRange &output, uint64_t size)
Gen range(Value begin, Value end)
Definition: Base.h:467
Type type_
Definition: JSONSchema.cpp:208
void prependChain(std::unique_ptr< IOBuf > &&iobuf)
Definition: IOBuf.cpp:509
IOBuf * prev()
Definition: IOBuf.h:610
void trimEnd(std::size_t amount)
Definition: IOBuf.h:718