proxygen
Zstd.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/compression/Zstd.h>
17 
18 #if FOLLY_HAVE_LIBZSTD
19 
20 #include <stdexcept>
21 #include <string>
22 
23 #include <zstd.h>
24 
#include <folly/Conv.h>
#include <folly/Memory.h>
#include <folly/Range.h>
#include <folly/ScopeGuard.h>
29 
30 static_assert(
31  ZSTD_VERSION_NUMBER >= 10302,
32  "zstd-1.3.2 is the minimum supported zstd version.");
33 
36 
37 namespace folly {
38 namespace io {
39 namespace zstd {
40 namespace {
41 
42 void zstdFreeCCtx(ZSTD_CCtx* zc) {
43  ZSTD_freeCCtx(zc);
44 }
45 
46 void zstdFreeDCtx(ZSTD_DCtx* zd) {
47  ZSTD_freeDCtx(zd);
48 }
49 
50 size_t zstdThrowIfError(size_t rc) {
51  if (!ZSTD_isError(rc)) {
52  return rc;
53  }
54  throw std::runtime_error(
55  to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
56 }
57 
58 ZSTD_EndDirective zstdTranslateFlush(StreamCodec::FlushOp flush) {
59  switch (flush) {
61  return ZSTD_e_continue;
63  return ZSTD_e_flush;
65  return ZSTD_e_end;
66  default:
67  throw std::invalid_argument("ZSTDStreamCodec: Invalid flush");
68  }
69 }
70 
71 class ZSTDStreamCodec final : public StreamCodec {
72  public:
73  explicit ZSTDStreamCodec(Options options);
74 
75  std::vector<std::string> validPrefixes() const override;
76  bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
77  const override;
78 
79  private:
80  bool doNeedsUncompressedLength() const override;
81  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
82  Optional<uint64_t> doGetUncompressedLength(
83  IOBuf const* data,
84  Optional<uint64_t> uncompressedLength) const override;
85 
86  void doResetStream() override;
87  bool doCompressStream(
88  ByteRange& input,
90  StreamCodec::FlushOp flushOp) override;
91  bool doUncompressStream(
92  ByteRange& input,
94  StreamCodec::FlushOp flushOp) override;
95 
96  void resetCCtx();
97  void resetDCtx();
98 
99  Options options_;
100  bool needReset_{true};
101  std::unique_ptr<
102  ZSTD_CCtx,
104  cctx_{nullptr};
105  std::unique_ptr<
106  ZSTD_DCtx,
108  dctx_{nullptr};
109 };
110 
// Magic number that opens a Zstandard frame. Its little-endian encoding is the
// prefix this codec advertises and looks for.
constexpr uint32_t kZSTDMagicLE{0xFD2FB528};
112 
113 std::vector<std::string> ZSTDStreamCodec::validPrefixes() const {
114  return {prefixToStringLE(kZSTDMagicLE)};
115 }
116 
117 bool ZSTDStreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
118  const {
119  return dataStartsWithLE(data, kZSTDMagicLE);
120 }
121 
122 CodecType codecType(Options const& options) {
123  int const level = options.level();
124  DCHECK_NE(level, 0);
125  return level > 0 ? CodecType::ZSTD : CodecType::ZSTD_FAST;
126 }
127 
128 ZSTDStreamCodec::ZSTDStreamCodec(Options options)
129  : StreamCodec(codecType(options), options.level()),
130  options_(std::move(options)) {}
131 
132 bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
133  return false;
134 }
135 
136 uint64_t ZSTDStreamCodec::doMaxCompressedLength(
137  uint64_t uncompressedLength) const {
138  return ZSTD_compressBound(uncompressedLength);
139 }
140 
141 Optional<uint64_t> ZSTDStreamCodec::doGetUncompressedLength(
142  IOBuf const* data,
143  Optional<uint64_t> uncompressedLength) const {
144  // Read decompressed size from frame if available in first IOBuf.
145  auto const decompressedSize =
146  ZSTD_getFrameContentSize(data->data(), data->length());
147  if (decompressedSize == ZSTD_CONTENTSIZE_UNKNOWN ||
148  decompressedSize == ZSTD_CONTENTSIZE_ERROR) {
149  return uncompressedLength;
150  }
151  if (uncompressedLength && *uncompressedLength != decompressedSize) {
152  throw std::runtime_error("ZSTD: invalid uncompressed length");
153  }
154  return decompressedSize;
155 }
156 
157 void ZSTDStreamCodec::doResetStream() {
158  needReset_ = true;
159 }
160 
161 void ZSTDStreamCodec::resetCCtx() {
162  if (!cctx_) {
163  cctx_.reset(ZSTD_createCCtx());
164  if (!cctx_) {
165  throw std::bad_alloc{};
166  }
167  }
168  ZSTD_CCtx_reset(cctx_.get());
169  zstdThrowIfError(
170  ZSTD_CCtx_setParametersUsingCCtxParams(cctx_.get(), options_.params()));
171  zstdThrowIfError(ZSTD_CCtx_setPledgedSrcSize(
172  cctx_.get(), uncompressedLength().value_or(ZSTD_CONTENTSIZE_UNKNOWN)));
173 }
174 
175 bool ZSTDStreamCodec::doCompressStream(
176  ByteRange& input,
178  StreamCodec::FlushOp flushOp) {
179  if (needReset_) {
180  resetCCtx();
181  needReset_ = false;
182  }
183  ZSTD_inBuffer in = {input.data(), input.size(), 0};
184  ZSTD_outBuffer out = {output.data(), output.size(), 0};
185  SCOPE_EXIT {
186  input.uncheckedAdvance(in.pos);
187  output.uncheckedAdvance(out.pos);
188  };
189  size_t const rc = zstdThrowIfError(ZSTD_compress_generic(
190  cctx_.get(), &out, &in, zstdTranslateFlush(flushOp)));
191  switch (flushOp) {
192  case StreamCodec::FlushOp::NONE:
193  return false;
194  case StreamCodec::FlushOp::FLUSH:
195  case StreamCodec::FlushOp::END:
196  return rc == 0;
197  default:
198  throw std::invalid_argument("ZSTD: invalid FlushOp");
199  }
200 }
201 
202 void ZSTDStreamCodec::resetDCtx() {
203  if (!dctx_) {
204  dctx_.reset(ZSTD_createDCtx());
205  if (!dctx_) {
206  throw std::bad_alloc{};
207  }
208  }
209  ZSTD_DCtx_reset(dctx_.get());
210  if (options_.maxWindowSize() != 0) {
211  zstdThrowIfError(
212  ZSTD_DCtx_setMaxWindowSize(dctx_.get(), options_.maxWindowSize()));
213  }
214 }
215 
216 bool ZSTDStreamCodec::doUncompressStream(
217  ByteRange& input,
218  MutableByteRange& output,
219  StreamCodec::FlushOp) {
220  if (needReset_) {
221  resetDCtx();
222  needReset_ = false;
223  }
224  ZSTD_inBuffer in = {input.data(), input.size(), 0};
225  ZSTD_outBuffer out = {output.data(), output.size(), 0};
226  SCOPE_EXIT {
227  input.uncheckedAdvance(in.pos);
228  output.uncheckedAdvance(out.pos);
229  };
230  size_t const rc =
231  zstdThrowIfError(ZSTD_decompress_generic(dctx_.get(), &out, &in));
232  return rc == 0;
233 }
234 
235 } // namespace
236 
237 Options::Options(int level) : params_(ZSTD_createCCtxParams()), level_(level) {
238  if (params_ == nullptr) {
239  throw std::bad_alloc{};
240  }
241 #if ZSTD_VERSION_NUMBER >= 10304
242  zstdThrowIfError(ZSTD_CCtxParams_init(params_.get(), level));
243 #else
244  zstdThrowIfError(ZSTD_initCCtxParams(params_.get(), level));
245  set(ZSTD_p_contentSizeFlag, 1);
246 #endif
247  // zstd-1.3.4 is buggy and only disables Huffman decompression for negative
248  // compression levels if this call is present. This call is begign in other
249  // versions.
250  set(ZSTD_p_compressionLevel, level);
251 }
252 
253 void Options::set(ZSTD_cParameter param, unsigned value) {
254  zstdThrowIfError(ZSTD_CCtxParam_setParameter(params_.get(), param, value));
255  if (param == ZSTD_p_compressionLevel) {
256  level_ = static_cast<int>(value);
257  }
258 }
259 
260 /* static */ void Options::freeCCtxParams(ZSTD_CCtx_params* params) {
261  ZSTD_freeCCtxParams(params);
262 }
263 
264 std::unique_ptr<Codec> getCodec(Options options) {
265  return std::make_unique<ZSTDStreamCodec>(std::move(options));
266 }
267 
268 std::unique_ptr<StreamCodec> getStreamCodec(Options options) {
269  return std::make_unique<ZSTDStreamCodec>(std::move(options));
270 }
271 
272 } // namespace zstd
273 } // namespace io
274 } // namespace folly
275 
276 #endif
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
Range< unsigned char * > MutableByteRange
Definition: Range.h:1164
std::enable_if< std::is_arithmetic< T >::value, std::string >::type prefixToStringLE(T prefix, uint64_t n=sizeof(T))
Definition: Utils.h:54
constexpr Params params[]
std::unique_ptr< StreamCodec > getStreamCodec(CodecType type, int level)
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
static const char *const value
Definition: Conv.cpp:50
Range< const unsigned char * > ByteRange
Definition: Range.h:1163
std::enable_if< std::is_unsigned< T >::value, bool >::type dataStartsWithLE(const IOBuf *data, T prefix, uint64_t n=sizeof(T))
Definition: Utils.h:40
std::unique_ptr< Codec > getCodec(CodecType type, int level)
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43