proxygen
CompressionTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <algorithm>
20 #include <random>
21 #include <set>
22 #include <thread>
23 #include <unordered_map>
24 #include <utility>
25 
26 #include <boost/noncopyable.hpp>
27 #include <glog/logging.h>
28 
29 #include <folly/Random.h>
30 #include <folly/Varint.h>
31 #include <folly/hash/Hash.h>
32 #include <folly/io/IOBufQueue.h>
34 
35 #if FOLLY_HAVE_LIBZSTD
36 #include <zstd.h>
37 
38 #include <folly/compression/Zstd.h>
39 #endif
40 
41 #if FOLLY_HAVE_LIBZ
42 #include <folly/compression/Zlib.h>
43 
44 namespace zlib = folly::io::zlib;
45 #endif
46 
47 namespace folly {
48 namespace io {
49 namespace test {
50 
51 class DataHolder : private boost::noncopyable {
52  public:
53  uint64_t hash(size_t size) const;
54  ByteRange data(size_t size) const;
55 
56  protected:
57  explicit DataHolder(size_t sizeLog2);
58  const size_t size_;
59  std::unique_ptr<uint8_t[]> data_;
60  mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
61 };
62 
63 DataHolder::DataHolder(size_t sizeLog2)
64  : size_(size_t(1) << sizeLog2), data_(new uint8_t[size_]) {}
65 
67  CHECK_LE(size, size_);
68  auto p = hashCache_.find(size);
69  if (p != hashCache_.end()) {
70  return p->second;
71  }
72 
74  hashCache_[size] = h;
75  return h;
76 }
77 
79  CHECK_LE(size, size_);
80  return ByteRange(data_.get(), size);
81 }
82 
83 uint64_t hashIOBuf(const IOBuf* buf) {
85  for (auto& range : *buf) {
86  h = folly::hash::fnv64_buf(range.data(), range.size(), h);
87  }
88  return h;
89 }
90 
91 class RandomDataHolder : public DataHolder {
92  public:
93  explicit RandomDataHolder(size_t sizeLog2);
94 };
95 
96 RandomDataHolder::RandomDataHolder(size_t sizeLog2) : DataHolder(sizeLog2) {
97  static constexpr size_t numThreadsLog2 = 3;
98  static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
99 
101 
102  std::vector<std::thread> threads;
103  threads.reserve(numThreads);
104  for (size_t t = 0; t < numThreads; ++t) {
105  threads.emplace_back([this, seed, t, sizeLog2] {
106  std::mt19937 rng(seed + t);
107  size_t countLog2 = sizeLog2 - numThreadsLog2;
108  size_t start = size_t(t) << countLog2;
109  for (size_t i = 0; i < countLog2; ++i) {
110  this->data_[start + i] = rng();
111  }
112  });
113  }
114 
115  for (auto& t : threads) {
116  t.join();
117  }
118 }
119 
121  public:
122  explicit ConstantDataHolder(size_t sizeLog2);
123 };
124 
126  memset(data_.get(), 'a', size_);
127 }
128 
129 constexpr size_t dataSizeLog2 = 27; // 128MiB
130 RandomDataHolder randomDataHolder(dataSizeLog2);
132 
133 // The intersection of the provided codecs & those that are compiled in.
134 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
135  std::vector<CodecType> supported;
136 
137  std::copy_if(
138  std::begin(v), std::end(v), std::back_inserter(supported), hasCodec);
139 
140  return supported;
141 }
142 
143 // All compiled-in compression codecs.
144 static std::vector<CodecType> availableCodecs() {
145  std::vector<CodecType> codecs;
146 
147  for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
148  auto type = static_cast<CodecType>(i);
149  if (hasCodec(type)) {
150  codecs.push_back(type);
151  }
152  }
153 
154  return codecs;
155 }
156 
157 static std::vector<CodecType> availableStreamCodecs() {
158  std::vector<CodecType> codecs;
159 
160  for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
161  auto type = static_cast<CodecType>(i);
162  if (hasStreamCodec(type)) {
163  codecs.push_back(type);
164  }
165  }
166 
167  return codecs;
168 }
169 
170 TEST(CompressionTestNeedsUncompressedLength, Simple) {
171  static const struct {
172  CodecType type;
173  bool needsUncompressedLength;
174  } expectations[] = {
175  {CodecType::NO_COMPRESSION, false},
176  {CodecType::LZ4, true},
177  {CodecType::SNAPPY, false},
178  {CodecType::ZLIB, false},
180  {CodecType::LZMA2, false},
182  {CodecType::ZSTD, false},
183  {CodecType::GZIP, false},
184  {CodecType::LZ4_FRAME, false},
185  {CodecType::BZIP2, false},
186  };
187 
188  for (auto const& test : expectations) {
189  if (hasCodec(test.type)) {
190  EXPECT_EQ(
191  getCodec(test.type)->needsUncompressedLength(),
192  test.needsUncompressedLength);
193  }
194  }
195 }
196 
198  : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
199  protected:
200  void SetUp() override {
201  auto tup = GetParam();
202  int lengthLog = std::get<0>(tup);
203  // Small hack to test empty data
204  uncompressedLength_ = (lengthLog < 0) ? 0 : uint64_t(1) << std::get<0>(tup);
205  chunks_ = std::get<1>(tup);
206  codec_ = getCodec(std::get<2>(tup));
207  }
208 
209  void runSimpleIOBufTest(const DataHolder& dh);
210 
211  void runSimpleStringTest(const DataHolder& dh);
212 
213  private:
214  std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
215 
217  size_t chunks_;
218  std::unique_ptr<Codec> codec_;
219 };
220 
222  const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
223  const auto compressed = split(codec_->compress(original.get()));
224  EXPECT_LE(
225  compressed->computeChainDataLength(),
226  codec_->maxCompressedLength(uncompressedLength_));
227  if (!codec_->needsUncompressedLength()) {
228  auto uncompressed = codec_->uncompress(compressed.get());
229  EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
230  EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
231  }
232  {
233  auto uncompressed =
234  codec_->uncompress(compressed.get(), uncompressedLength_);
235  EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
236  EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
237  }
238 }
239 
241  const auto original = std::string(
242  reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
243  uncompressedLength_);
244  const auto compressed = codec_->compress(original);
245  EXPECT_LE(
246  compressed.length(), codec_->maxCompressedLength(uncompressedLength_));
247 
248  if (!codec_->needsUncompressedLength()) {
249  auto uncompressed = codec_->uncompress(compressed);
250  EXPECT_EQ(uncompressedLength_, uncompressed.length());
251  EXPECT_EQ(uncompressed, original);
252  }
253  {
254  auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
255  EXPECT_EQ(uncompressedLength_, uncompressed.length());
256  EXPECT_EQ(uncompressed, original);
257  }
258 }
259 
260 // Uniformly split data into (potentially empty) chunks.
261 std::unique_ptr<IOBuf> CompressionTest::split(
262  std::unique_ptr<IOBuf> data) const {
263  if (data->isChained()) {
264  data->coalesce();
265  }
266 
267  const size_t size = data->computeChainDataLength();
268 
269  std::multiset<size_t> splits;
270  for (size_t i = 1; i < chunks_; ++i) {
271  splits.insert(Random::rand64(size));
272  }
273 
274  folly::IOBufQueue result;
275 
276  size_t offset = 0;
277  for (size_t split : splits) {
278  result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
279  offset = split;
280  }
281  result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
282 
283  return result.move();
284 }
285 
286 TEST_P(CompressionTest, RandomData) {
287  runSimpleIOBufTest(randomDataHolder);
288 }
289 
290 TEST_P(CompressionTest, ConstantData) {
291  runSimpleIOBufTest(constantDataHolder);
292 }
293 
294 TEST_P(CompressionTest, RandomDataString) {
295  runSimpleStringTest(randomDataHolder);
296 }
297 
298 TEST_P(CompressionTest, ConstantDataString) {
299  runSimpleStringTest(constantDataHolder);
300 }
301 
305  testing::Combine(
306  testing::Values(-1, 0, 1, 12, 22, 25, 27),
307  testing::Values(1, 2, 3, 8, 65),
308  testing::ValuesIn(availableCodecs())));
309 
311  : public testing::TestWithParam<std::tuple<int, CodecType>> {
312  protected:
313  void SetUp() override {
314  auto tup = GetParam();
315  uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
316  codec_ = getCodec(std::get<1>(tup));
317  }
318 
319  void runSimpleTest(const DataHolder& dh);
320 
322  std::unique_ptr<Codec> codec_;
323 };
324 
326  uint64_t pos = 0;
327  for (; number > 0; ++pos, number >>= 1) {
328  }
329  return pos;
330 }
331 
333  auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
334  auto compressed = codec_->compress(original.get());
335  auto breakPoint =
336  1UL +
338  std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
339  auto tinyBuf = IOBuf::copyBuffer(
340  compressed->data(), std::min(compressed->length(), breakPoint));
341  compressed->trimStart(breakPoint);
342  tinyBuf->prependChain(std::move(compressed));
343  compressed = std::move(tinyBuf);
344 
345  auto uncompressed = codec_->uncompress(compressed.get());
346 
347  EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
348  EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
349 }
350 
352  runSimpleTest(randomDataHolder);
353 }
354 
356  runSimpleTest(constantDataHolder);
357 }
358 
362  testing::Combine(
363  testing::Values(0, 1, 12, 22, 25, 27),
364  testing::ValuesIn(supportedCodecs({
367  }))));
368 
369 TEST(LZMATest, UncompressBadVarint) {
371  std::string const str(kMaxVarintLength64 * 2, '\xff');
372  ByteRange input((folly::StringPiece(str)));
374  auto buffer = IOBuf::create(16);
375  buffer->append(buffer->capacity());
376  MutableByteRange output{buffer->writableData(), buffer->length()};
377  EXPECT_THROW(codec->uncompressStream(input, output), std::runtime_error);
378  }
379 }
380 
381 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
382  protected:
383  void SetUp() override {
384  codec_ = getCodec(GetParam());
385  }
386 
387  void runSimpleTest(const DataHolder& dh);
388 
389  std::unique_ptr<Codec> codec_;
390 };
391 
393  constexpr uint64_t uncompressedLength = 42;
394  auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
395  auto compressed = codec_->compress(original.get());
396 
397  if (!codec_->needsUncompressedLength()) {
398  auto uncompressed = codec_->uncompress(compressed.get());
399  EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
400  EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
401  }
402  {
403  auto uncompressed =
404  codec_->uncompress(compressed.get(), uncompressedLength);
405  EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
406  EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
407  }
408 
409  EXPECT_THROW(
410  codec_->uncompress(compressed.get(), uncompressedLength + 1),
411  std::runtime_error);
412 
413  auto corrupted = compressed->clone();
414  corrupted->unshare();
415  // Truncate the last character
416  corrupted->prev()->trimEnd(1);
417  if (!codec_->needsUncompressedLength()) {
418  EXPECT_THROW(codec_->uncompress(corrupted.get()), std::runtime_error);
419  }
420 
421  EXPECT_THROW(
422  codec_->uncompress(corrupted.get(), uncompressedLength),
423  std::runtime_error);
424 
425  corrupted = compressed->clone();
426  corrupted->unshare();
427  // Corrupt the first character
428  ++(corrupted->writableData()[0]);
429 
430  if (!codec_->needsUncompressedLength()) {
431  EXPECT_THROW(codec_->uncompress(corrupted.get()), std::runtime_error);
432  }
433 
434  EXPECT_THROW(
435  codec_->uncompress(corrupted.get(), uncompressedLength),
436  std::runtime_error);
437 }
438 
440  runSimpleTest(randomDataHolder);
441 }
442 
444  runSimpleTest(constantDataHolder);
445 }
446 
450  testing::ValuesIn(
451  // NO_COMPRESSION can't detect corruption
452  // LZ4 can't detect corruption reliably (sigh)
460  })));
461 
463  return type != CodecType::BZIP2;
464 }
465 
466 class StreamingUnitTest : public testing::TestWithParam<CodecType> {
467  protected:
468  void SetUp() override {
469  codec_ = getStreamCodec(GetParam());
470  }
471 
472  bool hasFlush() const {
473  return codecHasFlush(GetParam());
474  }
475 
476  std::unique_ptr<StreamCodec> codec_;
477 };
478 
479 TEST(StreamingUnitTest, needsDataLength) {
480  static const struct {
481  CodecType type;
482  bool needsDataLength;
483  } expectations[] = {
484  {CodecType::ZLIB, false},
485  {CodecType::GZIP, false},
486  {CodecType::LZMA2, false},
488  {CodecType::ZSTD, false},
489  };
490 
491  for (auto const& test : expectations) {
492  if (hasStreamCodec(test.type)) {
493  EXPECT_EQ(
494  getStreamCodec(test.type)->needsDataLength(), test.needsDataLength);
495  }
496  }
497 }
498 
499 TEST_P(StreamingUnitTest, maxCompressedLength) {
500  for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
501  EXPECT_GE(codec_->maxCompressedLength(length), length);
502  }
503 }
504 
505 TEST_P(StreamingUnitTest, getUncompressedLength) {
506  auto const empty = IOBuf::create(0);
507  EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
508  EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
509  EXPECT_ANY_THROW(codec_->getUncompressedLength(empty.get(), 1));
510 
511  auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
512  auto const compressed = codec_->compress(data.get());
513 
514  if (auto const length = codec_->getUncompressedLength(data.get())) {
515  EXPECT_EQ(100, *length);
516  }
517  EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
518  // If the uncompressed length is stored in the frame, then make sure it throws
519  // when it is given the wrong length.
520  if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
521  EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
522  }
523 }
524 
526  ByteRange input{};
527  auto buffer = IOBuf::create(codec_->maxCompressedLength(0));
528  buffer->append(buffer->capacity());
530 
531  // Test compressing empty data in one pass
532  if (!codec_->needsDataLength()) {
533  output = {buffer->writableData(), buffer->length()};
534  EXPECT_TRUE(
535  codec_->compressStream(input, output, StreamCodec::FlushOp::END));
536  }
537  codec_->resetStream(0);
538  output = {buffer->writableData(), buffer->length()};
539  EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
540 
541  // Test uncompressing the compressed empty data is equivalent to the empty
542  // string
543  {
544  size_t compressedSize = buffer->length() - output.size();
545  auto const compressed =
546  IOBuf::copyBuffer(buffer->writableData(), compressedSize);
547  auto inputRange = compressed->coalesce();
548  codec_->resetStream(0);
549  output = {buffer->writableData(), buffer->length()};
550  EXPECT_TRUE(codec_->uncompressStream(
551  inputRange, output, StreamCodec::FlushOp::END));
552  EXPECT_EQ(output.size(), buffer->length());
553  }
554 
555  // Test compressing empty data with multiple calls to compressStream()
556  {
557  auto largeBuffer = IOBuf::create(codec_->maxCompressedLength(0) * 2);
558  largeBuffer->append(largeBuffer->capacity());
559  codec_->resetStream(0);
560  output = {largeBuffer->writableData(), largeBuffer->length()};
561  EXPECT_FALSE(codec_->compressStream(input, output));
562  if (hasFlush()) {
563  EXPECT_TRUE(
564  codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
565  }
566  EXPECT_TRUE(
567  codec_->compressStream(input, output, StreamCodec::FlushOp::END));
568  }
569 
570  // Test uncompressing empty data
571  output = {};
572  codec_->resetStream();
573  EXPECT_TRUE(codec_->uncompressStream(input, output));
574  if (hasFlush()) {
575  codec_->resetStream();
576  EXPECT_TRUE(
577  codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
578  }
579  codec_->resetStream();
580  EXPECT_TRUE(
581  codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
582  codec_->resetStream(0);
583  EXPECT_TRUE(codec_->uncompressStream(input, output));
584  if (hasFlush()) {
585  codec_->resetStream(0);
586  EXPECT_TRUE(
587  codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
588  }
589  codec_->resetStream(0);
590  EXPECT_TRUE(
591  codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
592 }
593 
594 TEST_P(StreamingUnitTest, noForwardProgress) {
595  auto inBuffer = IOBuf::create(2);
596  inBuffer->writableData()[0] = 'a';
597  inBuffer->writableData()[1] = 'a';
598  inBuffer->append(2);
599  const auto compressed = codec_->compress(inBuffer.get());
600  auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
601 
602  ByteRange emptyInput;
603  MutableByteRange emptyOutput;
604 
605  const std::array<StreamCodec::FlushOp, 3> flushOps = {{
609  }};
610 
611  // No progress is not okay twice in a row for all flush operations when
612  // compressing
613  for (const auto flushOp : flushOps) {
614  if (flushOp == StreamCodec::FlushOp::FLUSH && !hasFlush()) {
615  continue;
616  }
617  if (codec_->needsDataLength()) {
618  codec_->resetStream(inBuffer->computeChainDataLength());
619  } else {
620  codec_->resetStream();
621  }
622  auto input = inBuffer->coalesce();
623  MutableByteRange output = {outBuffer->writableTail(),
624  outBuffer->tailroom()};
625  // Compress some data to avoid empty data special casing
626  while (!input.empty()) {
627  codec_->compressStream(input, output);
628  }
629  EXPECT_FALSE(codec_->compressStream(emptyInput, emptyOutput, flushOp));
630  EXPECT_THROW(
631  codec_->compressStream(emptyInput, emptyOutput, flushOp),
632  std::runtime_error);
633  }
634 
635  // No progress is not okay twice in a row for all flush operations when
636  // uncompressing
637  for (const auto flushOp : flushOps) {
638  if (flushOp == StreamCodec::FlushOp::FLUSH && !hasFlush()) {
639  continue;
640  }
641  codec_->resetStream();
642  auto input = compressed->coalesce();
643  // Remove the last byte so the operation is incomplete
644  input.uncheckedSubtract(1);
645  MutableByteRange output = {inBuffer->writableData(), inBuffer->length()};
646  // Uncompress some data to avoid empty data special casing
647  while (!input.empty()) {
648  EXPECT_FALSE(codec_->uncompressStream(input, output));
649  }
650  EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput, flushOp));
651  EXPECT_THROW(
652  codec_->uncompressStream(emptyInput, emptyOutput, flushOp),
653  std::runtime_error);
654  }
655 }
656 
657 TEST_P(StreamingUnitTest, stateTransitions) {
658  auto inBuffer = IOBuf::create(2);
659  inBuffer->writableData()[0] = 'a';
660  inBuffer->writableData()[1] = 'a';
661  inBuffer->append(2);
662  auto compressed = codec_->compress(inBuffer.get());
663  ByteRange const in = compressed->coalesce();
664  auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
665  MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
666 
667  auto compress = [&](StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
668  bool empty = false) {
669  auto input = in;
670  auto output = empty ? MutableByteRange{} : out;
671  return codec_->compressStream(input, output, flushOp);
672  };
673  auto compress_all = [&](bool expect,
674  StreamCodec::FlushOp flushOp =
676  bool empty = false) {
677  auto input = in;
678  auto output = empty ? MutableByteRange{} : out;
679  while (!input.empty()) {
680  if (expect) {
681  EXPECT_TRUE(codec_->compressStream(input, output, flushOp));
682  } else {
683  EXPECT_FALSE(codec_->compressStream(input, output, flushOp));
684  }
685  }
686  };
687  auto uncompress = [&](StreamCodec::FlushOp flushOp =
689  bool empty = false) {
690  auto input = in;
691  auto output = empty ? MutableByteRange{} : out;
692  return codec_->uncompressStream(input, output, flushOp);
693  };
694 
695  // compression flow
696  if (!codec_->needsDataLength()) {
697  codec_->resetStream();
698  EXPECT_FALSE(compress());
699  EXPECT_FALSE(compress());
700  if (hasFlush()) {
702  }
703  EXPECT_FALSE(compress());
705  }
706  codec_->resetStream(in.size() * 5);
707  compress_all(false);
708  compress_all(false);
709  if (hasFlush()) {
710  compress_all(true, StreamCodec::FlushOp::FLUSH);
711  }
712  compress_all(false);
713  compress_all(true, StreamCodec::FlushOp::END);
714 
715  // uncompression flow
716  codec_->resetStream();
717  EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
718  if (hasFlush()) {
719  codec_->resetStream();
720  EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
721  }
722  codec_->resetStream();
723  EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
724  codec_->resetStream();
725  EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
726  if (hasFlush()) {
727  codec_->resetStream();
729  }
730  // compress -> uncompress
731  codec_->resetStream(in.size());
732  EXPECT_FALSE(compress());
733  EXPECT_THROW(uncompress(), std::logic_error);
734  // uncompress -> compress
735  codec_->resetStream(inBuffer->computeChainDataLength());
737  EXPECT_THROW(compress(), std::logic_error);
738  // end -> compress
739  if (!codec_->needsDataLength()) {
740  codec_->resetStream();
741  EXPECT_FALSE(compress());
743  EXPECT_THROW(compress(), std::logic_error);
744  }
745  codec_->resetStream(in.size() * 2);
746  compress_all(false);
747  compress_all(true, StreamCodec::FlushOp::END);
748  EXPECT_THROW(compress(), std::logic_error);
749  // end -> uncompress
750  codec_->resetStream();
752  EXPECT_THROW(uncompress(), std::logic_error);
753  // flush -> compress
754  if (hasFlush()) {
755  codec_->resetStream(in.size());
756  EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
757  EXPECT_THROW(compress(), std::logic_error);
758  }
759  // flush -> end
760  if (hasFlush()) {
761  codec_->resetStream(in.size());
762  EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
763  EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
764  }
765  // undefined -> compress
766  codec_->compress(inBuffer.get());
767  EXPECT_THROW(compress(), std::logic_error);
768  codec_->uncompress(compressed.get(), inBuffer->computeChainDataLength());
769  EXPECT_THROW(compress(), std::logic_error);
770  // undefined -> undefined
771  codec_->uncompress(compressed.get());
772  codec_->compress(inBuffer.get());
773 }
774 
778  testing::ValuesIn(availableStreamCodecs()));
779 
781  : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
782  protected:
783  bool hasFlush() const {
784  return codecHasFlush(std::get<2>(GetParam()));
785  }
786 
787  void SetUp() override {
788  auto const tup = GetParam();
789  uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
790  chunkSize_ = size_t(1) << std::get<1>(tup);
791  codec_ = getStreamCodec(std::get<2>(tup));
792  }
793 
794  void runResetStreamTest(DataHolder const& dh);
795  void runCompressStreamTest(DataHolder const& dh);
796  void runUncompressStreamTest(DataHolder const& dh);
797  void runFlushTest(DataHolder const& dh);
798 
799  private:
800  std::vector<ByteRange> split(ByteRange data) const;
801 
803  size_t chunkSize_;
804  std::unique_ptr<StreamCodec> codec_;
805 };
806 
807 std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
808  size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
809  std::vector<ByteRange> result;
810  result.reserve(pieces + 1);
811  while (!data.empty()) {
812  size_t const pieceSize = std::min(data.size(), chunkSize_);
813  result.push_back(data.subpiece(0, pieceSize));
814  data.uncheckedAdvance(pieceSize);
815  }
816  return result;
817 }
818 
819 static std::unique_ptr<IOBuf> compressSome(
821  ByteRange data,
822  uint64_t bufferSize,
823  StreamCodec::FlushOp flush) {
824  bool result;
825  IOBufQueue queue;
826  do {
827  auto buffer = IOBuf::create(bufferSize);
828  buffer->append(buffer->capacity());
829  MutableByteRange output{buffer->writableData(), buffer->length()};
830 
831  result = codec->compressStream(data, output, flush);
832  buffer->trimEnd(output.size());
833  queue.append(std::move(buffer));
834 
835  } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
836  EXPECT_TRUE(data.empty());
837  return queue.move();
838 }
839 
840 static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
842  ByteRange& data,
843  uint64_t bufferSize,
844  StreamCodec::FlushOp flush) {
845  bool result;
846  IOBufQueue queue;
847  do {
848  auto buffer = IOBuf::create(bufferSize);
849  buffer->append(buffer->capacity());
850  MutableByteRange output{buffer->writableData(), buffer->length()};
851 
852  result = codec->uncompressStream(data, output, flush);
853  buffer->trimEnd(output.size());
854  queue.append(std::move(buffer));
855 
856  } while (queue.tailroom() == 0 && !result);
857  return std::make_pair(result, queue.move());
858 }
859 
861  auto const input = dh.data(uncompressedLength_);
862  // Compress some but leave state unclean
863  codec_->resetStream(uncompressedLength_);
864  compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
865  // Reset stream and compress all
866  if (codec_->needsDataLength()) {
867  codec_->resetStream(uncompressedLength_);
868  } else {
869  codec_->resetStream();
870  }
871  auto compressed =
872  compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
873  auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
874  EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
875 }
876 
878  runResetStreamTest(constantDataHolder);
879  runResetStreamTest(randomDataHolder);
880 }
881 
883  const folly::io::test::DataHolder& dh) {
884  auto const inputs = split(dh.data(uncompressedLength_));
885 
886  IOBufQueue queue;
887  codec_->resetStream(uncompressedLength_);
888  // Compress many inputs in a row
889  for (auto const input : inputs) {
890  queue.append(compressSome(
891  codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
892  }
893  // Finish the operation with empty input.
895  queue.append(
896  compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
897 
898  auto const uncompressed = codec_->uncompress(queue.front());
899  EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
900 }
901 
903  runCompressStreamTest(constantDataHolder);
904  runCompressStreamTest(randomDataHolder);
905 }
906 
908  const folly::io::test::DataHolder& dh) {
909  const auto flush =
911  auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
912  // Concatenate 3 compressed frames in a row
913  auto compressed = codec_->compress(data.get());
914  compressed->prependChain(codec_->compress(data.get()));
915  compressed->prependChain(codec_->compress(data.get()));
916  // Pass all 3 compressed frames in one input buffer
917  auto input = compressed->coalesce();
918  // Uncompress the first frame
919  codec_->resetStream(data->computeChainDataLength());
920  {
921  auto const result = uncompressSome(codec_.get(), input, chunkSize_, flush);
922  ASSERT_TRUE(result.first);
923  ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
924  }
925  // Uncompress the second frame
926  codec_->resetStream();
927  {
928  auto const result = uncompressSome(
929  codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
930  ASSERT_TRUE(result.first);
931  ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
932  }
933  // Uncompress the third frame
934  codec_->resetStream();
935  {
936  auto const result = uncompressSome(codec_.get(), input, chunkSize_, flush);
937  ASSERT_TRUE(result.first);
938  ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
939  }
940  EXPECT_TRUE(input.empty());
941 }
942 
943 TEST_P(StreamingCompressionTest, uncompressStream) {
944  runUncompressStreamTest(constantDataHolder);
945  runUncompressStreamTest(randomDataHolder);
946 }
947 
949  auto const inputs = split(dh.data(uncompressedLength_));
950  auto uncodec = getStreamCodec(codec_->type());
951 
952  if (codec_->needsDataLength()) {
953  codec_->resetStream(uncompressedLength_);
954  } else {
955  codec_->resetStream();
956  }
957  for (auto input : inputs) {
958  // Compress some data and flush the stream
959  auto compressed = compressSome(
960  codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
961  auto compressedRange = compressed->coalesce();
962  // Uncompress the compressed data
963  auto result = uncompressSome(
964  uncodec.get(),
965  compressedRange,
966  chunkSize_,
968  // All compressed data should have been consumed
969  EXPECT_TRUE(compressedRange.empty());
970  // The frame isn't complete
971  EXPECT_FALSE(result.first);
972  // The uncompressed data should be exactly the input data
973  EXPECT_EQ(input.size(), result.second->computeChainDataLength());
974  auto const data = IOBuf::wrapBuffer(input);
975  EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
976  }
977 }
978 
980  if (!hasFlush()) {
981  return;
982  }
983  runFlushTest(constantDataHolder);
984  runFlushTest(randomDataHolder);
985 }
986 
990  testing::Combine(
991  testing::Values(0, 1, 12, 22, 27),
992  testing::Values(12, 17, 20),
993  testing::ValuesIn(availableStreamCodecs())));
994 
995 namespace {
996 
997 // Codec types included in the codec returned by getAutoUncompressionCodec() by
998 // default.
999 std::vector<CodecType> autoUncompressionCodecTypes = {{
1006 }};
1007 
1008 } // namespace
1009 
1010 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
1011  protected:
1012  void SetUp() override {
1013  codecType_ = GetParam();
1014  codec_ = getCodec(codecType_);
1015  autoType_ = std::any_of(
1016  autoUncompressionCodecTypes.begin(),
1017  autoUncompressionCodecTypes.end(),
1018  [&](CodecType o) { return codecType_ == o; });
1019  // Add the codec with type codecType_ as the terminal codec if it is not in
1020  // autoUncompressionCodecTypes.
1021  auto_ = getAutoUncompressionCodec({}, getTerminalCodec());
1022  }
1023 
1024  void runSimpleTest(const DataHolder& dh);
1025 
1026  std::unique_ptr<Codec> getTerminalCodec() {
1027  return (autoType_ ? nullptr : getCodec(codecType_));
1028  }
1029 
1030  std::unique_ptr<Codec> codec_;
1031  std::unique_ptr<Codec> auto_;
1033  // true if codecType_ is in autoUncompressionCodecTypes
1035 };
1036 
1038  constexpr uint64_t uncompressedLength = 1000;
1039  auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
1040  auto compressed = codec_->compress(original.get());
1041 
1042  if (!codec_->needsUncompressedLength()) {
1043  auto uncompressed = auto_->uncompress(compressed.get());
1044  EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
1045  EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
1046  }
1047  {
1048  auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
1049  EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
1050  EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
1051  }
1052  ASSERT_GE(compressed->computeChainDataLength(), 8);
1053  for (size_t i = 0; i < 8; ++i) {
1054  auto split = compressed->clone();
1055  auto rest = compressed->clone();
1056  split->trimEnd(split->length() - i);
1057  rest->trimStart(i);
1058  split->appendChain(std::move(rest));
1059  auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
1060  EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
1061  EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
1062  }
1063 }
1064 
1066  runSimpleTest(randomDataHolder);
1067 }
1068 
1069 TEST_P(AutomaticCodecTest, ConstantData) {
1070  runSimpleTest(constantDataHolder);
1071 }
1072 
1073 TEST_P(AutomaticCodecTest, ValidPrefixes) {
1074  const auto prefixes = codec_->validPrefixes();
1075  for (const auto& prefix : prefixes) {
1076  EXPECT_FALSE(prefix.empty());
1077  // Ensure that all strings are at least 8 bytes for LZMA2.
1078  // The bytes after the prefix should be ignored by `canUncompress()`.
1080  data.append(8);
1081  EXPECT_TRUE(codec_->canUncompress(&data));
1082  EXPECT_TRUE(auto_->canUncompress(&data));
1083  }
1084 }
1085 
1086 TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
1087  if (codec_->needsUncompressedLength()) {
1088  EXPECT_TRUE(auto_->needsUncompressedLength());
1089  }
1090 }
1091 
1092 TEST_P(AutomaticCodecTest, maxUncompressedLength) {
1093  EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength());
1094 }
1095 
1096 TEST_P(AutomaticCodecTest, DefaultCodec) {
1097  const uint64_t length = 42;
1098  std::vector<std::unique_ptr<Codec>> codecs;
1099  codecs.push_back(getCodec(CodecType::ZSTD));
1100  auto automatic =
1101  getAutoUncompressionCodec(std::move(codecs), getTerminalCodec());
1102  auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1103  auto compressed = codec_->compress(original.get());
1104  std::unique_ptr<IOBuf> decompressed;
1105 
1106  if (automatic->needsUncompressedLength()) {
1107  decompressed = automatic->uncompress(compressed.get(), length);
1108  } else {
1109  decompressed = automatic->uncompress(compressed.get());
1110  }
1111 
1112  EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1113 }
1114 
1115 namespace {
1116 class CustomCodec : public Codec {
1117  public:
1118  static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
1119  return std::make_unique<CustomCodec>(std::move(prefix), type);
1120  }
1121  explicit CustomCodec(std::string prefix, CodecType type)
1123  prefix_(std::move(prefix)),
1124  codec_(getCodec(type)) {}
1125 
1126  private:
1127  std::vector<std::string> validPrefixes() const override {
1128  return {prefix_};
1129  }
1130 
1131  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
1132  return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
1133  }
1134 
1135  bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
1136  auto clone = data->cloneCoalescedAsValue();
1137  if (clone.length() < prefix_.size()) {
1138  return false;
1139  }
1140  return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
1141  }
1142 
1143  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
1144  auto result = IOBuf::copyBuffer(prefix_);
1145  result->appendChain(codec_->compress(data));
1146  EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
1147  return result;
1148  }
1149 
1150  std::unique_ptr<IOBuf> doUncompress(
1151  const IOBuf* data,
1152  Optional<uint64_t> uncompressedLength) override {
1153  EXPECT_TRUE(canUncompress(data, uncompressedLength));
1154  auto clone = data->cloneCoalescedAsValue();
1155  clone.trimStart(prefix_.size());
1156  return codec_->uncompress(&clone, uncompressedLength);
1157  }
1158 
1160  std::unique_ptr<Codec> codec_;
1161 };
1162 } // namespace
1163 
1165  const uint64_t length = 42;
1166  auto ab = CustomCodec::create("ab", CodecType::ZSTD);
1167  std::vector<std::unique_ptr<Codec>> codecs;
1168  codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD));
1169  auto automatic =
1170  getAutoUncompressionCodec(std::move(codecs), getTerminalCodec());
1171  auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1172 
1173  auto abCompressed = ab->compress(original.get());
1174  std::unique_ptr<IOBuf> abDecompressed;
1175  if (automatic->needsUncompressedLength()) {
1176  abDecompressed = automatic->uncompress(abCompressed.get(), length);
1177  } else {
1178  abDecompressed = automatic->uncompress(abCompressed.get());
1179  }
1180  EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
1181  EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
1182  EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));
1183 
1184  auto compressed = codec_->compress(original.get());
1185  std::unique_ptr<IOBuf> decompressed;
1186  if (automatic->needsUncompressedLength()) {
1187  decompressed = automatic->uncompress(compressed.get(), length);
1188  } else {
1189  decompressed = automatic->uncompress(compressed.get());
1190  }
1191  EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1192 }
1193 
1194 TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
1195  const uint64_t length = 42;
1196  auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
1197  std::vector<std::unique_ptr<Codec>> codecs;
1198  codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1199  codecs.push_back(getCodec(CodecType::LZ4_FRAME));
1200  auto automatic =
1201  getAutoUncompressionCodec(std::move(codecs), getTerminalCodec());
1202  auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1203 
1204  auto noneCompressed = none->compress(original.get());
1205  std::unique_ptr<IOBuf> noneDecompressed;
1206  if (automatic->needsUncompressedLength()) {
1207  noneDecompressed = automatic->uncompress(noneCompressed.get(), length);
1208  } else {
1209  noneDecompressed = automatic->uncompress(noneCompressed.get());
1210  }
1211  EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
1212  EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
1213  EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));
1214 
1215  auto compressed = codec_->compress(original.get());
1216  std::unique_ptr<IOBuf> decompressed;
1217  if (automatic->needsUncompressedLength()) {
1218  decompressed = automatic->uncompress(compressed.get(), length);
1219  } else {
1220  decompressed = automatic->uncompress(compressed.get());
1221  }
1222  EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1223 }
1224 
1225 TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
1226  // No default codec can uncompress 1 bytes.
1227  IOBuf buf{IOBuf::CREATE, 1};
1228  buf.append(1);
1229  EXPECT_FALSE(codec_->canUncompress(&buf, 1));
1230  EXPECT_FALSE(codec_->canUncompress(&buf, folly::none));
1231  EXPECT_FALSE(auto_->canUncompress(&buf, 1));
1232  EXPECT_FALSE(auto_->canUncompress(&buf, folly::none));
1233 }
1234 
1238  testing::ValuesIn(availableCodecs()));
1239 
1240 namespace {
1241 
1242 // Codec that always "uncompresses" to the same string.
1243 class ConstantCodec : public Codec {
1244  public:
1245  static std::unique_ptr<Codec> create(
1246  std::string uncompressed,
1247  CodecType type) {
1248  return std::make_unique<ConstantCodec>(std::move(uncompressed), type);
1249  }
1250  explicit ConstantCodec(std::string uncompressed, CodecType type)
1251  : Codec(type), uncompressed_(std::move(uncompressed)) {}
1252 
1253  private:
1254  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
1255  return uncompressedLength;
1256  }
1257 
1258  std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1259  throw std::runtime_error("ConstantCodec error: compress() not supported.");
1260  }
1261 
1262  std::unique_ptr<IOBuf> doUncompress(const IOBuf*, Optional<uint64_t>)
1263  override {
1265  }
1266 
1268  std::unique_ptr<Codec> codec_;
1269 };
1270 
1271 } // namespace
1272 
1273 class TerminalCodecTest : public testing::TestWithParam<CodecType> {
1274  protected:
1275  void SetUp() override {
1276  codecType_ = GetParam();
1277  codec_ = getCodec(codecType_);
1278  auto_ = getAutoUncompressionCodec();
1279  }
1280 
1282  std::unique_ptr<Codec> codec_;
1283  std::unique_ptr<Codec> auto_;
1284 };
1285 
1286 // Test that the terminal codec's uncompress() function is called when the
1287 // default chosen automatic codec throws.
1288 TEST_P(TerminalCodecTest, uncompressIfDefaultThrows) {
1289  std::string const original = "abc";
1290  auto const compressed = codec_->compress(original);
1291 
1292  // Sanity check: the automatic codec can uncompress the original string.
1293  auto const uncompressed = auto_->uncompress(compressed);
1294  EXPECT_EQ(uncompressed, original);
1295 
1296  // Truncate the compressed string.
1297  auto const truncated = compressed.substr(0, compressed.size() - 1);
1298  auto const truncatedBuf =
1299  IOBuf::wrapBuffer(truncated.data(), truncated.size());
1300  EXPECT_TRUE(auto_->canUncompress(truncatedBuf.get()));
1301  EXPECT_ANY_THROW(auto_->uncompress(truncated));
1302 
1303  // Expect the terminal codec to successfully uncompress the string.
1304  std::unique_ptr<Codec> terminal = getAutoUncompressionCodec(
1305  {}, ConstantCodec::create("dummyString", CodecType::USER_DEFINED));
1306  EXPECT_TRUE(terminal->canUncompress(truncatedBuf.get()));
1307  EXPECT_EQ(terminal->uncompress(truncated), "dummyString");
1308 }
1309 
1310 // If the terminal codec has one of the "default types" automatically added in
1311 // the AutomaticCodec, check that the default codec is no longer added.
1312 TEST_P(TerminalCodecTest, terminalOverridesDefaults) {
1313  std::unique_ptr<Codec> terminal = getAutoUncompressionCodec(
1314  {}, ConstantCodec::create("dummyString", codecType_));
1315  std::string const original = "abc";
1316  auto const compressed = codec_->compress(original);
1317  EXPECT_EQ(terminal->uncompress(compressed), "dummyString");
1318 }
1319 
1323  testing::ValuesIn(autoUncompressionCodecTypes));
1324 
1325 TEST(ValidPrefixesTest, CustomCodec) {
1326  std::vector<std::unique_ptr<Codec>> codecs;
1327  codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1328  const auto none = getAutoUncompressionCodec(std::move(codecs));
1329  const auto prefixes = none->validPrefixes();
1330  const auto it = std::find(prefixes.begin(), prefixes.end(), "none");
1331  EXPECT_TRUE(it != prefixes.end());
1332 }
1333 
// Expects `statement` to throw `expected_exception` when kIsDebug is true and
// to complete without throwing otherwise.
#define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
  do {                                                       \
    if (!kIsDebug) {                                         \
      EXPECT_NO_THROW((statement));                          \
    } else {                                                 \
      EXPECT_THROW((statement), expected_exception);         \
    }                                                        \
  } while (false)
1342 
1343 TEST(CheckCompatibleTest, SimplePrefixSecond) {
1344  std::vector<std::unique_ptr<Codec>> codecs;
1345  codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1346  codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1348  getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1349 }
1350 
1351 TEST(CheckCompatibleTest, SimplePrefixFirst) {
1352  std::vector<std::unique_ptr<Codec>> codecs;
1353  codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1354  codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1356  getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1357 }
1358 
1359 TEST(CheckCompatibleTest, Empty) {
1360  std::vector<std::unique_ptr<Codec>> codecs;
1361  codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
1363  getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1364 }
1365 
1366 TEST(CheckCompatibleTest, ZstdPrefix) {
1367  std::vector<std::unique_ptr<Codec>> codecs;
1368  codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
1370  getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1371 }
1372 
1373 TEST(CheckCompatibleTest, ZstdDuplicate) {
1374  std::vector<std::unique_ptr<Codec>> codecs;
1375  codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
1377  getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1378 }
1379 
1380 TEST(CheckCompatibleTest, ZlibIsPrefix) {
1381  std::vector<std::unique_ptr<Codec>> codecs;
1382  codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
1384  getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1385 }
1386 
1387 #if FOLLY_HAVE_LIBZSTD
1388 
1389 TEST(ZstdTest, BackwardCompatible) {
1390  auto codec = getCodec(CodecType::ZSTD);
1391  {
1392  auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20));
1393  auto compressed = codec->compress(data.get());
1394  compressed->coalesce();
1395  EXPECT_EQ(
1396  data->length(),
1397  ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1398  }
1399  {
1400  auto const data =
1401  IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20));
1402  auto compressed = codec->compress(data.get());
1403  compressed->coalesce();
1404  EXPECT_EQ(
1405  data->length(),
1406  ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1407  }
1408 }
1409 
1410 TEST(ZstdTest, CustomOptions) {
1411  auto test = [](const DataHolder& dh, unsigned contentSizeFlag) {
1412  unsigned const wlog = 23;
1413  zstd::Options options(1);
1414  options.set(ZSTD_p_contentSizeFlag, contentSizeFlag);
1415  options.set(ZSTD_p_checksumFlag, 1);
1416  options.set(ZSTD_p_windowLog, wlog);
1417  auto codec = zstd::getCodec(std::move(options));
1418  size_t const uncompressedLength = (size_t)1 << 27;
1419  auto const original = std::string(
1420  reinterpret_cast<const char*>(dh.data(uncompressedLength).data()),
1421  uncompressedLength);
1422  auto const compressed = codec->compress(original);
1423  auto const uncompressed = codec->uncompress(compressed);
1424  EXPECT_EQ(uncompressed, original);
1425  EXPECT_EQ(
1426  codec->getUncompressedLength(
1427  folly::IOBuf::wrapBuffer(compressed.data(), compressed.size())
1428  .get()),
1429  contentSizeFlag ? uncompressedLength : Optional<uint64_t>());
1430  {
1431  ZSTD_frameHeader zfh;
1432  ZSTD_getFrameHeader(&zfh, compressed.data(), compressed.size());
1433  EXPECT_EQ(zfh.checksumFlag, 1);
1434  EXPECT_EQ(zfh.windowSize, 1ULL << wlog);
1435  EXPECT_EQ(
1436  zfh.frameContentSize,
1437  contentSizeFlag ? uncompressedLength : ZSTD_CONTENTSIZE_UNKNOWN);
1438  }
1439  };
1440  for (unsigned contentSizeFlag = 0; contentSizeFlag <= 1; ++contentSizeFlag) {
1441  test(constantDataHolder, contentSizeFlag);
1442  test(randomDataHolder, contentSizeFlag);
1443  }
1444 }
1445 
1446 TEST(ZstdTest, NegativeLevels) {
1447  EXPECT_EQ(zstd::Options(1).level(), 1);
1448  EXPECT_EQ(zstd::Options(-1).level(), -1);
1449  auto const original = std::string(
1450  reinterpret_cast<const char*>(randomDataHolder.data(16348).data()),
1451  16348);
1452  auto const plusCompressed =
1453  zstd::getCodec(zstd::Options(1))->compress(original);
1454  auto const minusCompressed =
1455  zstd::getCodec(zstd::Options(-100))->compress(original);
1456  EXPECT_GT(minusCompressed.size(), plusCompressed.size());
1457  auto codec = getCodec(CodecType::ZSTD);
1458  auto const uncompressed = codec->uncompress(minusCompressed);
1459  EXPECT_EQ(original, uncompressed);
1460 }
1461 
1462 #endif
1463 
1464 #if FOLLY_HAVE_LIBZ
1465 
1466 using ZlibFormat = zlib::Options::Format;
1467 
1468 TEST(ZlibTest, Auto) {
1469  size_t const uncompressedLength_ = (size_t)1 << 15;
1470  auto const original = std::string(
1471  reinterpret_cast<const char*>(
1472  randomDataHolder.data(uncompressedLength_).data()),
1473  uncompressedLength_);
1474  auto optionCodec = zlib::getCodec(zlib::Options(ZlibFormat::AUTO));
1475 
1476  // Test the codec can uncompress zlib data.
1477  {
1478  auto codec = getCodec(CodecType::ZLIB);
1479  auto const compressed = codec->compress(original);
1480  auto const uncompressed = optionCodec->uncompress(compressed);
1481  EXPECT_EQ(original, uncompressed);
1482  }
1483 
1484  // Test the codec can uncompress gzip data.
1485  {
1486  auto codec = getCodec(CodecType::GZIP);
1487  auto const compressed = codec->compress(original);
1488  auto const uncompressed = optionCodec->uncompress(compressed);
1489  EXPECT_EQ(original, uncompressed);
1490  }
1491 }
1492 
1493 TEST(ZlibTest, DefaultOptions) {
1494  size_t const uncompressedLength_ = (size_t)1 << 20;
1495  auto const original = std::string(
1496  reinterpret_cast<const char*>(
1497  randomDataHolder.data(uncompressedLength_).data()),
1498  uncompressedLength_);
1499  {
1500  auto codec = getCodec(CodecType::ZLIB);
1501  auto optionCodec = zlib::getCodec(zlib::defaultZlibOptions());
1502  auto const compressed = optionCodec->compress(original);
1503  auto uncompressed = codec->uncompress(compressed);
1504  EXPECT_EQ(original, uncompressed);
1505  uncompressed = optionCodec->uncompress(compressed);
1506  EXPECT_EQ(original, uncompressed);
1507  }
1508 
1509  {
1510  auto codec = getCodec(CodecType::GZIP);
1511  auto optionCodec = zlib::getCodec(zlib::defaultGzipOptions());
1512  auto const compressed = optionCodec->compress(original);
1513  auto uncompressed = codec->uncompress(compressed);
1514  EXPECT_EQ(original, uncompressed);
1515  uncompressed = optionCodec->uncompress(compressed);
1516  EXPECT_EQ(original, uncompressed);
1517  }
1518 }
1519 
1520 class ZlibOptionsTest
1521  : public testing::TestWithParam<std::tuple<ZlibFormat, int, int, int>> {
1522  protected:
1523  void SetUp() override {
1524  auto tup = GetParam();
1525  options_.format = std::get<0>(tup);
1526  options_.windowSize = std::get<1>(tup);
1527  options_.memLevel = std::get<2>(tup);
1528  options_.strategy = std::get<3>(tup);
1529  codec_ = zlib::getStreamCodec(options_);
1530  }
1531 
1532  void runSimpleRoundTripTest(const DataHolder& dh);
1533 
1534  private:
1535  zlib::Options options_;
1536  std::unique_ptr<StreamCodec> codec_;
1537 };
1538 
1539 void ZlibOptionsTest::runSimpleRoundTripTest(const DataHolder& dh) {
1540  size_t const uncompressedLength = (size_t)1 << 16;
1541  auto const original = std::string(
1542  reinterpret_cast<const char*>(dh.data(uncompressedLength).data()),
1543  uncompressedLength);
1544 
1545  auto const compressed = codec_->compress(original);
1546  auto const uncompressed = codec_->uncompress(compressed);
1547  EXPECT_EQ(uncompressed, original);
1548 }
1549 
1550 TEST_P(ZlibOptionsTest, simpleRoundTripTest) {
1551  runSimpleRoundTripTest(constantDataHolder);
1552  runSimpleRoundTripTest(randomDataHolder);
1553 }
1554 
1556  ZlibOptionsTest,
1557  ZlibOptionsTest,
1558  testing::Combine(
1559  testing::Values(
1562  ZlibFormat::RAW,
1563  ZlibFormat::AUTO),
1564  testing::Values(9, 12, 15),
1565  testing::Values(1, 8, 9),
1566  testing::Values(
1567  Z_DEFAULT_STRATEGY,
1568  Z_FILTERED,
1569  Z_HUFFMAN_ONLY,
1570  Z_RLE,
1571  Z_FIXED)));
1572 
1573 #endif // FOLLY_HAVE_LIBZ
1574 
1575 } // namespace test
1576 } // namespace io
1577 } // namespace folly
static std::unique_ptr< IOBuf > compressSome(StreamCodec *codec, ByteRange data, uint64_t bufferSize, StreamCodec::FlushOp flush)
#define EXPECT_LE(val1, val2)
Definition: gtest.h:1928
#define ASSERT_GE(val1, val2)
Definition: gtest.h:1972
#define EXPECT_ANY_THROW(statement)
Definition: gtest.h:1847
std::vector< uint8_t > buffer(kBufferSize+16)
uint64_t fnv64_buf(const void *buf, size_t n, uint64_t hash=FNV_64_HASH_START) noexcept
Definition: Hash.h:199
*than *hazptr_holder h
Definition: Hazptr.h:116
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
bool hasCodec(CodecType type)
RandomDataHolder randomDataHolder(dataSizeLog2)
#define EXPECT_THROW(statement, expected_exception)
Definition: gtest.h:1843
bool uncompressStream(folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp=StreamCodec::FlushOp::NONE)
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
static std::unique_ptr< IOBuf > create(std::size_t capacity)
Definition: IOBuf.cpp:229
LogLevel max
Definition: LogLevel.cpp:31
static std::unique_ptr< IOBuf > wrapBuffer(const void *buf, std::size_t capacity)
Definition: IOBuf.cpp:353
static bool codecHasFlush(CodecType type)
std::vector< ByteRange > split(ByteRange data) const
void runSimpleTest(const DataHolder &dh)
PskType type
static const int seed
bool hasStreamCodec(CodecType type)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
std::unordered_map< uint64_t, uint64_t > hashCache_
void runResetStreamTest(DataHolder const &dh)
constexpr size_t kMaxVarintLength64
Definition: Varint.h:50
static uint64_t test(std::string name, bool fc_, bool dedicated_, bool tc_, bool syncops_, uint64_t base)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
int number
std::unique_ptr< Codec > codec_
size_t tailroom() const
Definition: IOBufQueue.h:403
CodecFactory codec
constexpr size_type size() const
Definition: Range.h:431
auto begin(TestAdlIterable &instance)
Definition: ForeachTest.cpp:56
INSTANTIATE_TEST_CASE_P(CompressionTest, CompressionTest, testing::Combine(testing::Values(-1, 0, 1, 12, 22, 25, 27), testing::Values(1, 2, 3, 8, 65), testing::ValuesIn(availableCodecs())))
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::unique_ptr< Codec > codec_
std::unique_ptr< StreamCodec > codec_
bool prefix(Cursor &c, uint32_t expected)
#define EXPECT_GE(val1, val2)
Definition: gtest.h:1932
const uint64_t FNV_64_HASH_START
Definition: Hash.h:146
void split(const Delim &delimiter, const String &input, std::vector< OutputType > &out, bool ignoreEmpty)
Definition: String-inl.h:382
static std::pair< bool, std::unique_ptr< IOBuf > > uncompressSome(StreamCodec *codec, ByteRange &data, uint64_t bufferSize, StreamCodec::FlushOp flush)
TEST_P(CompressionTest, RandomData)
std::vector< std::thread::id > threads
auto rng
Definition: CollectTest.cpp:31
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
constexpr bool empty() const
Definition: Range.h:443
#define EXPECT_THROW_IF_DEBUG(statement, expected_exception)
constexpr auto empty(C const &c) -> decltype(c.empty())
Definition: Access.h:55
LogLevel min
Definition: LogLevel.cpp:30
uint64_t hashIOBuf(const IOBuf *buf)
void runSimpleStringTest(const DataHolder &dh)
std::unique_ptr< StreamCodec > getStreamCodec(CodecType type, int level)
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
void runUncompressStreamTest(DataHolder const &dh)
constexpr Iter data() const
Definition: Range.h:446
std::string prefix_
Range subpiece(size_type first, size_type length=npos) const
Definition: Range.h:686
constexpr Range< Iter > range(Iter first, Iter last)
Definition: Range.h:1114
std::unique_ptr< IOBuf > split(std::unique_ptr< IOBuf > data) const
void expect(LineReader &lr, const char *expected)
auto start
std::string uncompressed_
std::size_t computeChainDataLength() const
Definition: IOBuf.cpp:501
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
TEST(CompressionTestNeedsUncompressedLength, Simple)
constexpr size_t dataSizeLog2
static std::vector< CodecType > availableCodecs()
const char * string
Definition: Conv.cpp:212
void uncheckedAdvance(size_type n)
Definition: Range.h:695
bool compressStream(folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp=StreamCodec::FlushOp::NONE)
std::unique_ptr< uint8_t[]> data_
void runSimpleTest(const DataHolder &dh)
Range< const unsigned char * > ByteRange
Definition: Range.h:1163
static std::vector< CodecType > availableStreamCodecs()
std::unique_ptr< Codec > getTerminalCodec()
void runSimpleIOBufTest(const DataHolder &dh)
ConstantDataHolder constantDataHolder(dataSizeLog2)
void trimStart(std::size_t amount)
Definition: IOBuf.h:703
std::unique_ptr< StreamCodec > codec_
IOBuf cloneCoalescedAsValue() const
Definition: IOBuf.cpp:570
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
static std::vector< CodecType > supportedCodecs(std::vector< CodecType > const &v)
void runCompressStreamTest(DataHolder const &dh)
static uint64_t rand64()
Definition: Random.h:263
std::unique_ptr< Codec > getAutoUncompressionCodec(std::vector< std::unique_ptr< Codec >> customCodecs, std::unique_ptr< Codec > terminalCodec)
ByteRange data(size_t size) const
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
uint64_t oneBasedMsbPos(uint64_t number)
std::unique_ptr< Codec > getCodec(CodecType type, int level)
uint64_t hash(size_t size) const
uint32_t randomNumberSeed()
Definition: Random.h:367
constexpr None none
Definition: Optional.h:87
#define EXPECT_GT(val1, val2)
Definition: gtest.h:1934