proxygen
folly::io::StreamCodec Class Reference (abstract)

#include <Compression.h>

Inheritance diagram for folly::io::StreamCodec:
folly::io::Codec

Public Types

enum  FlushOp { FlushOp::NONE, FlushOp::FLUSH, FlushOp::END }
 

Public Member Functions

 ~StreamCodec () override
 
bool needsDataLength () const
 
void resetStream (folly::Optional< uint64_t > uncompressedLength=folly::none)
 
bool compressStream (folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp=StreamCodec::FlushOp::NONE)
 
bool uncompressStream (folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp=StreamCodec::FlushOp::NONE)
 
- Public Member Functions inherited from folly::io::Codec
virtual ~Codec ()
 
uint64_t maxUncompressedLength () const
 
CodecType type () const
 
bool needsUncompressedLength () const
 
std::unique_ptr< IOBuf > compress (const folly::IOBuf *data)
 
std::string compress (StringPiece data)
 
std::unique_ptr< IOBuf > uncompress (const IOBuf *data, folly::Optional< uint64_t > uncompressedLength=folly::none)
 
std::string uncompress (StringPiece data, folly::Optional< uint64_t > uncompressedLength=folly::none)
 
uint64_t maxCompressedLength (uint64_t uncompressedLength) const
 
folly::Optional< uint64_t > getUncompressedLength (const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength=folly::none) const
 
virtual std::vector< std::string > validPrefixes () const
 
virtual bool canUncompress (const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength=folly::none) const
 

Protected Member Functions

 StreamCodec (CodecType type, folly::Optional< int > level=folly::none, folly::StringPiece name={}, bool counters=true)
 
folly::Optional< uint64_t > uncompressedLength () const
 
- Protected Member Functions inherited from folly::io::Codec
 Codec (CodecType type, folly::Optional< int > level=folly::none, folly::StringPiece name={}, bool counters=true)
 

Private Types

enum  State {
  State::RESET, State::COMPRESS, State::COMPRESS_FLUSH, State::COMPRESS_END,
  State::UNCOMPRESS, State::END
}
 

Private Member Functions

std::unique_ptr< IOBuf > doCompress (const folly::IOBuf *data) override
 
std::unique_ptr< IOBuf > doUncompress (const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength) override
 
virtual bool doNeedsDataLength () const
 
virtual void doResetStream ()=0
 
virtual bool doCompressStream (folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp)=0
 
virtual bool doUncompressStream (folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp)=0
 
void assertStateIs (State expected) const
 

Private Attributes

State state_ {State::RESET}
 
ByteRange previousInput_ {}
 
folly::Optional< uint64_t > uncompressedLength_ {}
 
bool progressMade_ {true}
 

Additional Inherited Members

- Static Public Attributes inherited from folly::io::Codec
static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH = uint64_t(-1)
 

Detailed Description

Definition at line 274 of file Compression.h.

Member Enumeration Documentation

enum folly::io::StreamCodec::FlushOp
strong
Enumerator
NONE 
FLUSH 
END 

Definition at line 311 of file Compression.h.

311 { NONE, FLUSH, END };
enum folly::io::StreamCodec::State
strong, private
Enumerator
RESET 
COMPRESS 
COMPRESS_FLUSH 
COMPRESS_END 
UNCOMPRESS 
END 

Definition at line 424 of file Compression.h.

424  {
425  RESET,
426  COMPRESS,
427  COMPRESS_FLUSH,
428  COMPRESS_END,
429  UNCOMPRESS,
430  END,
431  };

Constructor & Destructor Documentation

folly::io::StreamCodec::~StreamCodec ( )
inline, override

Definition at line 276 of file Compression.h.

References folly::none.

276 {}
folly::io::StreamCodec::StreamCodec ( CodecType  type,
folly::Optional< int >  level = folly::none,
folly::StringPiece  name = {},
bool  counters = true 
)
inline, protected

Definition at line 392 of file Compression.h.

References folly::gen::move, and name.

395  {},
396  bool counters = true)
397  : Codec(type, std::move(level), name, counters) {}
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
Codec(CodecType type, folly::Optional< int > level=folly::none, folly::StringPiece name={}, bool counters=true)
Definition: Compression.cpp:71
const char * name
Definition: http_parser.c:437
CodecType type() const
Definition: Compression.h:147

Member Function Documentation

void folly::io::StreamCodec::assertStateIs ( State  expected) const
private

Definition at line 322 of file Compression.cpp.

322  {
323  if (state_ != expected) {
324  throw std::logic_error(folly::to<std::string>(
325  "Codec: state is ", state_, "; expected state ", expected));
326  }
327 }
bool folly::io::StreamCodec::compressStream ( folly::ByteRange & input,
folly::MutableByteRange & output,
StreamCodec::FlushOp  flushOp = StreamCodec::FlushOp::NONE 
)

Compresses some data from the input buffer and writes the compressed data into the output buffer. It may read input without producing any output, except when forced to flush.

The input buffer is advanced to point to the range of data that hasn't yet been read. Compression will resume at this point for the next call to compressStream(). The output buffer is advanced one byte past the last byte written.

The default flushOp is NONE, which allows compressStream() complete discretion in how much data to gather before writing any output.

If flushOp is END, all pending and input data is flushed to the output buffer, and the frame is ended. compressStream() must be called with the same input and flushOp END until it returns true. At this point the caller must call resetStream() to use the codec again.

If flushOp is FLUSH, all pending and input data is flushed to the output buffer, but the frame is not ended. compressStream() must be called with the same input and flushOp FLUSH until it returns true. At this point the caller can continue to call compressStream() with any input data and flushOp. The uncompressor, if passed all the produced output data, will be able to uncompress all the input data passed to compressStream() so far. Excessive use of flushOp FLUSH will deteriorate compression ratio. Flushing is useful for stateful streaming across a network. Most users don't need to use this flushOp.

A std::logic_error is thrown on incorrect usage of the API. A std::runtime_error is thrown upon error conditions or if no forward progress could be made twice in a row.

Definition at line 336 of file Compression.cpp.

References folly::Range< Iter >::empty(), END, FLUSH, NONE, folly::Range< Iter >::size(), and uint64_t.

Referenced by folly::io::test::compressSome().

339  {
340  if (state_ == State::RESET && input.empty() &&
341  flushOp == StreamCodec::FlushOp::END &&
342  uncompressedLength().value_or(0) != 0) {
343  throw std::runtime_error("Codec: invalid uncompressed length");
344  }
345 
346  if (!uncompressedLength() && needsDataLength()) {
347  throw std::runtime_error("Codec: uncompressed length required");
348  }
349  if (state_ == State::RESET && !input.empty() &&
350  uncompressedLength() == uint64_t(0)) {
351  throw std::runtime_error("Codec: invalid uncompressed length");
352  }
353  // Handle input state transitions
354  switch (flushOp) {
356  if (state_ == State::RESET) {
358  }
360  break;
362  if (state_ == State::RESET || state_ == State::COMPRESS) {
364  }
366  break;
368  if (state_ == State::RESET || state_ == State::COMPRESS) {
370  }
372  break;
373  }
374  size_t const inputSize = input.size();
375  size_t const outputSize = output.size();
376  bool const done = doCompressStream(input, output, flushOp);
377  if (!done && inputSize == input.size() && outputSize == output.size()) {
378  if (!progressMade_) {
379  throw std::runtime_error("Codec: No forward progress made");
380  }
381  // Throw an exception if there is no progress again next time
382  progressMade_ = false;
383  } else {
384  progressMade_ = true;
385  }
386  // Handle output state transitions
387  if (done) {
388  if (state_ == State::COMPRESS_FLUSH) {
390  } else if (state_ == State::COMPRESS_END) {
391  state_ = State::END;
392  }
393  // Check internal invariants
394  DCHECK(input.empty());
395  DCHECK(flushOp != StreamCodec::FlushOp::NONE);
396  }
397  return done;
398 }
constexpr size_type size() const
Definition: Range.h:431
virtual bool doCompressStream(folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp)=0
constexpr bool empty() const
Definition: Range.h:443
bool needsDataLength() const
FOLLY_CPP14_CONSTEXPR Value value_or(U &&dflt) const &
Definition: Optional.h:330
folly::Optional< uint64_t > uncompressedLength() const
Definition: Compression.h:401
void assertStateIs(State expected) const
std::unique_ptr< IOBuf > folly::io::StreamCodec::doCompress ( const folly::IOBuf * data)
override, private, virtual

Implements folly::io::Codec.

Definition at line 444 of file Compression.cpp.

References folly::io::addOutputBuffer(), buffer(), folly::IOBuf::computeChainDataLength(), current, folly::data(), folly::IOBuf::data(), folly::Range< Iter >::empty(), END, folly::IOBuf::length(), folly::io::Codec::maxCompressedLength(), folly::IOBuf::next(), NONE, gmock_output_test::output, folly::Range< Iter >::size(), and uint64_t.

444  {
446  resetStream(uncompressedLength);
447  uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
448 
449  auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
450  auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
451 
453  auto buffer = addOutputBuffer(
454  output,
455  maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
456  : kDefaultBufferLength);
457 
458  // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
459  IOBuf const* current = data;
460  ByteRange input{current->data(), current->length()};
462  bool done = false;
463  while (!done) {
464  while (input.empty() && current->next() != data) {
465  current = current->next();
466  input = {current->data(), current->length()};
467  }
468  if (current->next() == data) {
469  // This is the last input buffer so end the stream
470  flushOp = StreamCodec::FlushOp::END;
471  }
472  if (output.empty()) {
473  buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
474  }
475  done = compressStream(input, output, flushOp);
476  if (done) {
477  DCHECK(input.empty());
478  DCHECK(flushOp == StreamCodec::FlushOp::END);
479  DCHECK_EQ(current->next(), data);
480  }
481  }
482  buffer->prev()->trimEnd(output.size());
483  return buffer;
484 }
std::vector< uint8_t > buffer(kBufferSize+16)
Range< unsigned char * > MutableByteRange
Definition: Range.h:1164
static std::unique_ptr< IOBuf > addOutputBuffer(MutableByteRange &output, uint64_t size)
int current
constexpr Iter data() const
Definition: Range.h:446
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
uint64_t maxCompressedLength(uint64_t uncompressedLength) const
std::size_t computeChainDataLength() const
Definition: IOBuf.cpp:501
folly::Optional< uint64_t > uncompressedLength() const
Definition: Compression.h:401
bool compressStream(folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp=StreamCodec::FlushOp::NONE)
Range< const unsigned char * > ByteRange
Definition: Range.h:1163
void resetStream(folly::Optional< uint64_t > uncompressedLength=folly::none)
virtual bool folly::io::StreamCodec::doCompressStream ( folly::ByteRange & input,
folly::MutableByteRange & output,
FlushOp  flushOp 
)
private, pure virtual
bool folly::io::StreamCodec::doNeedsDataLength ( ) const
private, virtual

Definition at line 318 of file Compression.cpp.

318  {
319  return false;
320 }
virtual void folly::io::StreamCodec::doResetStream ( )
private, pure virtual
std::unique_ptr< IOBuf > folly::io::StreamCodec::doUncompress ( const folly::IOBuf * data,
folly::Optional< uint64_t > uncompressedLength 
)
override, private, virtual

Implements folly::io::Codec.

Definition at line 494 of file Compression.cpp.

References folly::io::addOutputBuffer(), folly::Range< Iter >::advance(), folly::IOBuf::append(), b, folly::IOBuf::begin(), buffer(), folly::io::BZIP2, folly::IOBufQueue::cacheChainLength(), folly::io::Codec::canUncompress(), folly::IOBufQueue::chainLength(), folly::IOBuf::clone(), folly::IOBuf::cloneCoalescedAsValue(), folly::IOBuf::coalesce(), folly::io::Codec::Codec(), folly::io::COMPRESSION_LEVEL_BEST, folly::io::COMPRESSION_LEVEL_DEFAULT, folly::io::COMPRESSION_LEVEL_FASTEST, folly::io::computeBufferLength(), folly::IOBuf::computeChainDataLength(), folly::IOBuf::create(), current, folly::data(), folly::Range< Iter >::data(), folly::IOBuf::data(), folly::io::compression::detail::dataStartsWithLE(), folly::io::Codec::doCompress(), folly::io::Codec::doMaxCompressedLength(), folly::io::Codec::doMaxUncompressedLength(), folly::io::Codec::doNeedsUncompressedLength(), folly::io::Codec::doUncompress(), folly::Range< Iter >::empty(), folly::encodeVarint(), END, FLUSH, folly::io::getCodec(), folly::io::getStreamCodec(), folly::io::Codec::getUncompressedLength(), folly::io::GZIP, folly::Optional< Value >::hasValue(), int8_t, folly::IOBuf::isChained(), folly::kMaxVarintLength64, folly::IOBuf::length(), folly::io::LZ4, folly::io::LZ4_FRAME, folly::io::LZ4_VARINT_SIZE, folly::io::LZMA2, folly::io::LZMA2_VARINT_SIZE, max, folly::io::Codec::maxCompressedLength(), folly::io::Codec::maxUncompressedLength(), min, folly::IOBufQueue::move(), folly::IOBuf::next(), folly::io::NO_COMPRESSION, NONE, gmock_output_test::output, folly::io::detail::CursorBase< Derived, BufType >::peekBytes(), folly::IOBufQueue::postallocate(), folly::IOBufQueue::preallocate(), folly::io::compression::detail::prefixToStringLE(), folly::range(), folly::io::detail::CursorBase< Derived, BufType >::read(), SCOPE_EXIT, folly::Range< Iter >::size(), folly::Skip, folly::io::SNAPPY, stream, folly::IOBuf::tailroom(), folly::TooManyBytes, folly::tryDecodeVarint(), folly::io::Codec::type(), uint32_t, uint64_t, 
uint8_t, folly::Range< Iter >::uncheckedAdvance(), val, folly::io::Codec::validPrefixes(), folly::IOBuf::writableTail(), folly::io::ZLIB, folly::io::ZSTD, and folly::io::ZSTD_FAST.

496  {
497  auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
498  auto constexpr kBlockSize = uint64_t(128) << 10;
499  auto const defaultBufferLength =
500  computeBufferLength(data->computeChainDataLength(), kBlockSize);
501 
502  uncompressedLength = getUncompressedLength(data, uncompressedLength);
503  resetStream(uncompressedLength);
504 
506  auto buffer = addOutputBuffer(
507  output,
508  (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
509  ? *uncompressedLength
510  : defaultBufferLength));
511 
512  // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
513  IOBuf const* current = data;
514  ByteRange input{current->data(), current->length()};
516  bool done = false;
517  while (!done) {
518  while (input.empty() && current->next() != data) {
519  current = current->next();
520  input = {current->data(), current->length()};
521  }
522  if (current->next() == data) {
523  // Tell the uncompressor there is no more input (it may optimize)
524  flushOp = StreamCodec::FlushOp::END;
525  }
526  if (output.empty()) {
527  buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
528  }
529  done = uncompressStream(input, output, flushOp);
530  }
531  if (!input.empty()) {
532  throw std::runtime_error("Codec: Junk after end of data");
533  }
534 
535  buffer->prev()->trimEnd(output.size());
536  if (uncompressedLength &&
537  *uncompressedLength != buffer->computeChainDataLength()) {
538  throw std::runtime_error("Codec: invalid uncompressed length");
539  }
540 
541  return buffer;
542 }
std::vector< uint8_t > buffer(kBufferSize+16)
bool uncompressStream(folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp=StreamCodec::FlushOp::NONE)
static uint64_t computeBufferLength(uint64_t const compressedLength, uint64_t const blockSize)
Range< unsigned char * > MutableByteRange
Definition: Range.h:1164
static std::unique_ptr< IOBuf > addOutputBuffer(MutableByteRange &output, uint64_t size)
int current
constexpr Iter data() const
Definition: Range.h:446
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
std::size_t computeChainDataLength() const
Definition: IOBuf.cpp:501
Range< const unsigned char * > ByteRange
Definition: Range.h:1163
void resetStream(folly::Optional< uint64_t > uncompressedLength=folly::none)
folly::Optional< uint64_t > getUncompressedLength(const folly::IOBuf *data, folly::Optional< uint64_t > uncompressedLength=folly::none) const
virtual bool folly::io::StreamCodec::doUncompressStream ( folly::ByteRange & input,
folly::MutableByteRange & output,
FlushOp  flushOp 
)
private, pure virtual
bool folly::io::StreamCodec::needsDataLength ( ) const

Does the codec need the data length before compression streaming?

Definition at line 314 of file Compression.cpp.

314  {
315  return doNeedsDataLength();
316 }
virtual bool doNeedsDataLength() const
void folly::io::StreamCodec::resetStream ( folly::Optional< uint64_t > uncompressedLength = folly::none)

Reset the state of the codec, and set the uncompressed length for the next streaming operation. If uncompressedLength is not none it must be exactly the uncompressed length. compressStream() must be passed exactly uncompressedLength input bytes before the stream is ended. uncompressStream() must be passed a compressed frame that uncompresses to uncompressedLength.

Definition at line 329 of file Compression.cpp.

329  {
332  progressMade_ = true;
333  doResetStream();
334 }
folly::Optional< uint64_t > uncompressedLength_
Definition: Compression.h:436
virtual void doResetStream()=0
folly::Optional< uint64_t > uncompressedLength() const
Definition: Compression.h:401
folly::Optional<uint64_t> folly::io::StreamCodec::uncompressedLength ( ) const
inline, protected

Definition at line 401 of file Compression.h.

References folly::data().

401  {
402  return uncompressedLength_;
403  }
folly::Optional< uint64_t > uncompressedLength_
Definition: Compression.h:436
bool folly::io::StreamCodec::uncompressStream ( folly::ByteRange & input,
folly::MutableByteRange & output,
StreamCodec::FlushOp  flushOp = StreamCodec::FlushOp::NONE 
)

Uncompresses some data from the input buffer and writes the uncompressed data into the output buffer. It may read input without producing any output.

The input buffer is advanced to point to the range of data that hasn't yet been read. Uncompression will resume at this point for the next call to uncompressStream(). The output buffer is advanced one byte past the last byte written.

The default flushOp is NONE, which allows uncompressStream() complete discretion in how much output data to flush. The uncompressor may not make maximum forward progress, but will make some forward progress when possible.

If flushOp is END, the caller guarantees that no more input will be presented to uncompressStream(). uncompressStream() must be called with the same input and flushOp END until it returns true. Using flushOp END is not mandatory, but if the input is all available in one buffer, and there is enough output space to write the entire frame, codecs can uncompress faster.

If flushOp is FLUSH, uncompressStream() is guaranteed to make the maximum amount of forward progress possible. When using this flushOp and uncompressStream() returns with !output.empty() the caller knows that all pending output has been flushed. This is useful for stateful streaming across a network, and it should be used in conjunction with compressStream() with flushOp FLUSH. Most users don't need to use this flushOp.

A std::runtime_error is thrown upon error conditions or if no forward progress could be made upon two consecutive calls to the function (only the second call will throw an exception).

Returns true at the end of a frame. At this point resetStream() must be called to reuse the codec.

Definition at line 400 of file Compression.cpp.

References folly::Range< Iter >::empty(), and folly::Range< Iter >::size().

Referenced by folly::io::test::uncompressSome().

403  {
404  if (state_ == State::RESET && input.empty()) {
405  if (uncompressedLength().value_or(0) == 0) {
406  return true;
407  }
408  return false;
409  }
410  // Handle input state transitions
411  if (state_ == State::RESET) {
413  }
415  size_t const inputSize = input.size();
416  size_t const outputSize = output.size();
417  bool const done = doUncompressStream(input, output, flushOp);
418  if (!done && inputSize == input.size() && outputSize == output.size()) {
419  if (!progressMade_) {
420  throw std::runtime_error("Codec: no forward progress made");
421  }
422  // Throw an exception if there is no progress again next time
423  progressMade_ = false;
424  } else {
425  progressMade_ = true;
426  }
427  // Handle output state transitions
428  if (done) {
429  state_ = State::END;
430  }
431  return done;
432 }
constexpr size_type size() const
Definition: Range.h:431
virtual bool doUncompressStream(folly::ByteRange &input, folly::MutableByteRange &output, FlushOp flushOp)=0
constexpr bool empty() const
Definition: Range.h:443
FOLLY_CPP14_CONSTEXPR Value value_or(U &&dflt) const &
Definition: Optional.h:330
folly::Optional< uint64_t > uncompressedLength() const
Definition: Compression.h:401
void assertStateIs(State expected) const

Member Data Documentation

ByteRange folly::io::StreamCodec::previousInput_ {}
private

Definition at line 435 of file Compression.h.

bool folly::io::StreamCodec::progressMade_ {true}
private

Definition at line 437 of file Compression.h.

State folly::io::StreamCodec::state_ {State::RESET}
private

Definition at line 434 of file Compression.h.

folly::Optional<uint64_t> folly::io::StreamCodec::uncompressedLength_ {}
private

Definition at line 436 of file Compression.h.


The documentation for this class was generated from the following files: