22 #if LZ4_VERSION_NUMBER >= 10301 27 #include <glog/logging.h> 29 #if FOLLY_HAVE_LIBSNAPPY 30 #include <snappy-sinksource.h> 38 #if FOLLY_HAVE_LIBLZMA 42 #if FOLLY_HAVE_LIBZSTD 63 #include <unordered_set> 87 CompressionCounterType::SUM};
93 CompressionCounterType::SUM};
99 CompressionCounterType::SUM};
104 CompressionCounterType::SUM};
109 CompressionCounterType::SUM};
114 CompressionCounterType::SUM};
120 CompressionCounterType::SUM};
125 constexpr
uint32_t kLoggingRate = 50;
138 stop_watch<std::chrono::milliseconds>
timer_;
144 if (data ==
nullptr) {
145 throw std::invalid_argument(
"Codec: data must not be nullptr");
149 throw std::runtime_error(
"Codec: uncompressed length too large");
166 throw std::runtime_error(
"Codec: uncompressed length too large");
183 if (data ==
nullptr) {
184 throw std::invalid_argument(
"Codec: data must not be nullptr");
186 if (!uncompressedLength) {
188 throw std::invalid_argument(
"Codec: uncompressed length required");
191 throw std::runtime_error(
"Codec: uncompressed length too large");
195 if (uncompressedLength.
value_or(0) != 0) {
196 throw std::runtime_error(
"Codec: invalid uncompressed length");
216 if (!uncompressedLength) {
218 throw std::invalid_argument(
"Codec: uncompressed length required");
221 throw std::runtime_error(
"Codec: uncompressed length too large");
225 if (uncompressedLength.
value_or(0) != 0) {
226 throw std::runtime_error(
"Codec: invalid uncompressed length");
271 output.reserve(outputBuffer->computeChainDataLength());
272 for (
auto range : *outputBuffer) {
273 output.append(reinterpret_cast<const char*>(
range.data()),
range.size());
282 auto outputBuffer =
doUncompress(&inputBuffer, uncompressedLength);
284 output.reserve(outputBuffer->computeChainDataLength());
285 for (
auto range : *outputBuffer) {
286 output.append(reinterpret_cast<const char*>(
range.data()),
range.size());
299 if (compressedLength == 0) {
300 if (uncompressedLength.
value_or(0) != 0) {
301 throw std::runtime_error(
"Invalid uncompressed length");
311 return uncompressedLength;
315 return doNeedsDataLength();
323 if (state_ != expected) {
324 throw std::logic_error(folly::to<std::string>(
325 "Codec: state is ", state_,
"; expected state ", expected));
330 state_ = State::RESET;
331 uncompressedLength_ = uncompressedLength;
332 progressMade_ =
true;
340 if (state_ == State::RESET && input.
empty() &&
342 uncompressedLength().value_or(0) != 0) {
343 throw std::runtime_error(
"Codec: invalid uncompressed length");
346 if (!uncompressedLength() && needsDataLength()) {
347 throw std::runtime_error(
"Codec: uncompressed length required");
349 if (state_ == State::RESET && !input.
empty() &&
350 uncompressedLength() ==
uint64_t(0)) {
351 throw std::runtime_error(
"Codec: invalid uncompressed length");
356 if (state_ == State::RESET) {
357 state_ = State::COMPRESS;
359 assertStateIs(State::COMPRESS);
362 if (state_ == State::RESET || state_ == State::COMPRESS) {
363 state_ = State::COMPRESS_FLUSH;
365 assertStateIs(State::COMPRESS_FLUSH);
368 if (state_ == State::RESET || state_ == State::COMPRESS) {
369 state_ = State::COMPRESS_END;
371 assertStateIs(State::COMPRESS_END);
374 size_t const inputSize = input.
size();
375 size_t const outputSize = output.
size();
376 bool const done = doCompressStream(input, output, flushOp);
377 if (!done && inputSize == input.
size() && outputSize == output.
size()) {
378 if (!progressMade_) {
379 throw std::runtime_error(
"Codec: No forward progress made");
382 progressMade_ =
false;
384 progressMade_ =
true;
388 if (state_ == State::COMPRESS_FLUSH) {
389 state_ = State::COMPRESS;
390 }
else if (state_ == State::COMPRESS_END) {
394 DCHECK(input.
empty());
404 if (state_ == State::RESET && input.
empty()) {
405 if (uncompressedLength().value_or(0) == 0) {
411 if (state_ == State::RESET) {
412 state_ = State::UNCOMPRESS;
414 assertStateIs(State::UNCOMPRESS);
415 size_t const inputSize = input.
size();
416 size_t const outputSize = output.
size();
417 bool const done = doUncompressStream(input, output, flushOp);
418 if (!done && inputSize == input.
size() && outputSize == output.
size()) {
419 if (!progressMade_) {
420 throw std::runtime_error(
"Codec: no forward progress made");
423 progressMade_ =
false;
425 progressMade_ =
true;
437 DCHECK(output.
empty());
439 buffer->append(buffer->capacity());
440 output = {buffer->writableData(), buffer->length()};
446 resetStream(uncompressedLength);
449 auto constexpr kMaxSingleStepLength =
uint64_t(64) << 20;
450 auto constexpr kDefaultBufferLength =
uint64_t(4) << 20;
455 maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
456 : kDefaultBufferLength);
464 while (input.empty() && current->
next() !=
data) {
465 current = current->
next();
472 if (output.
empty()) {
475 done = compressStream(input, output, flushOp);
477 DCHECK(input.empty());
491 return std::min(goodBufferSize, kMaxBufferLength);
497 auto constexpr kMaxSingleStepLength =
uint64_t(64) << 20;
498 auto constexpr kBlockSize =
uint64_t(128) << 10;
499 auto const defaultBufferLength =
503 resetStream(uncompressedLength);
508 (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
509 ? *uncompressedLength
510 : defaultBufferLength));
518 while (input.empty() && current->
next() !=
data) {
519 current = current->
next();
526 if (output.
empty()) {
529 done = uncompressStream(input, output, flushOp);
531 if (!input.empty()) {
532 throw std::runtime_error(
"Codec: Junk after end of data");
536 if (uncompressedLength &&
537 *uncompressedLength !=
buffer->computeChainDataLength()) {
538 throw std::runtime_error(
"Codec: invalid uncompressed length");
549 class NoCompressionCodec final :
public Codec {
551 static std::unique_ptr<Codec> create(
int level,
CodecType type);
552 explicit NoCompressionCodec(
int level,
CodecType type);
562 std::unique_ptr<Codec> NoCompressionCodec::create(
int level,
CodecType type) {
563 return std::make_unique<NoCompressionCodec>(level,
type);
566 NoCompressionCodec::NoCompressionCodec(
int level,
CodecType type)
576 throw std::invalid_argument(
577 to<std::string>(
"NoCompressionCodec: invalid level ", level));
581 uint64_t NoCompressionCodec::doMaxCompressedLength(
582 uint64_t uncompressedLength)
const {
583 return uncompressedLength;
586 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
const IOBuf*
data) {
587 return data->
clone();
590 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
593 if (uncompressedLength &&
595 throw std::runtime_error(
596 to<std::string>(
"NoCompressionCodec: invalid uncompressed length"));
598 return data->
clone();
601 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA) 613 for (
int shift = 0; shift <= 63; shift += 7) {
615 val |=
static_cast<uint64_t>(b & 0x7f) << shift;
621 throw std::invalid_argument(
"Invalid varint value. Too big.");
628 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA 630 #if FOLLY_HAVE_LIBLZ4 632 #if LZ4_VERSION_NUMBER >= 10802 && defined(LZ4_STATIC_LINKING_ONLY) && \ 633 defined(LZ4_HC_STATIC_LINKING_ONLY) && !defined(FOLLY_USE_LZ4_FAST_RESET) 634 #define FOLLY_USE_LZ4_FAST_RESET 637 #ifdef FOLLY_USE_LZ4_FAST_RESET 639 void lz4_stream_t_deleter(LZ4_stream_t* ctx) {
643 void lz4_streamhc_t_deleter(LZ4_streamHC_t* ctx) {
644 LZ4_freeStreamHC(ctx);
652 class LZ4Codec final :
public Codec {
654 static std::unique_ptr<Codec> create(
int level,
CodecType type);
655 explicit LZ4Codec(
int level,
CodecType type);
662 bool encodeSize()
const {
671 #ifdef FOLLY_USE_LZ4_FAST_RESET 682 bool highCompression_;
685 std::unique_ptr<Codec> LZ4Codec::create(
int level,
CodecType type) {
686 return std::make_unique<LZ4Codec>(level,
type);
689 static int lz4ConvertLevel(
int level) {
699 throw std::invalid_argument(
700 to<std::string>(
"LZ4Codec: invalid level: ", level));
703 LZ4Codec::LZ4Codec(
int level,
CodecType type)
704 :
Codec(type, lz4ConvertLevel(level)),
705 highCompression_(lz4ConvertLevel(level) > 1) {
709 bool LZ4Codec::doNeedsUncompressedLength()
const {
710 return !encodeSize();
716 #ifndef LZ4_MAX_INPUT_SIZE 717 #define LZ4_MAX_INPUT_SIZE 0x7E000000 720 uint64_t LZ4Codec::doMaxUncompressedLength()
const {
721 return LZ4_MAX_INPUT_SIZE;
724 uint64_t LZ4Codec::doMaxCompressedLength(
uint64_t uncompressedLength)
const {
725 return LZ4_compressBound(uncompressedLength) +
729 std::unique_ptr<IOBuf> LZ4Codec::doCompress(
const IOBuf* data) {
739 encodeVarintToIOBuf(data->
length(), out.get());
743 auto input =
reinterpret_cast<const char*
>(data->
data());
744 auto output =
reinterpret_cast<char*
>(out->writableTail());
745 const auto inputLength = data->
length();
747 #ifdef FOLLY_USE_LZ4_FAST_RESET 748 if (!highCompression_ && !ctx) {
749 ctx.reset(LZ4_createStream());
751 if (highCompression_ && !hcctx) {
752 hcctx.reset(LZ4_createStreamHC());
755 if (highCompression_) {
756 n = LZ4_compress_HC_extStateHC_fastReset(
757 hcctx.get(), input,
output, inputLength, out->tailroom(), 0);
759 n = LZ4_compress_fast_extState_fastReset(
760 ctx.get(), input,
output, inputLength, out->tailroom(), 1);
762 #elif LZ4_VERSION_NUMBER >= 10700 763 if (highCompression_) {
764 n = LZ4_compress_HC(input,
output, inputLength, out->tailroom(), 0);
766 n = LZ4_compress_default(input,
output, inputLength, out->tailroom());
769 if (highCompression_) {
770 n = LZ4_compressHC(input,
output, inputLength);
772 n = LZ4_compress(input,
output, inputLength);
777 CHECK_LE(n, out->capacity());
783 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
796 actualUncompressedLength = decodeVarintFromCursor(cursor);
797 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
798 throw std::runtime_error(
"LZ4Codec: invalid uncompressed length");
802 DCHECK(uncompressedLength.
hasValue());
804 actualUncompressedLength = *uncompressedLength;
809 int n = LZ4_decompress_safe(
811 reinterpret_cast<char*
>(out->writableTail()),
813 actualUncompressedLength);
815 if (n < 0 ||
uint64_t(n) != actualUncompressedLength) {
816 throw std::runtime_error(
817 to<std::string>(
"LZ4 decompression returned invalid value ", n));
819 out->append(actualUncompressedLength);
823 #if LZ4_VERSION_NUMBER >= 10301 825 class LZ4FrameCodec final :
public Codec {
827 static std::unique_ptr<Codec> create(
int level,
CodecType type);
828 explicit LZ4FrameCodec(
int level,
CodecType type);
829 ~LZ4FrameCodec()
override;
847 #ifdef FOLLY_USE_LZ4_FAST_RESET 848 LZ4F_compressionContext_t cctx_{
nullptr};
850 LZ4F_decompressionContext_t dctx_{
nullptr};
854 std::unique_ptr<Codec> LZ4FrameCodec::create(
857 return std::make_unique<LZ4FrameCodec>(level,
type);
860 static constexpr
uint32_t kLZ4FrameMagicLE = 0x184D2204;
862 std::vector<std::string> LZ4FrameCodec::validPrefixes()
const {
870 uint64_t LZ4FrameCodec::doMaxCompressedLength(
871 uint64_t uncompressedLength)
const {
872 LZ4F_preferences_t prefs{};
873 prefs.compressionLevel = level_;
874 prefs.frameInfo.contentSize = uncompressedLength;
875 return LZ4F_compressFrameBound(uncompressedLength, &prefs);
878 static size_t lz4FrameThrowOnError(
size_t code) {
879 if (LZ4F_isError(code)) {
880 throw std::runtime_error(
881 to<std::string>(
"LZ4Frame error: ", LZ4F_getErrorName(code)));
886 void LZ4FrameCodec::resetDCtx() {
887 if (dctx_ && !dirty_) {
891 LZ4F_freeDecompressionContext(dctx_);
893 lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
897 static int lz4fConvertLevel(
int level) {
908 LZ4FrameCodec::LZ4FrameCodec(
int level,
CodecType type)
909 :
Codec(type, lz4fConvertLevel(level)), level_(lz4fConvertLevel(level)) {
913 LZ4FrameCodec::~LZ4FrameCodec() {
915 LZ4F_freeDecompressionContext(dctx_);
917 #ifdef FOLLY_USE_LZ4_FAST_RESET 919 LZ4F_freeCompressionContext(cctx_);
924 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(
const IOBuf* data) {
932 #ifdef FOLLY_USE_LZ4_FAST_RESET 934 lz4FrameThrowOnError(LZ4F_createCompressionContext(&cctx_, LZ4F_VERSION));
939 const auto uncompressedLength = data->
length();
940 LZ4F_preferences_t prefs{};
941 prefs.compressionLevel = level_;
942 prefs.frameInfo.contentSize = uncompressedLength;
945 const size_t written = lz4FrameThrowOnError(
946 #ifdef FOLLY_USE_LZ4_FAST_RESET
947 LZ4F_compressFrame_usingCDict(
964 buf->append(written);
968 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
982 LZ4F_decompressOptions_t options;
983 options.stableDst = 1;
986 auto blockSize =
uint64_t{64} << 10;
987 auto growthSize =
uint64_t{4} << 20;
988 if (uncompressedLength) {
990 const auto allocateSize =
std::min(*uncompressedLength,
uint64_t{64} << 20);
992 blockSize =
std::min(*uncompressedLength, blockSize);
993 growthSize =
std::min(*uncompressedLength, growthSize);
996 const auto guessUncompressedLen =
997 4 * std::max<uint64_t>(blockSize, in.
size());
998 growthSize =
std::min(guessUncompressedLen, growthSize);
1009 std::tie(out, outSize) = queue.
preallocate(blockSize, growthSize);
1011 size_t inSize = in.
size();
1012 code = lz4FrameThrowOnError(
1013 LZ4F_decompress(dctx_, out, &outSize, in.
data(), &inSize, &options));
1014 if (in.
empty() && outSize == 0 && code != 0) {
1017 throw std::runtime_error(
"LZ4Frame error: Incomplete frame");
1021 }
while (code != 0);
1024 if (uncompressedLength && queue.
chainLength() != *uncompressedLength) {
1025 throw std::runtime_error(
"LZ4Frame error: Invalid uncompressedLength");
1027 return queue.
move();
1030 #endif // LZ4_VERSION_NUMBER >= 10301 1031 #endif // FOLLY_HAVE_LIBLZ4 1033 #if FOLLY_HAVE_LIBSNAPPY 1042 class IOBufSnappySource final :
public snappy::Source {
1044 explicit IOBufSnappySource(
const IOBuf* data);
1045 size_t Available()
const override;
1046 const char* Peek(
size_t* len)
override;
1047 void Skip(
size_t n)
override;
1054 IOBufSnappySource::IOBufSnappySource(
const IOBuf* data)
1057 size_t IOBufSnappySource::Available()
const {
1061 const char* IOBufSnappySource::Peek(
size_t* len) {
1068 CHECK_LE(n, available_);
1073 class SnappyCodec final :
public Codec {
1075 static std::unique_ptr<Codec> create(
int level,
CodecType type);
1076 explicit SnappyCodec(
int level,
CodecType type);
1087 std::unique_ptr<Codec> SnappyCodec::create(
int level,
CodecType type) {
1088 return std::make_unique<SnappyCodec>(level,
type);
1091 SnappyCodec::SnappyCodec(
int level,
CodecType type) :
Codec(type) {
1100 throw std::invalid_argument(
1101 to<std::string>(
"SnappyCodec: invalid level: ", level));
1105 uint64_t SnappyCodec::doMaxUncompressedLength()
const {
1110 uint64_t SnappyCodec::doMaxCompressedLength(
uint64_t uncompressedLength)
const {
1111 return snappy::MaxCompressedLength(uncompressedLength);
1114 std::unique_ptr<IOBuf> SnappyCodec::doCompress(
const IOBuf* data) {
1115 IOBufSnappySource source(data);
1118 snappy::UncheckedByteArraySink sink(
1119 reinterpret_cast<char*>(out->writableTail()));
1121 size_t n = snappy::Compress(&source, &sink);
1123 CHECK_LE(n, out->capacity());
1128 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
1131 uint32_t actualUncompressedLength = 0;
1134 IOBufSnappySource source(data);
1135 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
1136 throw std::runtime_error(
"snappy::GetUncompressedLength failed");
1138 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
1139 throw std::runtime_error(
"snappy: invalid uncompressed length");
1146 IOBufSnappySource source(data);
1147 if (!snappy::RawUncompress(
1148 &source, reinterpret_cast<char*>(out->writableTail()))) {
1149 throw std::runtime_error(
"snappy::RawUncompress failed");
1153 out->append(actualUncompressedLength);
1157 #endif // FOLLY_HAVE_LIBSNAPPY 1159 #if FOLLY_HAVE_LIBLZMA 1164 class LZMA2StreamCodec final :
public StreamCodec {
1166 static std::unique_ptr<Codec> createCodec(
int level,
CodecType type);
1167 static std::unique_ptr<StreamCodec> createStream(
int level,
CodecType type);
1168 explicit LZMA2StreamCodec(
int level,
CodecType type);
1169 ~LZMA2StreamCodec()
override;
1176 bool doNeedsDataLength()
const override;
1180 bool encodeSize()
const {
1184 void doResetStream()
override;
1185 bool doCompressStream(
1189 bool doUncompressStream(
1194 void resetCStream();
1195 void resetDStream();
1197 bool decodeAndCheckVarint(
ByteRange& input);
1199 void resetVarintBuffer();
1204 std::array<uint8_t, kMaxVarintLength64> varintBuffer_;
1206 size_t varintBufferPos_{0};
1209 bool needReset_{
true};
1210 bool needDecodeSize_{
false};
1213 static constexpr
uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1214 static constexpr
unsigned kLZMA2MagicBytes = 6;
1216 std::vector<std::string> LZMA2StreamCodec::validPrefixes()
const {
1223 bool LZMA2StreamCodec::doNeedsDataLength()
const {
1224 return encodeSize();
1237 std::unique_ptr<Codec> LZMA2StreamCodec::createCodec(
1240 return make_unique<LZMA2StreamCodec>(level,
type);
1243 std::unique_ptr<StreamCodec> LZMA2StreamCodec::createStream(
1246 return make_unique<LZMA2StreamCodec>(level,
type);
1249 LZMA2StreamCodec::LZMA2StreamCodec(
int level,
CodecType type)
1257 level = LZMA_PRESET_DEFAULT;
1263 if (level < 0 || level > 9) {
1264 throw std::invalid_argument(
1265 to<std::string>(
"LZMA2Codec: invalid level: ", level));
1270 LZMA2StreamCodec::~LZMA2StreamCodec() {
1272 lzma_end(cstream_.get_pointer());
1276 lzma_end(dstream_.get_pointer());
1281 uint64_t LZMA2StreamCodec::doMaxUncompressedLength()
const {
1286 uint64_t LZMA2StreamCodec::doMaxCompressedLength(
1287 uint64_t uncompressedLength)
const {
1288 return lzma_stream_buffer_bound(uncompressedLength) +
1292 void LZMA2StreamCodec::doResetStream() {
1296 void LZMA2StreamCodec::resetCStream() {
1298 cstream_.assign(LZMA_STREAM_INIT);
1301 lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE);
1302 if (rc != LZMA_OK) {
1303 throw std::runtime_error(folly::to<std::string>(
1304 "LZMA2StreamCodec: lzma_easy_encoder error: ", rc));
1308 void LZMA2StreamCodec::resetDStream() {
1310 dstream_.assign(LZMA_STREAM_INIT);
1312 lzma_ret
const rc = lzma_auto_decoder(
1314 if (rc != LZMA_OK) {
1315 throw std::runtime_error(folly::to<std::string>(
1316 "LZMA2StreamCodec: lzma_auto_decoder error: ", rc));
1320 static lzma_ret lzmaThrowOnError(lzma_ret
const rc) {
1323 case LZMA_STREAM_END:
1324 case LZMA_BUF_ERROR:
1327 throw std::runtime_error(
1328 to<std::string>(
"LZMA2StreamCodec: error: ", rc));
1337 return LZMA_SYNC_FLUSH;
1341 throw std::invalid_argument(
"LZMA2StreamCodec: Invalid flush");
1351 if (varintToEncode_.empty()) {
1354 const size_t numBytesToCopy =
std::min(varintToEncode_.size(), output.
size());
1355 if (numBytesToCopy > 0) {
1356 memcpy(output.
data(), varintToEncode_.data(), numBytesToCopy);
1358 varintToEncode_.advance(numBytesToCopy);
1359 output.
advance(numBytesToCopy);
1360 return varintToEncode_.empty();
1363 bool LZMA2StreamCodec::doCompressStream(
1370 varintBufferPos_ = 0;
1371 size_t const varintSize =
1372 encodeVarint(*uncompressedLength(), varintBuffer_.data());
1373 varintToEncode_ = {varintBuffer_.data(), varintSize};
1378 if (!flushVarintBuffer(output)) {
1382 cstream_->next_in =
const_cast<uint8_t*
>(input.
data());
1383 cstream_->avail_in = input.
size();
1384 cstream_->next_out = output.
data();
1385 cstream_->avail_out = output.
size();
1390 lzma_ret
const rc = lzmaThrowOnError(
1391 lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp)));
1396 return cstream_->avail_in == 0 && cstream_->avail_out != 0;
1398 return rc == LZMA_STREAM_END;
1400 throw std::invalid_argument(
"LZMA2StreamCodec: invalid FlushOp");
1417 bool LZMA2StreamCodec::decodeAndCheckVarint(
ByteRange& input) {
1418 if (input.
empty()) {
1421 size_t const numBytesToCopy =
1423 memcpy(varintBuffer_.data() + varintBufferPos_, input.
data(), numBytesToCopy);
1425 size_t const rangeSize = varintBufferPos_ + numBytesToCopy;
1429 if (ret.hasValue()) {
1430 size_t const varintSize = rangeSize -
range.size();
1431 input.
advance(varintSize - varintBufferPos_);
1432 if (uncompressedLength() && *uncompressedLength() != ret.value()) {
1433 throw std::runtime_error(
"LZMA2StreamCodec: invalid uncompressed length");
1437 throw std::runtime_error(
"LZMA2StreamCodec: invalid uncompressed length");
1440 input.
advance(numBytesToCopy);
1441 varintBufferPos_ += numBytesToCopy;
1446 bool LZMA2StreamCodec::doUncompressStream(
1453 needDecodeSize_ = encodeSize();
1456 varintBufferPos_ = 0;
1460 if (needDecodeSize_) {
1463 if (!decodeAndCheckVarint(input)) {
1466 needDecodeSize_ =
false;
1469 dstream_->next_in =
const_cast<uint8_t*
>(input.
data());
1470 dstream_->avail_in = input.
size();
1471 dstream_->next_out = output.
data();
1472 dstream_->avail_out = output.
size();
1475 output.
advance(output.
size() - dstream_->avail_out);
1482 rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN));
1485 rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH));
1488 throw std::invalid_argument(
"LZMA2StreamCodec: invalid flush");
1490 return rc == LZMA_STREAM_END;
1492 #endif // FOLLY_HAVE_LIBLZMA 1494 #if FOLLY_HAVE_LIBZSTD 1496 static int zstdConvertLevel(
int level) {
1505 if (level < 1 || level > ZSTD_maxCLevel()) {
1506 throw std::invalid_argument(
1507 to<std::string>(
"ZSTD: invalid level: ", level));
1512 static int zstdFastConvertLevel(
int level) {
1522 throw std::invalid_argument(
1523 to<std::string>(
"ZSTD: invalid level: ", level));
1528 std::unique_ptr<Codec> getZstdCodec(
int level,
CodecType type) {
1533 std::unique_ptr<StreamCodec> getZstdStreamCodec(
int level,
CodecType type) {
1538 std::unique_ptr<Codec> getZstdFastCodec(
int level,
CodecType type) {
1540 return zstd::getCodec(zstd::Options(zstdFastConvertLevel(level)));
1543 std::unique_ptr<StreamCodec> getZstdFastStreamCodec(
int level,
CodecType type) {
1548 #endif // FOLLY_HAVE_LIBZSTD 1550 #if FOLLY_HAVE_LIBBZ2 1552 class Bzip2StreamCodec final :
public StreamCodec {
1554 static std::unique_ptr<Codec> createCodec(
int level,
CodecType type);
1555 static std::unique_ptr<StreamCodec> createStream(
int level,
CodecType type);
1556 explicit Bzip2StreamCodec(
int level,
CodecType type);
1558 ~Bzip2StreamCodec()
override;
1567 void doResetStream()
override;
1568 bool doCompressStream(
1572 bool doUncompressStream(
1577 void resetCStream();
1578 void resetDStream();
1584 bool needReset_{
true};
1587 std::unique_ptr<Codec> Bzip2StreamCodec::createCodec(
1590 return createStream(level, type);
1593 std::unique_ptr<StreamCodec> Bzip2StreamCodec::createStream(
1596 return std::make_unique<Bzip2StreamCodec>(level,
type);
1599 Bzip2StreamCodec::Bzip2StreamCodec(
int level,
CodecType type)
1613 if (level < 1 || level > 9) {
1614 throw std::invalid_argument(
1615 to<std::string>(
"Bzip2: invalid level: ", level));
1620 static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1621 static uint64_t constexpr kBzip2MagicBytes = 3;
1623 std::vector<std::string> Bzip2StreamCodec::validPrefixes()
const {
1632 uint64_t Bzip2StreamCodec::doMaxCompressedLength(
1633 uint64_t uncompressedLength)
const {
1638 return uncompressedLength + uncompressedLength / 100 + 600;
1641 static bz_stream createBzStream() {
1643 stream.bzalloc =
nullptr;
1644 stream.bzfree =
nullptr;
1645 stream.opaque =
nullptr;
1646 stream.next_in = stream.next_out =
nullptr;
1647 stream.avail_in = stream.avail_out = 0;
1652 static int bzCheck(
int const rc) {
1661 case BZ_PARAM_ERROR:
1664 throw std::runtime_error(to<std::string>(
"Bzip2 error: ", rc));
1668 Bzip2StreamCodec::~Bzip2StreamCodec() {
1670 BZ2_bzCompressEnd(cstream_.get_pointer());
1674 BZ2_bzDecompressEnd(dstream_.get_pointer());
1679 void Bzip2StreamCodec::doResetStream() {
1683 void Bzip2StreamCodec::resetCStream() {
1685 BZ2_bzCompressEnd(cstream_.get_pointer());
1687 cstream_ = createBzStream();
1688 bzCheck(BZ2_bzCompressInit(cstream_.get_pointer(), level_, 0, 0));
1698 throw std::invalid_argument(
1699 "Bzip2StreamCodec: FlushOp::FLUSH not supported");
1701 throw std::invalid_argument(
"Bzip2StreamCodec: Invalid flush");
1705 bool Bzip2StreamCodec::doCompressStream(
1718 const_cast<char*
>(
reinterpret_cast<const char*
>(input.
data()));
1719 cstream_->avail_in = input.
size();
1720 cstream_->next_out =
reinterpret_cast<char*
>(output.
data());
1721 cstream_->avail_out = output.
size();
1726 int const rc = bzCheck(
1727 BZ2_bzCompress(cstream_.get_pointer(), bzip2TranslateFlush(flushOp)));
1732 if (rc == BZ_RUN_OK) {
1733 DCHECK_EQ(cstream_->avail_in, 0);
1734 DCHECK(input.
size() == 0 || cstream_->avail_out != output.
size());
1739 return rc == BZ_STREAM_END;
1741 throw std::invalid_argument(
"Bzip2StreamCodec: invalid FlushOp");
1746 void Bzip2StreamCodec::resetDStream() {
1748 BZ2_bzDecompressEnd(dstream_.get_pointer());
1750 dstream_ = createBzStream();
1751 bzCheck(BZ2_bzDecompressInit(dstream_.get_pointer(), 0, 0));
1754 bool Bzip2StreamCodec::doUncompressStream(
1759 throw std::invalid_argument(
1760 "Bzip2StreamCodec: FlushOp::FLUSH not supported");
1768 const_cast<char*
>(
reinterpret_cast<const char*
>(input.
data()));
1769 dstream_->avail_in = input.
size();
1770 dstream_->next_out =
reinterpret_cast<char*
>(output.
data());
1771 dstream_->avail_out = output.
size();
1776 int const rc = bzCheck(BZ2_bzDecompress(dstream_.get_pointer()));
1777 return rc == BZ_STREAM_END;
1780 #endif // FOLLY_HAVE_LIBBZ2 1784 zlib::Options getZlibOptions(
CodecType type) {
1787 : zlib::defaultZlibOptions();
1790 std::unique_ptr<Codec> getZlibCodec(
int level,
CodecType type) {
1794 std::unique_ptr<StreamCodec> getZlibStreamCodec(
int level,
CodecType type) {
1798 #endif // FOLLY_HAVE_LIBZ 1803 class AutomaticCodec final :
public Codec {
1805 static std::unique_ptr<Codec> create(
1806 std::vector<std::unique_ptr<Codec>> customCodecs,
1807 std::unique_ptr<Codec> terminalCodec);
1808 explicit AutomaticCodec(
1809 std::vector<std::unique_ptr<Codec>> customCodecs,
1810 std::unique_ptr<Codec> terminalCodec);
1821 throw std::runtime_error(
1822 "AutomaticCodec error: maxCompressedLength() not supported.");
1825 throw std::runtime_error(
"AutomaticCodec error: compress() not supported.");
1831 void addCodecIfSupported(
CodecType type);
1834 void checkCompatibleCodecs()
const;
1842 std::vector<std::string> AutomaticCodec::validPrefixes()
const {
1843 std::unordered_set<std::string> prefixes;
1845 const auto codecPrefixes =
codec->validPrefixes();
1846 prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
1848 return std::vector<std::string>{prefixes.begin(), prefixes.end()};
1851 bool AutomaticCodec::canUncompress(
1853 Optional<uint64_t> uncompressedLength)
const {
1857 [
data, uncompressedLength](std::unique_ptr<Codec>
const&
codec) {
1858 return codec->canUncompress(data, uncompressedLength);
1862 void AutomaticCodec::addCodecIfSupported(
CodecType type) {
1863 const bool present = std::any_of(
1866 [&
type](std::unique_ptr<Codec>
const&
codec) {
1870 if (
hasCodec(type) && !present && !isTerminalType) {
1875 std::unique_ptr<Codec> AutomaticCodec::create(
1876 std::vector<std::unique_ptr<Codec>> customCodecs,
1877 std::unique_ptr<Codec> terminalCodec) {
1878 return std::make_unique<AutomaticCodec>(
1882 AutomaticCodec::AutomaticCodec(
1883 std::vector<std::unique_ptr<Codec>> customCodecs,
1884 std::unique_ptr<Codec> terminalCodec)
1889 std::array<CodecType, 6> defaultTypes{{
1898 for (
auto type : defaultTypes) {
1899 addCodecIfSupported(type);
1903 checkCompatibleCodecs();
1907 DCHECK(std::none_of(
1909 return codec ==
nullptr;
1915 DCHECK(std::none_of(
1918 [&](std::unique_ptr<Codec>
const&
codec) {
1924 bool const terminalNeedsUncompressedLength =
1929 [](std::unique_ptr<Codec>
const&
codec) {
1930 return codec->needsUncompressedLength();
1932 terminalNeedsUncompressedLength;
1934 const auto it = std::max_element(
1937 [](std::unique_ptr<Codec>
const& lhs, std::unique_ptr<Codec>
const&
rhs) {
1938 return lhs->maxUncompressedLength() <
rhs->maxUncompressedLength();
1941 auto const terminalMaxUncompressedLength =
1944 std::max((*it)->maxUncompressedLength(), terminalMaxUncompressedLength);
1947 void AutomaticCodec::checkCompatibleCodecs()
const {
1949 std::unordered_set<std::string> headers;
1956 const auto codecHeaders =
codec->validPrefixes();
1958 if (codecHeaders.empty()) {
1959 throw std::invalid_argument{
1960 "AutomaticCodec: validPrefixes() must not be empty."};
1963 const size_t beforeSize = headers.size();
1964 headers.insert(codecHeaders.begin(), codecHeaders.end());
1966 if (beforeSize + codecHeaders.size() != headers.size()) {
1967 throw std::invalid_argument{
1968 "AutomaticCodec: Two valid prefixes collide."};
1973 for (
const auto& header : headers) {
1974 for (
size_t i = 1;
i < header.size(); ++
i) {
1975 if (headers.count(header.substr(0,
i))) {
1976 throw std::invalid_argument{
1977 "AutomaticCodec: One valid prefix is a prefix of another valid " 1984 bool AutomaticCodec::doNeedsUncompressedLength()
const {
1988 uint64_t AutomaticCodec::doMaxUncompressedLength()
const {
1992 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
1994 Optional<uint64_t> uncompressedLength) {
1997 if (
codec->canUncompress(data, uncompressedLength)) {
1998 return codec->uncompress(data, uncompressedLength);
2001 }
catch (std::exception
const& e) {
2012 throw std::runtime_error(
"AutomaticCodec error: Unknown compressed data");
2015 using CodecFactory = std::unique_ptr<Codec> (*)(int,
CodecType);
2016 using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int,
CodecType);
2025 {NoCompressionCodec::create,
nullptr},
2027 #if FOLLY_HAVE_LIBLZ4 2028 {LZ4Codec::create,
nullptr},
2033 #if FOLLY_HAVE_LIBSNAPPY 2034 {SnappyCodec::create,
nullptr},
2040 {getZlibCodec, getZlibStreamCodec},
2045 #if FOLLY_HAVE_LIBLZ4 2046 {LZ4Codec::create,
nullptr},
2051 #if FOLLY_HAVE_LIBLZMA 2052 {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2053 {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2059 #if FOLLY_HAVE_LIBZSTD 2060 {getZstdCodec, getZstdStreamCodec},
2066 {getZlibCodec, getZlibStreamCodec},
2071 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301) 2072 {LZ4FrameCodec::create,
nullptr},
2077 #if FOLLY_HAVE_LIBBZ2 2078 {Bzip2StreamCodec::createCodec, Bzip2StreamCodec::createStream},
2083 #if FOLLY_HAVE_LIBZSTD 2084 {getZstdFastCodec, getZstdFastStreamCodec},
2090 Factory
const& getFactory(
CodecType type) {
2091 size_t const idx =
static_cast<size_t>(
type);
2093 throw std::invalid_argument(
2094 to<std::string>(
"Compression type ", idx,
" invalid"));
2096 return codecFactories[idx];
2101 return getFactory(type).codec !=
nullptr;
2105 auto const factory = getFactory(type).codec;
2107 throw std::invalid_argument(
2108 to<std::string>(
"Compression type ", type,
" not supported"));
2116 return getFactory(type).stream !=
nullptr;
2120 auto const factory = getFactory(type).stream;
2122 throw std::invalid_argument(
2123 to<std::string>(
"Compression type ", type,
" not supported"));
2131 std::vector<std::unique_ptr<Codec>> customCodecs,
2132 std::unique_ptr<Codec> terminalCodec) {
2133 return AutomaticCodec::create(
std::vector< uint8_t > buffer(kBufferSize+16)
constexpr int COMPRESSION_LEVEL_DEFAULT
virtual std::string doUncompressString(StringPiece data, folly::Optional< uint64_t > uncompressedLength)
virtual bool doNeedsDataLength() const
bool hasCodec(CodecType type)
virtual std::unique_ptr< IOBuf > doUncompress(const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength)=0
size_t chainLength() const
bool uncompressStream(folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp=StreamCodec::FlushOp::NONE)
static std::unique_ptr< IOBuf > create(std::size_t capacity)
virtual bool canUncompress(const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength=folly::none) const
StreamCodecFactory stream
folly::detail::CompressionCounter compressionMilliseconds_
bool needsUncompressedLength_
bool hasStreamCodec(CodecType type)
constexpr size_t kMaxVarintLength64
constexpr detail::Map< Move > move
void advance(size_type n)
constexpr size_type size() const
const uint8_t * data() const
std::unique_ptr< folly::IOBuf > move()
static bool oneIn(uint32_t n)
—— Concurrent Priority Queue Implementation ——
std::unique_ptr< IOBuf > clone() const
folly::detail::CompressionCounter bytesAfterCompression_
std::vector< std::unique_ptr< Codec > > codecs_
std::unique_ptr< IOBuf > compress(const folly::IOBuf *data)
static uint64_t computeBufferLength(uint64_t const compressedLength, uint64_t const blockSize)
std::unique_ptr< Codec > terminalCodec_
stop_watch< std::chrono::milliseconds > timer_
folly::detail::CompressionCounter bytesBeforeDecompression_
std::size_t tailroom() const
FOLLY_PUSH_WARNING RHS rhs
std::enable_if< std::is_arithmetic< T >::value, std::string >::type prefixToStringLE(T prefix, uint64_t n=sizeof(T))
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
Codec(CodecType type, folly::Optional< int > level=folly::none, folly::StringPiece name={}, bool counters=true)
static std::unique_ptr< IOBuf > addOutputBuffer(MutableByteRange &output, uint64_t size)
virtual uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const =0
virtual folly::Optional< uint64_t > doGetUncompressedLength(const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength) const
folly::detail::CompressionCounter decompressions_
constexpr auto size(C const &c) -> decltype(c.size())
constexpr bool empty() const
virtual std::string doCompressString(StringPiece data)
static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH
virtual std::vector< std::string > validPrefixes() const
std::unique_ptr< IOBuf > uncompress(const IOBuf *data, folly::Optional< uint64_t > uncompressedLength=folly::none)
static Options cacheChainLength()
std::unique_ptr< StreamCodec > getStreamCodec(CodecType type, int level)
virtual uint64_t doMaxUncompressedLength() const
constexpr Iter data() const
constexpr auto data(C &c) -> decltype(c.data())
std::size_t length() const
virtual bool doNeedsUncompressedLength() const
FOLLY_CPP14_CONSTEXPR bool hasValue() const noexcept
constexpr Range< Iter > range(Iter first, Iter last)
uint64_t maxUncompressedLength_
folly::detail::CompressionCounter bytesBeforeCompression_
folly::detail::CompressionCounter bytesAfterDecompression_
constexpr int COMPRESSION_LEVEL_BEST
bool needsDataLength() const
FOLLY_CPP14_CONSTEXPR Value value_or(U &&dflt) const &
folly::detail::CompressionCounter decompressionMilliseconds_
uint64_t maxCompressedLength(uint64_t uncompressedLength) const
std::size_t computeChainDataLength() const
std::atomic< int > counter
virtual std::unique_ptr< IOBuf > doCompress(const folly::IOBuf *data)=0
void uncheckedAdvance(size_type n)
bool compressStream(folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp=StreamCodec::FlushOp::NONE)
std::unique_ptr< IOBuf > doUncompress(const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength) override
folly::detail::CompressionCounter compressions_
void resetStream(folly::Optional< uint64_t > uncompressedLength=folly::none)
IOBuf cloneCoalescedAsValue() const
Wrapper around the makeCompressionCounterHandler() extension point.
folly::detail::CompressionCounter * counter_
std::enable_if< std::is_unsigned< T >::value, bool >::type dataStartsWithLE(const IOBuf *data, T prefix, uint64_t n=sizeof(T))
std::unique_ptr< Codec > getAutoUncompressionCodec(std::vector< std::unique_ptr< Codec >> customCodecs, std::unique_ptr< Codec > terminalCodec)
uint64_t maxUncompressedLength() const
void postallocate(std::size_t n)
size_t encodeVarint(uint64_t val, uint8_t *buf)
constexpr int COMPRESSION_LEVEL_FASTEST
std::unique_ptr< Codec > getCodec(CodecType type, int level)
void assertStateIs(State expected) const
Expected< uint64_t, DecodeVarintError > tryDecodeVarint(Range< T * > &data)
std::unique_ptr< IOBuf > doCompress(const folly::IOBuf *data) override
folly::Optional< uint64_t > getUncompressedLength(const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength=folly::none) const
bool needsUncompressedLength() const
void append(std::size_t amount)