proxygen
Compression.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #if FOLLY_HAVE_LIBLZ4
20 #include <lz4.h>
21 #include <lz4hc.h>
22 #if LZ4_VERSION_NUMBER >= 10301
23 #include <lz4frame.h>
24 #endif
25 #endif
26 
27 #include <glog/logging.h>
28 
29 #if FOLLY_HAVE_LIBSNAPPY
30 #include <snappy-sinksource.h>
31 #include <snappy.h>
32 #endif
33 
34 #if FOLLY_HAVE_LIBZ
35 #include <folly/compression/Zlib.h>
36 #endif
37 
38 #if FOLLY_HAVE_LIBLZMA
39 #include <lzma.h>
40 #endif
41 
42 #if FOLLY_HAVE_LIBZSTD
43 #include <folly/compression/Zstd.h>
44 #endif
45 
46 #if FOLLY_HAVE_LIBBZ2
48 
49 #include <bzlib.h>
50 #endif
51 
52 #include <folly/Conv.h>
53 #include <folly/Memory.h>
54 #include <folly/Portability.h>
55 #include <folly/Random.h>
56 #include <folly/ScopeGuard.h>
57 #include <folly/Varint.h>
59 #include <folly/io/Cursor.h>
60 #include <folly/lang/Bits.h>
61 #include <folly/stop_watch.h>
62 #include <algorithm>
63 #include <unordered_set>
64 
67 
68 namespace folly {
69 namespace io {
70 
// Codec constructor. Several lines of this definition (including the start
// of the signature) are missing from this view.
// Stores the codec type in type_. When `counters` is true, each counter
// member (compressions_ is the one visible by name) is initialized from an
// aggregate of {type, name, level, ..., CompressionCounterType::SUM}.
// NOTE(review): the full initializer list cannot be verified from here.
73  Optional<int> level,
75  bool counters)
76  : type_(type) {
77  if (counters) {
79  name,
80  level,
84  name,
85  level,
87  CompressionCounterType::SUM};
89  type,
90  name,
91  level,
93  CompressionCounterType::SUM};
95  type,
96  name,
97  level,
99  CompressionCounterType::SUM};
100  compressions_ = {type,
101  name,
102  level,
104  CompressionCounterType::SUM};
106  name,
107  level,
109  CompressionCounterType::SUM};
111  name,
112  level,
114  CompressionCounterType::SUM};
116  type,
117  name,
118  level,
120  CompressionCounterType::SUM};
121  }
122 }
123 
124 namespace {
// compress()/uncompress() call folly::Random::oneIn(kLoggingRate) to decide
// whether a given call updates the counters (presumably ~1 in 50 calls).
125 constexpr uint32_t kLoggingRate = 50;
126 
// Timer: measures elapsed time with a stop_watch<milliseconds> and, when
// destroyed, adds the elapsed millisecond count to *counter_.
// NOTE(review): the constructor's parameter line and the counter_ member
// declaration are missing from this view.
127 class Timer {
128  public:
130  : counter_(&counter) {}
131 
132  ~Timer() {
133  *counter_ += timer_.elapsed().count();
134  }
135 
136  private:
138  stop_watch<std::chrono::milliseconds> timer_;
139 };
140 } // namespace
141 
142 // Ensure consistent behavior in the nullptr case
// Public compression entry point for an IOBuf chain.
// Throws std::invalid_argument on nullptr and std::runtime_error when the
// chain is longer than maxUncompressedLength(); otherwise delegates to
// doCompress(). On sampled calls (logging == true) it increments
// compressions_ and adds the output size to bytesAfterCompression_.
// NOTE(review): the Optional<Timer> initializer and one counter update line
// are missing from this view.
143 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
144  if (data == nullptr) {
145  throw std::invalid_argument("Codec: data must not be nullptr");
146  }
147  const uint64_t len = data->computeChainDataLength();
148  if (len > maxUncompressedLength()) {
149  throw std::runtime_error("Codec: uncompressed length too large");
150  }
151  bool const logging = folly::Random::oneIn(kLoggingRate);
152  folly::Optional<Timer> const timer =
154  auto result = doCompress(data);
155  if (logging) {
156  compressions_++;
158  bytesAfterCompression_ += result->computeChainDataLength();
159  }
160  return result;
161 }
162 
// Body of the string overload of compress() (signature line missing from
// this view). It mirrors the IOBuf overload: it rejects inputs longer than
// maxUncompressedLength(), delegates to doCompressString(), and updates the
// sampled counters.
164  const uint64_t len = data.size();
165  if (len > maxUncompressedLength()) {
166  throw std::runtime_error("Codec: uncompressed length too large");
167  }
168  bool const logging = folly::Random::oneIn(kLoggingRate);
169  folly::Optional<Timer> const timer =
171  auto result = doCompressString(data);
172  if (logging) {
173  compressions_++;
175  bytesAfterCompression_ += result.size();
176  }
177  return result;
178 }
179 
// Public decompression entry point for an IOBuf chain.
// Validation order:
//  - nullptr data -> std::invalid_argument
//  - missing length when needsUncompressedLength() -> std::invalid_argument
//  - supplied length > maxUncompressedLength() -> std::runtime_error
//  - empty data with a non-zero expected length -> std::runtime_error
// Empty data returns an empty IOBuf without calling doUncompress().
// Sampled calls update decompressions_ and bytesAfterDecompression_.
180 std::unique_ptr<IOBuf> Codec::uncompress(
181  const IOBuf* data,
182  Optional<uint64_t> uncompressedLength) {
183  if (data == nullptr) {
184  throw std::invalid_argument("Codec: data must not be nullptr");
185  }
186  if (!uncompressedLength) {
187  if (needsUncompressedLength()) {
188  throw std::invalid_argument("Codec: uncompressed length required");
189  }
190  } else if (*uncompressedLength > maxUncompressedLength()) {
191  throw std::runtime_error("Codec: uncompressed length too large");
192  }
193 
194  if (data->empty()) {
195  if (uncompressedLength.value_or(0) != 0) {
196  throw std::runtime_error("Codec: invalid uncompressed length");
197  }
198  return IOBuf::create(0);
199  }
200 
201  bool const logging = folly::Random::oneIn(kLoggingRate);
// NOTE(review): the Timer initializer line is missing from this view.
202  folly::Optional<Timer> const timer =
204  auto result = doUncompress(data, uncompressedLength);
205  if (logging) {
206  decompressions_++;
208  bytesAfterDecompression_ += result->computeChainDataLength();
209  }
210  return result;
211 }
212 
// String overload of uncompress() (signature start missing from this view).
// Performs the same length validation as the IOBuf overload. Empty input
// returns "" (or throws if a non-zero length was expected). Otherwise it
// delegates to doUncompressString() and updates the sampled counters.
214  const StringPiece data,
215  Optional<uint64_t> uncompressedLength) {
216  if (!uncompressedLength) {
217  if (needsUncompressedLength()) {
218  throw std::invalid_argument("Codec: uncompressed length required");
219  }
220  } else if (*uncompressedLength > maxUncompressedLength()) {
221  throw std::runtime_error("Codec: uncompressed length too large");
222  }
223 
224  if (data.empty()) {
225  if (uncompressedLength.value_or(0) != 0) {
226  throw std::runtime_error("Codec: invalid uncompressed length");
227  }
228  return "";
229  }
230 
231  bool const logging = folly::Random::oneIn(kLoggingRate);
232  folly::Optional<Timer> const timer =
234  auto result = doUncompressString(data, uncompressedLength);
235  if (logging) {
236  decompressions_++;
238  bytesAfterDecompression_ += result.size();
239  }
240  return result;
241 }
242 
// Presumably Codec::needsUncompressedLength() (header missing): forwards to
// the virtual doNeedsUncompressedLength() hook.
244  return doNeedsUncompressedLength();
245 }
246 
// Presumably Codec::maxUncompressedLength() (header missing): forwards to
// the virtual doMaxUncompressedLength() hook.
248  return doMaxUncompressedLength();
249 }
250 
// Default hook body (header missing): returns false.
252  return false;
253 }
254 
// NOTE(review): closing brace of a definition whose body is missing from
// this view.
257 }
258 
259 std::vector<std::string> Codec::validPrefixes() const {
260  return {};
261 }
262 
// Default hook body (header missing; presumably Codec::canUncompress):
// returns false.
264  return false;
265 }
266 
// Default string compression (header missing): wraps the input in a
// non-owning IOBuf, runs doCompress(), and copies every output segment into
// a std::string reserved to the chain length.
// NOTE(review): the `output` declaration line is missing from this view.
268  const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
269  auto outputBuffer = doCompress(&inputBuffer);
271  output.reserve(outputBuffer->computeChainDataLength());
272  for (auto range : *outputBuffer) {
273  output.append(reinterpret_cast<const char*>(range.data()), range.size());
274  }
275  return output;
276 }
277 
// Default string decompression: same approach as above, using doUncompress()
// with the caller-supplied length.
279  const StringPiece data,
280  Optional<uint64_t> uncompressedLength) {
281  const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
282  auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
284  output.reserve(outputBuffer->computeChainDataLength());
285  for (auto range : *outputBuffer) {
286  output.append(reinterpret_cast<const char*>(range.data()), range.size());
287  }
288  return output;
289 }
290 
291 uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
292  return doMaxCompressedLength(uncompressedLength);
293 }
294 
// Body of getUncompressedLength() (header missing). Empty input has length 0
// and throws if a non-zero length was expected. Otherwise it defers to the
// doGetUncompressedLength() hook.
296  const folly::IOBuf* data,
297  Optional<uint64_t> uncompressedLength) const {
298  auto const compressedLength = data->computeChainDataLength();
299  if (compressedLength == 0) {
300  if (uncompressedLength.value_or(0) != 0) {
301  throw std::runtime_error("Invalid uncompressed length");
302  }
303  return 0;
304  }
305  return doGetUncompressedLength(data, uncompressedLength);
306 }
307 
// Default doGetUncompressedLength(): ignores the data and returns the
// caller's hint unchanged.
309  const folly::IOBuf*,
310  Optional<uint64_t> uncompressedLength) const {
311  return uncompressedLength;
312 }
313 
// Presumably needsDataLength() (header missing): forwards to the hook.
315  return doNeedsDataLength();
316 }
317 
// Default doNeedsDataLength(): returns false.
319  return false;
320 }
321 
322 void StreamCodec::assertStateIs(State expected) const {
323  if (state_ != expected) {
324  throw std::logic_error(folly::to<std::string>(
325  "Codec: state is ", state_, "; expected state ", expected));
326  }
327 }
328 
// Body of StreamCodec::resetStream() (header missing). It returns the state
// machine to RESET, records the expected uncompressed length, clears the
// no-progress tracking, and lets the subclass reset via doResetStream().
330  state_ = State::RESET;
331  uncompressedLength_ = uncompressedLength;
332  progressMade_ = true;
333  doResetStream();
334 }
335 
// Body of StreamCodec::compressStream() (signature partially missing).
// 1. Validates the declared uncompressed length against the input/flushOp
//    (std::runtime_error on mismatch or when a required length is absent).
// 2. Moves the state machine into COMPRESS / COMPRESS_FLUSH / COMPRESS_END
//    according to flushOp. The switch case labels are missing from this
//    view; presumably they are NONE / FLUSH / END.
// 3. Calls doCompressStream(). Two consecutive calls that consume no input
//    and produce no output throw std::runtime_error.
// 4. On completion, FLUSH returns to COMPRESS and END moves to END.
337  ByteRange& input,
339  StreamCodec::FlushOp flushOp) {
340  if (state_ == State::RESET && input.empty() &&
341  flushOp == StreamCodec::FlushOp::END &&
342  uncompressedLength().value_or(0) != 0) {
343  throw std::runtime_error("Codec: invalid uncompressed length");
344  }
345 
346  if (!uncompressedLength() && needsDataLength()) {
347  throw std::runtime_error("Codec: uncompressed length required");
348  }
349  if (state_ == State::RESET && !input.empty() &&
350  uncompressedLength() == uint64_t(0)) {
351  throw std::runtime_error("Codec: invalid uncompressed length");
352  }
353  // Handle input state transitions
354  switch (flushOp) {
356  if (state_ == State::RESET) {
357  state_ = State::COMPRESS;
358  }
359  assertStateIs(State::COMPRESS);
360  break;
362  if (state_ == State::RESET || state_ == State::COMPRESS) {
363  state_ = State::COMPRESS_FLUSH;
364  }
365  assertStateIs(State::COMPRESS_FLUSH);
366  break;
368  if (state_ == State::RESET || state_ == State::COMPRESS) {
369  state_ = State::COMPRESS_END;
370  }
371  assertStateIs(State::COMPRESS_END);
372  break;
373  }
374  size_t const inputSize = input.size();
375  size_t const outputSize = output.size();
376  bool const done = doCompressStream(input, output, flushOp);
// NOTE(review): this message capitalizes "No"; uncompressStream uses
// "no forward progress made". Consider unifying.
377  if (!done && inputSize == input.size() && outputSize == output.size()) {
378  if (!progressMade_) {
379  throw std::runtime_error("Codec: No forward progress made");
380  }
381  // Throw an exception if there is no progress again next time
382  progressMade_ = false;
383  } else {
384  progressMade_ = true;
385  }
386  // Handle output state transitions
387  if (done) {
388  if (state_ == State::COMPRESS_FLUSH) {
389  state_ = State::COMPRESS;
390  } else if (state_ == State::COMPRESS_END) {
391  state_ = State::END;
392  }
393  // Check internal invariants
394  DCHECK(input.empty());
395  DCHECK(flushOp != StreamCodec::FlushOp::NONE);
396  }
397  return done;
398 }
399 
// Body of StreamCodec::uncompressStream() (signature partially missing).
// With no input in RESET it reports completion only if the expected length
// is 0 or unknown, and it does not change state. Otherwise it enters
// UNCOMPRESS, runs doUncompressStream(), and applies the same two-strikes
// no-progress check as compressStream(). On completion the state becomes END.
401  ByteRange& input,
403  StreamCodec::FlushOp flushOp) {
404  if (state_ == State::RESET && input.empty()) {
405  if (uncompressedLength().value_or(0) == 0) {
406  return true;
407  }
408  return false;
409  }
410  // Handle input state transitions
411  if (state_ == State::RESET) {
412  state_ = State::UNCOMPRESS;
413  }
414  assertStateIs(State::UNCOMPRESS);
415  size_t const inputSize = input.size();
416  size_t const outputSize = output.size();
417  bool const done = doUncompressStream(input, output, flushOp);
418  if (!done && inputSize == input.size() && outputSize == output.size()) {
419  if (!progressMade_) {
420  throw std::runtime_error("Codec: no forward progress made");
421  }
422  // Throw an exception if there is no progress again next time
423  progressMade_ = false;
424  } else {
425  progressMade_ = true;
426  }
427  // Handle output state transitions
428  if (done) {
429  state_ = State::END;
430  }
431  return done;
432 }
433 
// Allocates a new IOBuf of at least `size` bytes and marks its whole
// capacity as used. It then points the caller's (currently empty) `output`
// range at that memory so a codec can write into it. The caller trims the
// unused tail later.
// NOTE(review): the `output` parameter line is missing from this view.
434 static std::unique_ptr<IOBuf> addOutputBuffer(
436  uint64_t size) {
437  DCHECK(output.empty());
438  auto buffer = IOBuf::create(size);
439  buffer->append(buffer->capacity());
440  output = {buffer->writableData(), buffer->length()};
441  return buffer;
442 }
443 
// One-shot compression built on the streaming API. It walks the input chain,
// switches flushOp to END on the last buffer, and appends fresh 4 MB output
// buffers whenever the current one fills. The first output buffer is sized
// to maxCompressedLength() when that is at most 64 MB. Unused space in the
// last buffer is trimmed before returning.
// NOTE(review): the declarations of `output` and `flushOp` are missing from
// this view.
444 std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
445  uint64_t const uncompressedLength = data->computeChainDataLength();
446  resetStream(uncompressedLength);
447  uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
448 
449  auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
450  auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
451 
453  auto buffer = addOutputBuffer(
454  output,
455  maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
456  : kDefaultBufferLength);
457 
458  // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
459  IOBuf const* current = data;
460  ByteRange input{current->data(), current->length()};
462  bool done = false;
463  while (!done) {
464  while (input.empty() && current->next() != data) {
465  current = current->next();
466  input = {current->data(), current->length()};
467  }
468  if (current->next() == data) {
469  // This is the last input buffer so end the stream
470  flushOp = StreamCodec::FlushOp::END;
471  }
472  if (output.empty()) {
473  buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
474  }
475  done = compressStream(input, output, flushOp);
476  if (done) {
477  DCHECK(input.empty());
478  DCHECK(flushOp == StreamCodec::FlushOp::END);
479  DCHECK_EQ(current->next(), data);
480  }
481  }
482  buffer->prev()->trimEnd(output.size());
483  return buffer;
484 }
485 
// Body of computeBufferLength() (header missing). It returns 4x the larger
// of blockSize and compressedLength, capped at 4 MiB. Used as the default
// output buffer size in StreamCodec::doUncompress().
// NOTE(review): `4 * max(...)` would wrap for inputs near 2^62 bytes;
// unrealistic, but std::min would then pick the wrapped value.
487  uint64_t const compressedLength,
488  uint64_t const blockSize) {
489  uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
490  uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
491  return std::min(goodBufferSize, kMaxBufferLength);
492 }
493 
// One-shot decompression built on the streaming API. The first output buffer
// is the known uncompressed length (if <= 64 MB); otherwise, and for later
// buffers, the size comes from computeBufferLength(). It throws
// std::runtime_error on trailing junk, or when the produced size differs
// from a known uncompressed length.
// NOTE(review): the declarations of `output` and `flushOp` are missing from
// this view.
494 std::unique_ptr<IOBuf> StreamCodec::doUncompress(
495  IOBuf const* data,
496  Optional<uint64_t> uncompressedLength) {
497  auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
498  auto constexpr kBlockSize = uint64_t(128) << 10;
499  auto const defaultBufferLength =
500  computeBufferLength(data->computeChainDataLength(), kBlockSize);
501 
502  uncompressedLength = getUncompressedLength(data, uncompressedLength);
503  resetStream(uncompressedLength);
504 
506  auto buffer = addOutputBuffer(
507  output,
508  (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
509  ? *uncompressedLength
510  : defaultBufferLength));
511 
512  // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
513  IOBuf const* current = data;
514  ByteRange input{current->data(), current->length()};
516  bool done = false;
517  while (!done) {
518  while (input.empty() && current->next() != data) {
519  current = current->next();
520  input = {current->data(), current->length()};
521  }
522  if (current->next() == data) {
523  // Tell the uncompressor there is no more input (it may optimize)
524  flushOp = StreamCodec::FlushOp::END;
525  }
526  if (output.empty()) {
527  buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
528  }
529  done = uncompressStream(input, output, flushOp);
530  }
// NOTE(review): this only inspects the current input range. If the stream
// ends inside an earlier buffer of the chain, the remaining buffers after
// `current` are never examined, so junk there goes undetected. Consider also
// checking that `current->next() == data` or that later buffers are empty.
531  if (!input.empty()) {
532  throw std::runtime_error("Codec: Junk after end of data");
533  }
534 
535  buffer->prev()->trimEnd(output.size());
536  if (uncompressedLength &&
537  *uncompressedLength != buffer->computeChainDataLength()) {
538  throw std::runtime_error("Codec: invalid uncompressed length");
539  }
540 
541  return buffer;
542 }
543 
544 namespace {
545 
549 class NoCompressionCodec final : public Codec {
550  public:
551  static std::unique_ptr<Codec> create(int level, CodecType type);
552  explicit NoCompressionCodec(int level, CodecType type);
553 
554  private:
555  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
556  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
557  std::unique_ptr<IOBuf> doUncompress(
558  const IOBuf* data,
559  Optional<uint64_t> uncompressedLength) override;
560 };
561 
562 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
563  return std::make_unique<NoCompressionCodec>(level, type);
564 }
565 
// Accepts only level 0. The switch maps some symbolic level values to 0;
// its case labels are missing from this view. Any remaining non-zero level
// throws std::invalid_argument.
566 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
567  : Codec(type) {
568  DCHECK(type == CodecType::NO_COMPRESSION);
569  switch (level) {
573  level = 0;
574  }
575  if (level != 0) {
576  throw std::invalid_argument(
577  to<std::string>("NoCompressionCodec: invalid level ", level));
578  }
579 }
580 
581 uint64_t NoCompressionCodec::doMaxCompressedLength(
582  uint64_t uncompressedLength) const {
583  return uncompressedLength;
584 }
585 
586 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(const IOBuf* data) {
587  return data->clone();
588 }
589 
590 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
591  const IOBuf* data,
592  Optional<uint64_t> uncompressedLength) {
593  if (uncompressedLength &&
594  data->computeChainDataLength() != *uncompressedLength) {
595  throw std::runtime_error(
596  to<std::string>("NoCompressionCodec: invalid uncompressed length"));
597  }
598  return data->clone();
599 }
600 
601 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
602 
603 namespace {
604 
605 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
606  DCHECK_GE(out->tailroom(), kMaxVarintLength64);
607  out->append(encodeVarint(val, out->writableTail()));
608 }
609 
610 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
611  uint64_t val = 0;
612  int8_t b = 0;
613  for (int shift = 0; shift <= 63; shift += 7) {
614  b = cursor.read<int8_t>();
615  val |= static_cast<uint64_t>(b & 0x7f) << shift;
616  if (b >= 0) {
617  break;
618  }
619  }
620  if (b < 0) {
621  throw std::invalid_argument("Invalid varint value. Too big.");
622  }
623  return val;
624 }
625 
626 } // namespace
627 
628 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
629 
630 #if FOLLY_HAVE_LIBLZ4
631 
632 #if LZ4_VERSION_NUMBER >= 10802 && defined(LZ4_STATIC_LINKING_ONLY) && \
633  defined(LZ4_HC_STATIC_LINKING_ONLY) && !defined(FOLLY_USE_LZ4_FAST_RESET)
634 #define FOLLY_USE_LZ4_FAST_RESET
635 #endif
636 
637 #ifdef FOLLY_USE_LZ4_FAST_RESET
638 namespace {
639 void lz4_stream_t_deleter(LZ4_stream_t* ctx) {
640  LZ4_freeStream(ctx);
641 }
642 
643 void lz4_streamhc_t_deleter(LZ4_streamHC_t* ctx) {
644  LZ4_freeStreamHC(ctx);
645 }
646 } // namespace
647 #endif
648 
// Block-format LZ4 codec serving CodecType::LZ4 and LZ4_VARINT_SIZE (the
// latter prefixes the output with a varint of the uncompressed size).
// With FOLLY_USE_LZ4_FAST_RESET, reusable compression contexts are cached
// in ctx / hcctx. The deleter template arguments are missing from this view.
652 class LZ4Codec final : public Codec {
653  public:
654  static std::unique_ptr<Codec> create(int level, CodecType type);
655  explicit LZ4Codec(int level, CodecType type);
656 
657  private:
658  bool doNeedsUncompressedLength() const override;
659  uint64_t doMaxUncompressedLength() const override;
660  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
661 
662  bool encodeSize() const {
663  return type() == CodecType::LZ4_VARINT_SIZE;
664  }
665 
666  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
667  std::unique_ptr<IOBuf> doUncompress(
668  const IOBuf* data,
669  Optional<uint64_t> uncompressedLength) override;
670 
671 #ifdef FOLLY_USE_LZ4_FAST_RESET
672  std::unique_ptr<
673  LZ4_stream_t,
675  ctx;
676  std::unique_ptr<
677  LZ4_streamHC_t,
679  hcctx;
680 #endif
681 
// true when the converted level is > 1; selects the LZ4 HC compressor.
682  bool highCompression_;
683 };
684 
685 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
686  return std::make_unique<LZ4Codec>(level, type);
687 }
688 
// Maps a requested level to 1 (fast LZ4) or 2 (LZ4 HC; see highCompression_
// in the constructor). Other values throw std::invalid_argument.
// Some case labels (presumably symbolic level constants) are missing from
// this view.
689 static int lz4ConvertLevel(int level) {
690  switch (level) {
691  case 1:
694  return 1;
695  case 2:
697  return 2;
698  }
699  throw std::invalid_argument(
700  to<std::string>("LZ4Codec: invalid level: ", level));
701 }
702 
703 LZ4Codec::LZ4Codec(int level, CodecType type)
704  : Codec(type, lz4ConvertLevel(level)),
705  highCompression_(lz4ConvertLevel(level) > 1) {
706  DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
707 }
708 
709 bool LZ4Codec::doNeedsUncompressedLength() const {
710  return !encodeSize();
711 }
712 
713 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
714 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
715 // here.
716 #ifndef LZ4_MAX_INPUT_SIZE
717 #define LZ4_MAX_INPUT_SIZE 0x7E000000
718 #endif
719 
720 uint64_t LZ4Codec::doMaxUncompressedLength() const {
721  return LZ4_MAX_INPUT_SIZE;
722 }
723 
724 uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
725  return LZ4_compressBound(uncompressedLength) +
726  (encodeSize() ? kMaxVarintLength64 : 0);
727 }
728 
// Compresses the (coalesced) input into a single LZ4 block, optionally
// preceded by a varint of the input length. The output buffer is sized with
// maxCompressedLength(). The LZ4 entry point depends on the lz4 version:
//  - fast-reset ext-state APIs with cached contexts,
//  - LZ4_compress_HC / LZ4_compress_default (>= 1.7.0),
//  - legacy LZ4_compressHC / LZ4_compress.
// NOTE(review): per lz4.h, LZ4_compress_default/_HC signal failure by
// returning 0, which CHECK_GE(n, 0) does not catch. CHECK_LE also compares
// against capacity() rather than the tailroom left after the varint prefix.
// LZ4_createStream()/LZ4_createStreamHC() results are not checked for null.
729 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
730  IOBuf clone;
731  if (data->isChained()) {
732  // LZ4 doesn't support streaming, so we have to coalesce
733  clone = data->cloneCoalescedAsValue();
734  data = &clone;
735  }
736 
737  auto out = IOBuf::create(maxCompressedLength(data->length()));
738  if (encodeSize()) {
739  encodeVarintToIOBuf(data->length(), out.get());
740  }
741 
742  int n;
743  auto input = reinterpret_cast<const char*>(data->data());
744  auto output = reinterpret_cast<char*>(out->writableTail());
745  const auto inputLength = data->length();
746 
747 #ifdef FOLLY_USE_LZ4_FAST_RESET
748  if (!highCompression_ && !ctx) {
749  ctx.reset(LZ4_createStream());
750  }
751  if (highCompression_ && !hcctx) {
752  hcctx.reset(LZ4_createStreamHC());
753  }
754 
755  if (highCompression_) {
756  n = LZ4_compress_HC_extStateHC_fastReset(
757  hcctx.get(), input, output, inputLength, out->tailroom(), 0);
758  } else {
759  n = LZ4_compress_fast_extState_fastReset(
760  ctx.get(), input, output, inputLength, out->tailroom(), 1);
761  }
762 #elif LZ4_VERSION_NUMBER >= 10700
763  if (highCompression_) {
764  n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
765  } else {
766  n = LZ4_compress_default(input, output, inputLength, out->tailroom());
767  }
768 #else
769  if (highCompression_) {
770  n = LZ4_compressHC(input, output, inputLength);
771  } else {
772  n = LZ4_compress(input, output, inputLength);
773  }
774 #endif
775 
776  CHECK_GE(n, 0);
777  CHECK_LE(n, out->capacity());
778 
779  out->append(n);
780  return out;
781 }
782 
783 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
784  const IOBuf* data,
785  Optional<uint64_t> uncompressedLength) {
786  IOBuf clone;
787  if (data->isChained()) {
788  // LZ4 doesn't support streaming, so we have to coalesce
789  clone = data->cloneCoalescedAsValue();
790  data = &clone;
791  }
792 
793  folly::io::Cursor cursor(data);
794  uint64_t actualUncompressedLength;
795  if (encodeSize()) {
796  actualUncompressedLength = decodeVarintFromCursor(cursor);
797  if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
798  throw std::runtime_error("LZ4Codec: invalid uncompressed length");
799  }
800  } else {
801  // Invariants
802  DCHECK(uncompressedLength.hasValue());
803  DCHECK(*uncompressedLength <= maxUncompressedLength());
804  actualUncompressedLength = *uncompressedLength;
805  }
806 
807  auto sp = StringPiece{cursor.peekBytes()};
808  auto out = IOBuf::create(actualUncompressedLength);
809  int n = LZ4_decompress_safe(
810  sp.data(),
811  reinterpret_cast<char*>(out->writableTail()),
812  sp.size(),
813  actualUncompressedLength);
814 
815  if (n < 0 || uint64_t(n) != actualUncompressedLength) {
816  throw std::runtime_error(
817  to<std::string>("LZ4 decompression returned invalid value ", n));
818  }
819  out->append(actualUncompressedLength);
820  return out;
821 }
822 
823 #if LZ4_VERSION_NUMBER >= 10301
824 
825 class LZ4FrameCodec final : public Codec {
826  public:
827  static std::unique_ptr<Codec> create(int level, CodecType type);
828  explicit LZ4FrameCodec(int level, CodecType type);
829  ~LZ4FrameCodec() override;
830 
831  std::vector<std::string> validPrefixes() const override;
832  bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
833  const override;
834 
835  private:
836  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
837 
838  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
839  std::unique_ptr<IOBuf> doUncompress(
840  const IOBuf* data,
841  Optional<uint64_t> uncompressedLength) override;
842 
843  // Reset the dctx_ if it is dirty or null.
844  void resetDCtx();
845 
846  int level_;
847 #ifdef FOLLY_USE_LZ4_FAST_RESET
848  LZ4F_compressionContext_t cctx_{nullptr};
849 #endif
850  LZ4F_decompressionContext_t dctx_{nullptr};
851  bool dirty_{false};
852 };
853 
854 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
855  int level,
856  CodecType type) {
857  return std::make_unique<LZ4FrameCodec>(level, type);
858 }
859 
// Little-endian value of the LZ4 frame magic number. validPrefixes() and
// canUncompress() use it to recognize LZ4 frames.
860 static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
861 
862 std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
863  return {prefixToStringLE(kLZ4FrameMagicLE)};
864 }
865 
866 bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
867  return dataStartsWithLE(data, kLZ4FrameMagicLE);
868 }
869 
870 uint64_t LZ4FrameCodec::doMaxCompressedLength(
871  uint64_t uncompressedLength) const {
872  LZ4F_preferences_t prefs{};
873  prefs.compressionLevel = level_;
874  prefs.frameInfo.contentSize = uncompressedLength;
875  return LZ4F_compressFrameBound(uncompressedLength, &prefs);
876 }
877 
878 static size_t lz4FrameThrowOnError(size_t code) {
879  if (LZ4F_isError(code)) {
880  throw std::runtime_error(
881  to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
882  }
883  return code;
884 }
885 
886 void LZ4FrameCodec::resetDCtx() {
887  if (dctx_ && !dirty_) {
888  return;
889  }
890  if (dctx_) {
891  LZ4F_freeDecompressionContext(dctx_);
892  }
893  lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
894  dirty_ = false;
895 }
896 
// Maps symbolic levels to LZ4F compression levels 0 or 16. The case labels
// are missing from this view. Any other value passes through unchanged.
897 static int lz4fConvertLevel(int level) {
898  switch (level) {
901  return 0;
903  return 16;
904  }
905  return level;
906 }
907 
908 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type)
909  : Codec(type, lz4fConvertLevel(level)), level_(lz4fConvertLevel(level)) {
910  DCHECK(type == CodecType::LZ4_FRAME);
911 }
912 
913 LZ4FrameCodec::~LZ4FrameCodec() {
914  if (dctx_) {
915  LZ4F_freeDecompressionContext(dctx_);
916  }
917 #ifdef FOLLY_USE_LZ4_FAST_RESET
918  if (cctx_) {
919  LZ4F_freeCompressionContext(cctx_);
920  }
921 #endif
922 }
923 
924 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
925  // LZ4 Frame compression doesn't support streaming so we have to coalesce
926  IOBuf clone;
927  if (data->isChained()) {
928  clone = data->cloneCoalescedAsValue();
929  data = &clone;
930  }
931 
932 #ifdef FOLLY_USE_LZ4_FAST_RESET
933  if (!cctx_) {
934  lz4FrameThrowOnError(LZ4F_createCompressionContext(&cctx_, LZ4F_VERSION));
935  }
936 #endif
937 
938  // Set preferences
939  const auto uncompressedLength = data->length();
940  LZ4F_preferences_t prefs{};
941  prefs.compressionLevel = level_;
942  prefs.frameInfo.contentSize = uncompressedLength;
943  // Compress
944  auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
945  const size_t written = lz4FrameThrowOnError(
946 #ifdef FOLLY_USE_LZ4_FAST_RESET
947  LZ4F_compressFrame_usingCDict(
948  cctx_,
949  buf->writableTail(),
950  buf->tailroom(),
951  data->data(),
952  data->length(),
953  nullptr,
954  &prefs)
955 #else
956  LZ4F_compressFrame(
957  buf->writableTail(),
958  buf->tailroom(),
959  data->data(),
960  data->length(),
961  &prefs)
962 #endif
963  );
964  buf->append(written);
965  return buf;
966 }
967 
// Decompresses one LZ4 frame into an IOBufQueue, growing it block by block
// until LZ4F_decompress() reports the frame is complete (code == 0).
// dirty_ is set for the duration so that an exception forces resetDCtx() to
// recreate the context on the next call.
// NOTE(review): the IOBufQueue `queue` declaration line is missing from this
// view. Bytes left in `in` after the frame ends are not rejected.
968 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
969  const IOBuf* data,
970  Optional<uint64_t> uncompressedLength) {
971  // Reset the dctx if any errors have occurred
972  resetDCtx();
973  // Coalesce the data
974  ByteRange in = *data->begin();
975  IOBuf clone;
976  if (data->isChained()) {
977  clone = data->cloneCoalescedAsValue();
978  in = clone.coalesce();
979  }
980  data = nullptr;
981  // Select decompression options
// NOTE(review): `options` has no initializer, so every field except
// stableDst is indeterminate. Newer lz4frame.h versions give meaning to
// further fields of this struct (e.g. skipChecksums). Consider
// `LZ4F_decompressOptions_t options{};`.
982  LZ4F_decompressOptions_t options;
983  options.stableDst = 1;
984  // Select blockSize and growthSize for the IOBufQueue
986  auto blockSize = uint64_t{64} << 10;
987  auto growthSize = uint64_t{4} << 20;
988  if (uncompressedLength) {
989  // Allocate uncompressedLength in one chunk (up to 64 MB)
990  const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20);
991  queue.preallocate(allocateSize, allocateSize);
992  blockSize = std::min(*uncompressedLength, blockSize);
993  growthSize = std::min(*uncompressedLength, growthSize);
994  } else {
995  // Reduce growthSize for small data
996  const auto guessUncompressedLen =
997  4 * std::max<uint64_t>(blockSize, in.size());
998  growthSize = std::min(guessUncompressedLen, growthSize);
999  }
1000  // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
1001  // returns 0
1002  dirty_ = true;
1003  // Decompress until the frame is over
1004  size_t code = 0;
1005  do {
1006  // Allocate enough space to decompress at least a block
1007  void* out;
1008  size_t outSize;
1009  std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
1010  // Decompress
1011  size_t inSize = in.size();
1012  code = lz4FrameThrowOnError(
1013  LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
1014  if (in.empty() && outSize == 0 && code != 0) {
1015  // We passed no input, no output was produced, and the frame isn't over
1016  // No more forward progress is possible
1017  throw std::runtime_error("LZ4Frame error: Incomplete frame");
1018  }
1019  in.uncheckedAdvance(inSize);
1020  queue.postallocate(outSize);
1021  } while (code != 0);
1022  // At this point the decompression context can be reused
1023  dirty_ = false;
1024  if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
1025  throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
1026  }
1027  return queue.move();
1028 }
1029 
1030 #endif // LZ4_VERSION_NUMBER >= 10301
1031 #endif // FOLLY_HAVE_LIBLZ4
1032 
1033 #if FOLLY_HAVE_LIBSNAPPY
1034 
1042 class IOBufSnappySource final : public snappy::Source {
1043  public:
1044  explicit IOBufSnappySource(const IOBuf* data);
1045  size_t Available() const override;
1046  const char* Peek(size_t* len) override;
1047  void Skip(size_t n) override;
1048 
1049  private:
1050  size_t available_;
1051  io::Cursor cursor_;
1052 };
1053 
1054 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
1055  : available_(data->computeChainDataLength()), cursor_(data) {}
1056 
1057 size_t IOBufSnappySource::Available() const {
1058  return available_;
1059 }
1060 
1061 const char* IOBufSnappySource::Peek(size_t* len) {
1062  auto sp = StringPiece{cursor_.peekBytes()};
1063  *len = sp.size();
1064  return sp.data();
1065 }
1066 
1067 void IOBufSnappySource::Skip(size_t n) {
1068  CHECK_LE(n, available_);
1069  cursor_.skip(n);
1070  available_ -= n;
1071 }
1072 
1073 class SnappyCodec final : public Codec {
1074  public:
1075  static std::unique_ptr<Codec> create(int level, CodecType type);
1076  explicit SnappyCodec(int level, CodecType type);
1077 
1078  private:
1079  uint64_t doMaxUncompressedLength() const override;
1080  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1081  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1082  std::unique_ptr<IOBuf> doUncompress(
1083  const IOBuf* data,
1084  Optional<uint64_t> uncompressedLength) override;
1085 };
1086 
1087 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
1088  return std::make_unique<SnappyCodec>(level, type);
1089 }
1090 
// Snappy has a single level, 1. The switch normalizes some inputs to 1; its
// case labels are missing from this view. Anything else throws
// std::invalid_argument.
1091 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
1092  DCHECK(type == CodecType::SNAPPY);
1093  switch (level) {
1097  level = 1;
1098  }
1099  if (level != 1) {
1100  throw std::invalid_argument(
1101  to<std::string>("SnappyCodec: invalid level: ", level));
1102  }
1103 }
1104 
// Upper bound on snappy input/output size.
// NOTE(review): the return statement is missing from this view; per the
// comment it is presumably the uint32_t maximum.
1105 uint64_t SnappyCodec::doMaxUncompressedLength() const {
1106  // snappy.h uses uint32_t for lengths, so there's that.
1108 }
1109 
1110 uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1111  return snappy::MaxCompressedLength(uncompressedLength);
1112 }
1113 
1114 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
1115  IOBufSnappySource source(data);
1116  auto out = IOBuf::create(maxCompressedLength(source.Available()));
1117 
1118  snappy::UncheckedByteArraySink sink(
1119  reinterpret_cast<char*>(out->writableTail()));
1120 
1121  size_t n = snappy::Compress(&source, &sink);
1122 
1123  CHECK_LE(n, out->capacity());
1124  out->append(n);
1125  return out;
1126 }
1127 
1128 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
1129  const IOBuf* data,
1130  Optional<uint64_t> uncompressedLength) {
1131  uint32_t actualUncompressedLength = 0;
1132 
1133  {
1134  IOBufSnappySource source(data);
1135  if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
1136  throw std::runtime_error("snappy::GetUncompressedLength failed");
1137  }
1138  if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
1139  throw std::runtime_error("snappy: invalid uncompressed length");
1140  }
1141  }
1142 
1143  auto out = IOBuf::create(actualUncompressedLength);
1144 
1145  {
1146  IOBufSnappySource source(data);
1147  if (!snappy::RawUncompress(
1148  &source, reinterpret_cast<char*>(out->writableTail()))) {
1149  throw std::runtime_error("snappy::RawUncompress failed");
1150  }
1151  }
1152 
1153  out->append(actualUncompressedLength);
1154  return out;
1155 }
1156 
1157 #endif // FOLLY_HAVE_LIBSNAPPY
1158 
1159 #if FOLLY_HAVE_LIBLZMA
1160 
1164 class LZMA2StreamCodec final : public StreamCodec {
1165  public:
1166  static std::unique_ptr<Codec> createCodec(int level, CodecType type);
1167  static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
1168  explicit LZMA2StreamCodec(int level, CodecType type);
1169  ~LZMA2StreamCodec() override;
1170 
1171  std::vector<std::string> validPrefixes() const override;
1172  bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1173  const override;
1174 
1175  private:
1176  bool doNeedsDataLength() const override;
1177  uint64_t doMaxUncompressedLength() const override;
1178  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1179 
1180  bool encodeSize() const {
1181  return type() == CodecType::LZMA2_VARINT_SIZE;
1182  }
1183 
1184  void doResetStream() override;
1185  bool doCompressStream(
1186  ByteRange& input,
1188  StreamCodec::FlushOp flushOp) override;
1189  bool doUncompressStream(
1190  ByteRange& input,
1191  MutableByteRange& output,
1192  StreamCodec::FlushOp flushOp) override;
1193 
1194  void resetCStream();
1195  void resetDStream();
1196 
1197  bool decodeAndCheckVarint(ByteRange& input);
1198  bool flushVarintBuffer(MutableByteRange& output);
1199  void resetVarintBuffer();
1200 
1201  Optional<lzma_stream> cstream_{};
1202  Optional<lzma_stream> dstream_{};
1203 
1204  std::array<uint8_t, kMaxVarintLength64> varintBuffer_;
1205  ByteRange varintToEncode_;
1206  size_t varintBufferPos_{0};
1207 
1208  int level_;
1209  bool needReset_{true};
1210  bool needDecodeSize_{false};
1211 };
1212 
// Stream magic FD 37 7A 58 5A 00 ("\xFD" "7zXZ" "\0"), read as a
// little-endian integer, and the number of magic bytes to compare.
static constexpr unsigned kLZMA2MagicBytes = 6;
static constexpr uint64_t kLZMA2MagicLE = 0x00'5A'58'7A'37'FDull;
1215 
1216 std::vector<std::string> LZMA2StreamCodec::validPrefixes() const {
1218  return {};
1219  }
1220  return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
1221 }
1222 
1223 bool LZMA2StreamCodec::doNeedsDataLength() const {
1224  return encodeSize();
1225 }
1226 
1227 bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1228  const {
1230  return false;
1231  }
1232  // Returns false for all inputs less than 8 bytes.
1233  // This is okay, because no valid LZMA2 streams are less than 8 bytes.
1234  return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
1235 }
1236 
1237 std::unique_ptr<Codec> LZMA2StreamCodec::createCodec(
1238  int level,
1239  CodecType type) {
1240  return make_unique<LZMA2StreamCodec>(level, type);
1241 }
1242 
1243 std::unique_ptr<StreamCodec> LZMA2StreamCodec::createStream(
1244  int level,
1245  CodecType type) {
1246  return make_unique<LZMA2StreamCodec>(level, type);
1247 }
1248 
// Validates and stores the preset level later passed to lzma_easy_encoder()
// in resetCStream(). After the switch, the level must lie in [0, 9];
// otherwise std::invalid_argument is thrown.
// NOTE(review): the `case` labels of the switch are missing from this copy of
// the file. Each visible assignment/break pair presumably belongs to one of
// the symbolic COMPRESSION_LEVEL_* values (mapped to 0, LZMA_PRESET_DEFAULT
// and 9). Confirm against upstream folly.
LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type)
    : StreamCodec(type) {
  DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
  switch (level) {
      level = 0;
      break;
      level = LZMA_PRESET_DEFAULT;
      break;
      level = 9;
      break;
  }
  if (level < 0 || level > 9) {
    throw std::invalid_argument(
        to<std::string>("LZMA2Codec: invalid level: ", level));
  }
  level_ = level;
}
1269 
1270 LZMA2StreamCodec::~LZMA2StreamCodec() {
1271  if (cstream_) {
1272  lzma_end(cstream_.get_pointer());
1273  cstream_.clear();
1274  }
1275  if (dstream_) {
1276  lzma_end(dstream_.get_pointer());
1277  dstream_.clear();
1278  }
1279 }
1280 
1281 uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const {
1282  // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1283  return uint64_t(1) << 63;
1284 }
1285 
1286 uint64_t LZMA2StreamCodec::doMaxCompressedLength(
1287  uint64_t uncompressedLength) const {
1288  return lzma_stream_buffer_bound(uncompressedLength) +
1289  (encodeSize() ? kMaxVarintLength64 : 0);
1290 }
1291 
1292 void LZMA2StreamCodec::doResetStream() {
1293  needReset_ = true;
1294 }
1295 
1296 void LZMA2StreamCodec::resetCStream() {
1297  if (!cstream_) {
1298  cstream_.assign(LZMA_STREAM_INIT);
1299  }
1300  lzma_ret const rc =
1301  lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE);
1302  if (rc != LZMA_OK) {
1303  throw std::runtime_error(folly::to<std::string>(
1304  "LZMA2StreamCodec: lzma_easy_encoder error: ", rc));
1305  }
1306 }
1307 
1308 void LZMA2StreamCodec::resetDStream() {
1309  if (!dstream_) {
1310  dstream_.assign(LZMA_STREAM_INIT);
1311  }
1312  lzma_ret const rc = lzma_auto_decoder(
1313  dstream_.get_pointer(), std::numeric_limits<uint64_t>::max(), 0);
1314  if (rc != LZMA_OK) {
1315  throw std::runtime_error(folly::to<std::string>(
1316  "LZMA2StreamCodec: lzma_auto_decoder error: ", rc));
1317  }
1318 }
1319 
1320 static lzma_ret lzmaThrowOnError(lzma_ret const rc) {
1321  switch (rc) {
1322  case LZMA_OK:
1323  case LZMA_STREAM_END:
1324  case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice
1325  return rc;
1326  default:
1327  throw std::runtime_error(
1328  to<std::string>("LZMA2StreamCodec: error: ", rc));
1329  }
1330 }
1331 
// Maps a StreamCodec::FlushOp to the lzma_action passed to lzma_code():
// LZMA_RUN, LZMA_SYNC_FLUSH or LZMA_FINISH. Any other value throws
// std::invalid_argument.
// NOTE(review): the `case` labels are missing from this copy of the file;
// presumably NONE -> LZMA_RUN, FLUSH -> LZMA_SYNC_FLUSH, END -> LZMA_FINISH.
// Confirm against upstream folly.
static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) {
  switch (flush) {
    return LZMA_RUN;
    return LZMA_SYNC_FLUSH;
    return LZMA_FINISH;
    default:
      throw std::invalid_argument("LZMA2StreamCodec: Invalid flush");
  }
}
1344 
1350 bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) {
1351  if (varintToEncode_.empty()) {
1352  return true;
1353  }
1354  const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size());
1355  if (numBytesToCopy > 0) {
1356  memcpy(output.data(), varintToEncode_.data(), numBytesToCopy);
1357  }
1358  varintToEncode_.advance(numBytesToCopy);
1359  output.advance(numBytesToCopy);
1360  return varintToEncode_.empty();
1361 }
1362 
// Compresses from `input` into `output` with a single lzma_code() call,
// advancing both ranges by the bytes consumed/produced.
// On the first call after a reset it reinitializes the encoder. For
// LZMA2_VARINT_SIZE it also encodes uncompressedLength() as a varint, which
// is emitted before any compressed bytes. Returns false while that prefix is
// still being written.
// NOTE(review): the `case` labels of the final switch are missing from this
// copy of the file; the three returns presumably correspond to FlushOp NONE,
// FLUSH and END, in that order. Confirm against upstream folly.
bool LZMA2StreamCodec::doCompressStream(
    ByteRange& input,
    MutableByteRange& output,
    StreamCodec::FlushOp flushOp) {
  if (needReset_) {
    resetCStream();
    if (encodeSize()) {
      varintBufferPos_ = 0;
      // Dereferences uncompressedLength() unconditionally; the requirement to
      // supply it is reported by doNeedsDataLength().
      size_t const varintSize =
          encodeVarint(*uncompressedLength(), varintBuffer_.data());
      varintToEncode_ = {varintBuffer_.data(), varintSize};
    }
    needReset_ = false;
  }

  // The varint prefix must be fully written before any compressed data.
  if (!flushVarintBuffer(output)) {
    return false;
  }

  cstream_->next_in = const_cast<uint8_t*>(input.data());
  cstream_->avail_in = input.size();
  cstream_->next_out = output.data();
  cstream_->avail_out = output.size();
  // Advance the caller's ranges by what liblzma consumed/produced, even if
  // the error check below throws.
  SCOPE_EXIT {
    input.uncheckedAdvance(input.size() - cstream_->avail_in);
    output.uncheckedAdvance(output.size() - cstream_->avail_out);
  };
  lzma_ret const rc = lzmaThrowOnError(
      lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp)));
  switch (flushOp) {
      return false;
      return cstream_->avail_in == 0 && cstream_->avail_out != 0;
      return rc == LZMA_STREAM_END;
    default:
      throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp");
  }
}
1403 
1417 bool LZMA2StreamCodec::decodeAndCheckVarint(ByteRange& input) {
1418  if (input.empty()) {
1419  return false;
1420  }
1421  size_t const numBytesToCopy =
1422  std::min(kMaxVarintLength64 - varintBufferPos_, input.size());
1423  memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy);
1424 
1425  size_t const rangeSize = varintBufferPos_ + numBytesToCopy;
1426  ByteRange range{varintBuffer_.data(), rangeSize};
1427  auto const ret = tryDecodeVarint(range);
1428 
1429  if (ret.hasValue()) {
1430  size_t const varintSize = rangeSize - range.size();
1431  input.advance(varintSize - varintBufferPos_);
1432  if (uncompressedLength() && *uncompressedLength() != ret.value()) {
1433  throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1434  }
1435  return true;
1436  } else if (ret.error() == DecodeVarintError::TooManyBytes) {
1437  throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1438  } else {
1439  // Too few bytes
1440  input.advance(numBytesToCopy);
1441  varintBufferPos_ += numBytesToCopy;
1442  return false;
1443  }
1444 }
1445 
// Decompresses from `input` into `output` with a single lzma_code() call,
// advancing both ranges by the bytes consumed/produced.
// For LZMA2_VARINT_SIZE the leading varint length is decoded (and checked
// against uncompressedLength(), if set) before any data is fed to liblzma.
// Returns true once liblzma reports LZMA_STREAM_END.
// NOTE(review): the `case` labels of the switch are missing from this copy of
// the file; presumably FlushOp NONE and FLUSH select LZMA_RUN and END selects
// LZMA_FINISH. Confirm against upstream folly.
bool LZMA2StreamCodec::doUncompressStream(
    ByteRange& input,
    MutableByteRange& output,
    StreamCodec::FlushOp flushOp) {
  if (needReset_) {
    resetDStream();
    needReset_ = false;
    needDecodeSize_ = encodeSize();
    if (encodeSize()) {
      // Reset buffer
      varintBufferPos_ = 0;
    }
  }

  if (needDecodeSize_) {
    // Try decoding the varint. If the input does not contain the entire varint,
    // buffer the input. If the varint can not be decoded, fail.
    if (!decodeAndCheckVarint(input)) {
      return false;
    }
    needDecodeSize_ = false;
  }

  dstream_->next_in = const_cast<uint8_t*>(input.data());
  dstream_->avail_in = input.size();
  dstream_->next_out = output.data();
  dstream_->avail_out = output.size();
  // Advance the caller's ranges by what liblzma consumed/produced, even if
  // the error check below throws.
  SCOPE_EXIT {
    input.advance(input.size() - dstream_->avail_in);
    output.advance(output.size() - dstream_->avail_out);
  };

  lzma_ret rc;
  switch (flushOp) {
      rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN));
      break;
      rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH));
      break;
    default:
      throw std::invalid_argument("LZMA2StreamCodec: invalid flush");
  }
  return rc == LZMA_STREAM_END;
}
1492 #endif // FOLLY_HAVE_LIBLZMA
1493 
1494 #if FOLLY_HAVE_LIBZSTD
1495 
// Converts a folly compression level into a zstd level. The special values
// handled by the switch map to 1, 1 and 19. Any other level must lie in
// [1, ZSTD_maxCLevel()] or std::invalid_argument is thrown.
// NOTE(review): the `case` labels are missing from this copy of the file;
// presumably COMPRESSION_LEVEL_FASTEST, _DEFAULT and _BEST. Confirm against
// upstream folly.
static int zstdConvertLevel(int level) {
  switch (level) {
    return 1;
    return 1;
    return 19;
  }
  if (level < 1 || level > ZSTD_maxCLevel()) {
    throw std::invalid_argument(
        to<std::string>("ZSTD: invalid level: ", level));
  }
  return level;
}
1511 
// Converts a folly level for ZSTD_FAST into a negative zstd level. The special
// values handled by the switch map to -5, -1 and -1. Any other positive level
// N becomes -N, and a level below 1 throws std::invalid_argument.
// NOTE(review): the `case` labels are missing from this copy of the file;
// presumably COMPRESSION_LEVEL_FASTEST, _DEFAULT and _BEST. Confirm against
// upstream folly.
static int zstdFastConvertLevel(int level) {
  switch (level) {
    return -5;
    return -1;
    return -1;
  }
  if (level < 1) {
    throw std::invalid_argument(
        to<std::string>("ZSTD: invalid level: ", level));
  }
  return -level;
}
1527 
1528 std::unique_ptr<Codec> getZstdCodec(int level, CodecType type) {
1529  DCHECK(type == CodecType::ZSTD);
1530  return zstd::getCodec(zstd::Options(zstdConvertLevel(level)));
1531 }
1532 
1533 std::unique_ptr<StreamCodec> getZstdStreamCodec(int level, CodecType type) {
1534  DCHECK(type == CodecType::ZSTD);
1535  return zstd::getStreamCodec(zstd::Options(zstdConvertLevel(level)));
1536 }
1537 
1538 std::unique_ptr<Codec> getZstdFastCodec(int level, CodecType type) {
1539  DCHECK(type == CodecType::ZSTD_FAST);
1540  return zstd::getCodec(zstd::Options(zstdFastConvertLevel(level)));
1541 }
1542 
1543 std::unique_ptr<StreamCodec> getZstdFastStreamCodec(int level, CodecType type) {
1544  DCHECK(type == CodecType::ZSTD_FAST);
1545  return zstd::getStreamCodec(zstd::Options(zstdFastConvertLevel(level)));
1546 }
1547 
1548 #endif // FOLLY_HAVE_LIBZSTD
1549 
1550 #if FOLLY_HAVE_LIBBZ2
1551 
1552 class Bzip2StreamCodec final : public StreamCodec {
1553  public:
1554  static std::unique_ptr<Codec> createCodec(int level, CodecType type);
1555  static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
1556  explicit Bzip2StreamCodec(int level, CodecType type);
1557 
1558  ~Bzip2StreamCodec() override;
1559 
1560  std::vector<std::string> validPrefixes() const override;
1561  bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
1562  const override;
1563 
1564  private:
1565  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1566 
1567  void doResetStream() override;
1568  bool doCompressStream(
1569  ByteRange& input,
1570  MutableByteRange& output,
1571  StreamCodec::FlushOp flushOp) override;
1572  bool doUncompressStream(
1573  ByteRange& input,
1574  MutableByteRange& output,
1575  StreamCodec::FlushOp flushOp) override;
1576 
1577  void resetCStream();
1578  void resetDStream();
1579 
1580  Optional<bz_stream> cstream_{};
1581  Optional<bz_stream> dstream_{};
1582 
1583  int level_;
1584  bool needReset_{true};
1585 };
1586 
1587 /* static */ std::unique_ptr<Codec> Bzip2StreamCodec::createCodec(
1588  int level,
1589  CodecType type) {
1590  return createStream(level, type);
1591 }
1592 
1593 /* static */ std::unique_ptr<StreamCodec> Bzip2StreamCodec::createStream(
1594  int level,
1595  CodecType type) {
1596  return std::make_unique<Bzip2StreamCodec>(level, type);
1597 }
1598 
// Validates and stores the level later passed as the second argument of
// BZ2_bzCompressInit() in resetCStream(). After the switch the level must lie
// in [1, 9]; otherwise std::invalid_argument is thrown.
// NOTE(review): the `case` labels of the switch are missing from this copy of
// the file. The three assignment/break pairs presumably handle the symbolic
// COMPRESSION_LEVEL_* values (mapped to 1, 9 and 9). Confirm against upstream
// folly.
Bzip2StreamCodec::Bzip2StreamCodec(int level, CodecType type)
    : StreamCodec(type) {
  DCHECK(type == CodecType::BZIP2);
  switch (level) {
      level = 1;
      break;
      level = 9;
      break;
      level = 9;
      break;
  }
  if (level < 1 || level > 9) {
    throw std::invalid_argument(
        to<std::string>("Bzip2: invalid level: ", level));
  }
  level_ = level;
}
1619 
// bzip2 stream magic 42 5A 68 ("BZh"), read as a little-endian integer, and
// the number of magic bytes to compare.
static uint64_t constexpr kBzip2MagicBytes = 3;
static uint32_t constexpr kBzip2MagicLE = 0x68'5A'42;
1622 
1623 std::vector<std::string> Bzip2StreamCodec::validPrefixes() const {
1624  return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
1625 }
1626 
1627 bool Bzip2StreamCodec::canUncompress(IOBuf const* data, Optional<uint64_t>)
1628  const {
1629  return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
1630 }
1631 
1632 uint64_t Bzip2StreamCodec::doMaxCompressedLength(
1633  uint64_t uncompressedLength) const {
1634  // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
1635  // To guarantee that the compressed data will fit in its buffer, allocate an
1636  // output buffer of size 1% larger than the uncompressed data, plus six
1637  // hundred extra bytes.
1638  return uncompressedLength + uncompressedLength / 100 + 600;
1639 }
1640 
1641 static bz_stream createBzStream() {
1642  bz_stream stream;
1643  stream.bzalloc = nullptr;
1644  stream.bzfree = nullptr;
1645  stream.opaque = nullptr;
1646  stream.next_in = stream.next_out = nullptr;
1647  stream.avail_in = stream.avail_out = 0;
1648  return stream;
1649 }
1650 
1651 // Throws on error condition, otherwise returns the code.
1652 static int bzCheck(int const rc) {
1653  switch (rc) {
1654  case BZ_OK:
1655  case BZ_RUN_OK:
1656  case BZ_FLUSH_OK:
1657  case BZ_FINISH_OK:
1658  case BZ_STREAM_END:
1659  // Allow BZ_PARAM_ERROR.
1660  // It can get returned if no progress is made, but we handle that.
1661  case BZ_PARAM_ERROR:
1662  return rc;
1663  default:
1664  throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
1665  }
1666 }
1667 
1668 Bzip2StreamCodec::~Bzip2StreamCodec() {
1669  if (cstream_) {
1670  BZ2_bzCompressEnd(cstream_.get_pointer());
1671  cstream_.clear();
1672  }
1673  if (dstream_) {
1674  BZ2_bzDecompressEnd(dstream_.get_pointer());
1675  dstream_.clear();
1676  }
1677 }
1678 
1679 void Bzip2StreamCodec::doResetStream() {
1680  needReset_ = true;
1681 }
1682 
1683 void Bzip2StreamCodec::resetCStream() {
1684  if (cstream_) {
1685  BZ2_bzCompressEnd(cstream_.get_pointer());
1686  }
1687  cstream_ = createBzStream();
1688  bzCheck(BZ2_bzCompressInit(cstream_.get_pointer(), level_, 0, 0));
1689 }
1690 
// Maps a StreamCodec::FlushOp to the action argument of BZ2_bzCompress():
// BZ_RUN or BZ_FINISH. FlushOp::FLUSH is rejected (per its error message), and
// any other value throws std::invalid_argument.
// NOTE(review): the `case` labels are missing from this copy of the file;
// presumably NONE -> BZ_RUN, END -> BZ_FINISH, FLUSH -> throw. Confirm against
// upstream folly.
int bzip2TranslateFlush(StreamCodec::FlushOp flushOp) {
  switch (flushOp) {
    return BZ_RUN;
    return BZ_FINISH;
    throw std::invalid_argument(
        "Bzip2StreamCodec: FlushOp::FLUSH not supported");
    default:
      throw std::invalid_argument("Bzip2StreamCodec: Invalid flush");
  }
}
1704 
// Compresses from `input` into `output` with a single BZ2_bzCompress() call,
// advancing both ranges by the bytes consumed/produced. It resets the
// compressor first if doResetStream() was called. Returns false immediately
// when both ranges are empty.
// NOTE(review): the `case` labels of the final switch are missing from this
// copy of the file. The returns presumably correspond to FlushOp NONE, FLUSH
// and END, in that order. Confirm against upstream folly.
// NOTE(review): bz_stream's avail_in/avail_out are `unsigned int` in bzlib.h.
// A range larger than that would be truncated on assignment, while the
// SCOPE_EXIT still measures progress against the full size(). Consider
// clamping the sizes handed to libbz2.
bool Bzip2StreamCodec::doCompressStream(
    ByteRange& input,
    MutableByteRange& output,
    StreamCodec::FlushOp flushOp) {
  if (needReset_) {
    resetCStream();
    needReset_ = false;
  }
  if (input.empty() && output.empty()) {
    return false;
  }

  cstream_->next_in =
      const_cast<char*>(reinterpret_cast<const char*>(input.data()));
  cstream_->avail_in = input.size();
  cstream_->next_out = reinterpret_cast<char*>(output.data());
  cstream_->avail_out = output.size();
  // Advance the caller's ranges by what libbz2 consumed/produced, even if
  // bzCheck throws.
  SCOPE_EXIT {
    input.uncheckedAdvance(input.size() - cstream_->avail_in);
    output.uncheckedAdvance(output.size() - cstream_->avail_out);
  };
  int const rc = bzCheck(
      BZ2_bzCompress(cstream_.get_pointer(), bzip2TranslateFlush(flushOp)));
  switch (flushOp) {
      return false;
      if (rc == BZ_RUN_OK) {
        DCHECK_EQ(cstream_->avail_in, 0);
        DCHECK(input.size() == 0 || cstream_->avail_out != output.size());
        return true;
      }
      return false;
      return rc == BZ_STREAM_END;
    default:
      throw std::invalid_argument("Bzip2StreamCodec: invalid FlushOp");
  }
  return false;
}
1745 
1746 void Bzip2StreamCodec::resetDStream() {
1747  if (dstream_) {
1748  BZ2_bzDecompressEnd(dstream_.get_pointer());
1749  }
1750  dstream_ = createBzStream();
1751  bzCheck(BZ2_bzDecompressInit(dstream_.get_pointer(), 0, 0));
1752 }
1753 
1754 bool Bzip2StreamCodec::doUncompressStream(
1755  ByteRange& input,
1756  MutableByteRange& output,
1757  StreamCodec::FlushOp flushOp) {
1758  if (flushOp == StreamCodec::FlushOp::FLUSH) {
1759  throw std::invalid_argument(
1760  "Bzip2StreamCodec: FlushOp::FLUSH not supported");
1761  }
1762  if (needReset_) {
1763  resetDStream();
1764  needReset_ = false;
1765  }
1766 
1767  dstream_->next_in =
1768  const_cast<char*>(reinterpret_cast<const char*>(input.data()));
1769  dstream_->avail_in = input.size();
1770  dstream_->next_out = reinterpret_cast<char*>(output.data());
1771  dstream_->avail_out = output.size();
1772  SCOPE_EXIT {
1773  input.uncheckedAdvance(input.size() - dstream_->avail_in);
1774  output.uncheckedAdvance(output.size() - dstream_->avail_out);
1775  };
1776  int const rc = bzCheck(BZ2_bzDecompress(dstream_.get_pointer()));
1777  return rc == BZ_STREAM_END;
1778 }
1779 
1780 #endif // FOLLY_HAVE_LIBBZ2
1781 
1782 #if FOLLY_HAVE_LIBZ
1783 
1784 zlib::Options getZlibOptions(CodecType type) {
1785  DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB);
1786  return type == CodecType::GZIP ? zlib::defaultGzipOptions()
1787  : zlib::defaultZlibOptions();
1788 }
1789 
1790 std::unique_ptr<Codec> getZlibCodec(int level, CodecType type) {
1791  return zlib::getCodec(getZlibOptions(type), level);
1792 }
1793 
1794 std::unique_ptr<StreamCodec> getZlibStreamCodec(int level, CodecType type) {
1795  return zlib::getStreamCodec(getZlibOptions(type), level);
1796 }
1797 
1798 #endif // FOLLY_HAVE_LIBZ
1799 
1803 class AutomaticCodec final : public Codec {
1804  public:
1805  static std::unique_ptr<Codec> create(
1806  std::vector<std::unique_ptr<Codec>> customCodecs,
1807  std::unique_ptr<Codec> terminalCodec);
1808  explicit AutomaticCodec(
1809  std::vector<std::unique_ptr<Codec>> customCodecs,
1810  std::unique_ptr<Codec> terminalCodec);
1811 
1812  std::vector<std::string> validPrefixes() const override;
1813  bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1814  const override;
1815 
1816  private:
1817  bool doNeedsUncompressedLength() const override;
1818  uint64_t doMaxUncompressedLength() const override;
1819 
1820  uint64_t doMaxCompressedLength(uint64_t) const override {
1821  throw std::runtime_error(
1822  "AutomaticCodec error: maxCompressedLength() not supported.");
1823  }
1824  std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1825  throw std::runtime_error("AutomaticCodec error: compress() not supported.");
1826  }
1827  std::unique_ptr<IOBuf> doUncompress(
1828  const IOBuf* data,
1829  Optional<uint64_t> uncompressedLength) override;
1830 
1831  void addCodecIfSupported(CodecType type);
1832 
1833  // Throws iff the codecs aren't compatible (very slow)
1834  void checkCompatibleCodecs() const;
1835 
1836  std::vector<std::unique_ptr<Codec>> codecs_;
1837  std::unique_ptr<Codec> terminalCodec_;
1840 };
1841 
1842 std::vector<std::string> AutomaticCodec::validPrefixes() const {
1843  std::unordered_set<std::string> prefixes;
1844  for (const auto& codec : codecs_) {
1845  const auto codecPrefixes = codec->validPrefixes();
1846  prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
1847  }
1848  return std::vector<std::string>{prefixes.begin(), prefixes.end()};
1849 }
1850 
1851 bool AutomaticCodec::canUncompress(
1852  const IOBuf* data,
1853  Optional<uint64_t> uncompressedLength) const {
1854  return std::any_of(
1855  codecs_.begin(),
1856  codecs_.end(),
1857  [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
1858  return codec->canUncompress(data, uncompressedLength);
1859  });
1860 }
1861 
1862 void AutomaticCodec::addCodecIfSupported(CodecType type) {
1863  const bool present = std::any_of(
1864  codecs_.begin(),
1865  codecs_.end(),
1866  [&type](std::unique_ptr<Codec> const& codec) {
1867  return codec->type() == type;
1868  });
1869  bool const isTerminalType = terminalCodec_ && terminalCodec_->type() == type;
1870  if (hasCodec(type) && !present && !isTerminalType) {
1871  codecs_.push_back(getCodec(type));
1872  }
1873 }
1874 
1875 /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
1876  std::vector<std::unique_ptr<Codec>> customCodecs,
1877  std::unique_ptr<Codec> terminalCodec) {
1878  return std::make_unique<AutomaticCodec>(
1879  std::move(customCodecs), std::move(terminalCodec));
1880 }
1881 
// Takes ownership of the caller's codecs, appends every supported built-in
// type listed in `defaultTypes` (see addCodecIfSupported()), then caches
// needsUncompressedLength_ and maxUncompressedLength_ over all sub-codecs and
// the terminal codec.
// NOTE(review): this copy of the file is missing several lines of this
// constructor:
// - the start of the member-initializer list (presumably the Codec base
//   initializer);
// - the six entries of `defaultTypes`;
// - the left-hand side of the final assignment (presumably
//   `maxUncompressedLength_ =`).
// Confirm against upstream folly.
AutomaticCodec::AutomaticCodec(
    std::vector<std::unique_ptr<Codec>> customCodecs,
    std::unique_ptr<Codec> terminalCodec)
    codecs_(std::move(customCodecs)),
    terminalCodec_(std::move(terminalCodec)) {
  // Fastest -> slowest
  std::array<CodecType, 6> defaultTypes{{
  }};

  for (auto type : defaultTypes) {
    addCodecIfSupported(type);
  }

  // Prefix-compatibility validation runs in debug builds only.
  if (kIsDebug) {
    checkCompatibleCodecs();
  }

  // Check that none of the codecs are null
  DCHECK(std::none_of(
      codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
        return codec == nullptr;
      }));

  // Check that the terminal codec's type is not duplicated (with the exception
  // of USER_DEFINED).
  if (terminalCodec_) {
    DCHECK(std::none_of(
        codecs_.begin(),
        codecs_.end(),
        [&](std::unique_ptr<Codec> const& codec) {
          return codec->type() != CodecType::USER_DEFINED &&
              codec->type() == terminalCodec_->type();
        }));
  }

  bool const terminalNeedsUncompressedLength =
      terminalCodec_ && terminalCodec_->needsUncompressedLength();
  needsUncompressedLength_ = std::any_of(
                                 codecs_.begin(),
                                 codecs_.end(),
                                 [](std::unique_ptr<Codec> const& codec) {
                                   return codec->needsUncompressedLength();
                                 }) ||
      terminalNeedsUncompressedLength;

  const auto it = std::max_element(
      codecs_.begin(),
      codecs_.end(),
      [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
        return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
      });
  // NOTE(review): only DCHECKed. If codecs_ were empty in a release build,
  // `*it` below would dereference end().
  DCHECK(it != codecs_.end());
  auto const terminalMaxUncompressedLength =
      terminalCodec_ ? terminalCodec_->maxUncompressedLength() : 0;
      std::max((*it)->maxUncompressedLength(), terminalMaxUncompressedLength);
}
1946 
1947 void AutomaticCodec::checkCompatibleCodecs() const {
1948  // Keep track of all the possible headers.
1949  std::unordered_set<std::string> headers;
1950  // The empty header is not allowed.
1951  headers.insert("");
1952  // Step 1:
1953  // Construct a set of headers and check that none of the headers occur twice.
1954  // Eliminate edge cases.
1955  for (auto&& codec : codecs_) {
1956  const auto codecHeaders = codec->validPrefixes();
1957  // Codecs without any valid headers are not allowed.
1958  if (codecHeaders.empty()) {
1959  throw std::invalid_argument{
1960  "AutomaticCodec: validPrefixes() must not be empty."};
1961  }
1962  // Insert all the headers for the current codec.
1963  const size_t beforeSize = headers.size();
1964  headers.insert(codecHeaders.begin(), codecHeaders.end());
1965  // Codecs are not compatible if any header occurred twice.
1966  if (beforeSize + codecHeaders.size() != headers.size()) {
1967  throw std::invalid_argument{
1968  "AutomaticCodec: Two valid prefixes collide."};
1969  }
1970  }
1971  // Step 2:
1972  // Check if any strict non-empty prefix of any header is a header.
1973  for (const auto& header : headers) {
1974  for (size_t i = 1; i < header.size(); ++i) {
1975  if (headers.count(header.substr(0, i))) {
1976  throw std::invalid_argument{
1977  "AutomaticCodec: One valid prefix is a prefix of another valid "
1978  "prefix."};
1979  }
1980  }
1981  }
1982 }
1983 
1984 bool AutomaticCodec::doNeedsUncompressedLength() const {
1985  return needsUncompressedLength_;
1986 }
1987 
1988 uint64_t AutomaticCodec::doMaxUncompressedLength() const {
1989  return maxUncompressedLength_;
1990 }
1991 
1992 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
1993  const IOBuf* data,
1994  Optional<uint64_t> uncompressedLength) {
1995  try {
1996  for (auto&& codec : codecs_) {
1997  if (codec->canUncompress(data, uncompressedLength)) {
1998  return codec->uncompress(data, uncompressedLength);
1999  }
2000  }
2001  } catch (std::exception const& e) {
2002  if (!terminalCodec_) {
2003  throw e;
2004  }
2005  }
2006 
2007  // Try terminal codec
2008  if (terminalCodec_) {
2009  return terminalCodec_->uncompress(data, uncompressedLength);
2010  }
2011 
2012  throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
2013 }
2014 
2015 using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
2016 using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
2017 struct Factory {
2018  CodecFactory codec;
2019  StreamCodecFactory stream;
2020 };
2021 
// Codec factory table, indexed by static_cast<size_t>(CodecType) (see
// getFactory()). Entries must appear in exactly the CodecType enum order.
// A `{}` entry (both pointers null) marks a type unavailable in this build,
// and a null `stream` means no streaming implementation is provided.
// NOTE(review): the per-entry labels below are inferred from the factories'
// DCHECKs and names; confirm them against the CodecType enum in
// Compression.h.
constexpr Factory
    codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
        {}, // USER_DEFINED
        {NoCompressionCodec::create, nullptr}, // NO_COMPRESSION

#if FOLLY_HAVE_LIBLZ4
        {LZ4Codec::create, nullptr}, // LZ4
#else
        {},
#endif

#if FOLLY_HAVE_LIBSNAPPY
        {SnappyCodec::create, nullptr}, // SNAPPY
#else
        {},
#endif

#if FOLLY_HAVE_LIBZ
        {getZlibCodec, getZlibStreamCodec}, // ZLIB
#else
        {},
#endif

#if FOLLY_HAVE_LIBLZ4
        {LZ4Codec::create, nullptr}, // LZ4_VARINT_SIZE
#else
        {},
#endif

#if FOLLY_HAVE_LIBLZMA
        {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream}, // LZMA2
        {LZMA2StreamCodec::createCodec,
         LZMA2StreamCodec::createStream}, // LZMA2_VARINT_SIZE
#else
        {},
        {},
#endif

#if FOLLY_HAVE_LIBZSTD
        {getZstdCodec, getZstdStreamCodec}, // ZSTD
#else
        {},
#endif

#if FOLLY_HAVE_LIBZ
        {getZlibCodec, getZlibStreamCodec}, // GZIP
#else
        {},
#endif

#if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
        {LZ4FrameCodec::create, nullptr}, // LZ4_FRAME
#else
        {},
#endif

#if FOLLY_HAVE_LIBBZ2
        {Bzip2StreamCodec::createCodec, Bzip2StreamCodec::createStream}, // BZIP2
#else
        {},
#endif

#if FOLLY_HAVE_LIBZSTD
        {getZstdFastCodec, getZstdFastStreamCodec}, // ZSTD_FAST
#else
        {},
#endif
};
2089 
2090 Factory const& getFactory(CodecType type) {
2091  size_t const idx = static_cast<size_t>(type);
2092  if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2093  throw std::invalid_argument(
2094  to<std::string>("Compression type ", idx, " invalid"));
2095  }
2096  return codecFactories[idx];
2097 }
2098 } // namespace
2099 
2101  return getFactory(type).codec != nullptr;
2102 }
2103 
2104 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2105  auto const factory = getFactory(type).codec;
2106  if (!factory) {
2107  throw std::invalid_argument(
2108  to<std::string>("Compression type ", type, " not supported"));
2109  }
2110  auto codec = (*factory)(level, type);
2111  DCHECK(codec->type() == type);
2112  return codec;
2113 }
2114 
2116  return getFactory(type).stream != nullptr;
2117 }
2118 
2119 std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
2120  auto const factory = getFactory(type).stream;
2121  if (!factory) {
2122  throw std::invalid_argument(
2123  to<std::string>("Compression type ", type, " not supported"));
2124  }
2125  auto codec = (*factory)(level, type);
2126  DCHECK(codec->type() == type);
2127  return codec;
2128 }
2129 
2130 std::unique_ptr<Codec> getAutoUncompressionCodec(
2131  std::vector<std::unique_ptr<Codec>> customCodecs,
2132  std::unique_ptr<Codec> terminalCodec) {
2133  return AutomaticCodec::create(
2134  std::move(customCodecs), std::move(terminalCodec));
2135 }
2136 } // namespace io
2137 } // namespace folly
std::vector< uint8_t > buffer(kBufferSize+16)
constexpr int COMPRESSION_LEVEL_DEFAULT
Definition: Compression.h:441
virtual std::string doUncompressString(StringPiece data, folly::Optional< uint64_t > uncompressedLength)
virtual bool doNeedsDataLength() const
bool hasCodec(CodecType type)
virtual std::unique_ptr< IOBuf > doUncompress(const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength)=0
size_t chainLength() const
Definition: IOBufQueue.h:492
bool uncompressStream(folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp=StreamCodec::FlushOp::NONE)
bool empty() const
Definition: IOBuf.cpp:482
constexpr auto kIsDebug
Definition: Portability.h:264
static std::unique_ptr< IOBuf > create(std::size_t capacity)
Definition: IOBuf.cpp:229
char b
LogLevel max
Definition: LogLevel.cpp:31
Iterator begin() const
Definition: IOBuf.h:1684
virtual bool canUncompress(const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength=folly::none) const
StreamCodecFactory stream
folly::detail::CompressionCounter compressionMilliseconds_
Definition: Compression.h:270
ByteRange coalesce()
Definition: IOBuf.h:1095
bool needsUncompressedLength_
PskType type
bool hasStreamCodec(CodecType type)
constexpr size_t kMaxVarintLength64
Definition: Varint.h:50
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void advance(size_type n)
Definition: Range.h:672
CodecFactory codec
constexpr size_type size() const
Definition: Range.h:431
double val
Definition: String.cpp:273
const uint8_t * data() const
Definition: IOBuf.h:499
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
static bool oneIn(uint32_t n)
Definition: Random.h:311
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
bool isChained() const
Definition: IOBuf.h:760
std::unique_ptr< IOBuf > clone() const
Definition: IOBuf.cpp:527
folly::detail::CompressionCounter bytesAfterCompression_
Definition: Compression.h:265
std::vector< std::unique_ptr< Codec > > codecs_
std::unique_ptr< IOBuf > compress(const folly::IOBuf *data)
static uint64_t computeBufferLength(uint64_t const compressedLength, uint64_t const blockSize)
std::unique_ptr< Codec > terminalCodec_
stop_watch< std::chrono::milliseconds > timer_
folly::detail::CompressionCounter bytesBeforeDecompression_
Definition: Compression.h:266
std::size_t tailroom() const
Definition: IOBuf.h:551
uint8_t * writableTail()
Definition: IOBuf.h:526
FOLLY_PUSH_WARNING RHS rhs
Definition: Traits.h:649
std::enable_if< std::is_arithmetic< T >::value, std::string >::type prefixToStringLE(T prefix, uint64_t n=sizeof(T))
Definition: Utils.h:54
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
Definition: IOBufQueue.h:356
Codec(CodecType type, folly::Optional< int > level=folly::none, folly::StringPiece name={}, bool counters=true)
Definition: Compression.cpp:71
static std::unique_ptr< IOBuf > addOutputBuffer(MutableByteRange &output, uint64_t size)
virtual uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const =0
virtual folly::Optional< uint64_t > doGetUncompressedLength(const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength) const
int current
const char * name
Definition: http_parser.c:437
folly::detail::CompressionCounter decompressions_
Definition: Compression.h:269
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
constexpr bool empty() const
Definition: Range.h:443
virtual std::string doCompressString(StringPiece data)
static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH
Definition: Compression.h:135
virtual std::vector< std::string > validPrefixes() const
LogLevel min
Definition: LogLevel.cpp:30
std::unique_ptr< IOBuf > uncompress(const IOBuf *data, folly::Optional< uint64_t > uncompressedLength=folly::none)
static Options cacheChainLength()
Definition: IOBufQueue.h:83
std::unique_ptr< StreamCodec > getStreamCodec(CodecType type, int level)
virtual uint64_t doMaxUncompressedLength() const
Type type_
Definition: JSONSchema.cpp:208
constexpr Iter data() const
Definition: Range.h:446
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
std::size_t length() const
Definition: IOBuf.h:533
virtual bool doNeedsUncompressedLength() const
FOLLY_CPP14_CONSTEXPR bool hasValue() const noexcept
Definition: Optional.h:300
constexpr Range< Iter > range(Iter first, Iter last)
Definition: Range.h:1114
uint64_t maxUncompressedLength_
folly::detail::CompressionCounter bytesBeforeCompression_
Definition: Compression.h:264
folly::detail::CompressionCounter bytesAfterDecompression_
Definition: Compression.h:267
IOBuf * next()
Definition: IOBuf.h:600
constexpr int COMPRESSION_LEVEL_BEST
Definition: Compression.h:442
bool needsDataLength() const
FOLLY_CPP14_CONSTEXPR Value value_or(U &&dflt) const &
Definition: Optional.h:330
folly::detail::CompressionCounter decompressionMilliseconds_
Definition: Compression.h:271
uint64_t maxCompressedLength(uint64_t uncompressedLength) const
std::size_t computeChainDataLength() const
Definition: IOBuf.cpp:501
CodecType type() const
Definition: Compression.h:147
std::atomic< int > counter
virtual std::unique_ptr< IOBuf > doCompress(const folly::IOBuf *data)=0
const char * string
Definition: Conv.cpp:212
void uncheckedAdvance(size_type n)
Definition: Range.h:695
bool compressStream(folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp=StreamCodec::FlushOp::NONE)
std::unique_ptr< IOBuf > doUncompress(const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength) override
folly::detail::CompressionCounter compressions_
Definition: Compression.h:268
void resetStream(folly::Optional< uint64_t > uncompressedLength=folly::none)
IOBuf cloneCoalescedAsValue() const
Definition: IOBuf.cpp:570
Wrapper around the makeCompressionCounterHandler() extension point.
Definition: Counters.h:70
folly::detail::CompressionCounter * counter_
std::enable_if< std::is_unsigned< T >::value, bool >::type dataStartsWithLE(const IOBuf *data, T prefix, uint64_t n=sizeof(T))
Definition: Utils.h:40
std::unique_ptr< Codec > getAutoUncompressionCodec(std::vector< std::unique_ptr< Codec >> customCodecs, std::unique_ptr< Codec > terminalCodec)
uint64_t maxUncompressedLength() const
void postallocate(std::size_t n)
Definition: IOBufQueue.h:380
size_t encodeVarint(uint64_t val, uint8_t *buf)
Definition: Varint.h:109
constexpr int COMPRESSION_LEVEL_FASTEST
Definition: Compression.h:440
std::unique_ptr< Codec > getCodec(CodecType type, int level)
void assertStateIs(State expected) const
Expected< uint64_t, DecodeVarintError > tryDecodeVarint(Range< T * > &data)
Definition: Varint.h:147
std::unique_ptr< IOBuf > doCompress(const folly::IOBuf *data) override
folly::Optional< uint64_t > getUncompressedLength(const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength=folly::none) const
bool needsUncompressedLength() const
void append(std::size_t amount)
Definition: IOBuf.h:689
constexpr None none
Definition: Optional.h:87