X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2FCompression.cpp;h=8dd19bc79ae80db4f05a64d20df4961a9b43e637;hp=53d36c5fd878c7597373dd6c9762470235300e0e;hb=b367f0fada0a53564f83e5072bbd45994a1c0795;hpb=ad7ca67b51d0d82a7fc21506921b5144b3f6f77d diff --git a/folly/io/Compression.cpp b/folly/io/Compression.cpp index 53d36c5f..8dd19bc7 100644 --- a/folly/io/Compression.cpp +++ b/folly/io/Compression.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,9 @@ #if FOLLY_HAVE_LIBLZ4 #include #include +#if LZ4_VERSION_NUMBER >= 10301 +#include +#endif #endif #include @@ -37,15 +40,23 @@ #endif #if FOLLY_HAVE_LIBZSTD +#define ZSTD_STATIC_LINKING_ONLY #include #endif +#if FOLLY_HAVE_LIBBZ2 +#include +#endif + +#include #include #include #include #include #include #include +#include +#include namespace folly { namespace io { @@ -56,26 +67,39 @@ std::unique_ptr Codec::compress(const IOBuf* data) { uint64_t len = data->computeChainDataLength(); if (len == 0) { return IOBuf::create(0); - } else if (len > maxUncompressedLength()) { + } + if (len > maxUncompressedLength()) { throw std::runtime_error("Codec: uncompressed length too large"); } return doCompress(data); } -std::unique_ptr Codec::uncompress(const IOBuf* data, - uint64_t uncompressedLength) { - if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) { +std::string Codec::compress(const StringPiece data) { + const uint64_t len = data.size(); + if (len == 0) { + return ""; + } + if (len > maxUncompressedLength()) { + throw std::runtime_error("Codec: uncompressed length too large"); + } + + return doCompressString(data); +} + +std::unique_ptr Codec::uncompress( + const IOBuf* data, + Optional uncompressedLength) { + if (!uncompressedLength) { if (needsUncompressedLength()) { throw std::invalid_argument("Codec: uncompressed length required"); } - } else if (uncompressedLength > maxUncompressedLength()) { + } else if (*uncompressedLength > maxUncompressedLength()) { throw std::runtime_error("Codec: uncompressed length too large"); } if (data->empty()) { - if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && - uncompressedLength != 0) { + if (uncompressedLength.value_or(0) != 0) { throw std::runtime_error("Codec: invalid uncompressed length"); } return IOBuf::create(0); @@ -84,6 +108,27 @@ std::unique_ptr Codec::uncompress(const IOBuf* data, return doUncompress(data, uncompressedLength); } +std::string Codec::uncompress( + const StringPiece data, + Optional uncompressedLength) { + if (!uncompressedLength) { + if (needsUncompressedLength()) { + throw std::invalid_argument("Codec: uncompressed length required"); + } + } else if (*uncompressedLength > maxUncompressedLength()) { + throw std::runtime_error("Codec: uncompressed length too large"); + } + + if (data.empty()) { + if (uncompressedLength.value_or(0) != 0) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + return ""; + } + + return doUncompressString(data, uncompressedLength); +} + bool Codec::needsUncompressedLength() const { return doNeedsUncompressedLength(); } @@ -100,6 +145,274 @@ uint64_t Codec::doMaxUncompressedLength() const { return UNLIMITED_UNCOMPRESSED_LENGTH; } +std::vector Codec::validPrefixes() const { + return {}; +} + +bool Codec::canUncompress(const IOBuf*, Optional) const { + return false; +} + +std::string Codec::doCompressString(const StringPiece data) { + const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data}; + auto outputBuffer = doCompress(&inputBuffer); + std::string output; + output.reserve(outputBuffer->computeChainDataLength()); + for (auto range : *outputBuffer) { + output.append(reinterpret_cast(range.data()), range.size()); + } + return output; +} + +std::string Codec::doUncompressString( + const StringPiece data, + Optional uncompressedLength) { + const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data}; + auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength); + std::string output; + output.reserve(outputBuffer->computeChainDataLength()); + for (auto range : *outputBuffer) { + output.append(reinterpret_cast(range.data()), range.size()); + } + return output; +} + +uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const { + if (uncompressedLength == 0) { + return 0; + } + return doMaxCompressedLength(uncompressedLength); +} + +Optional Codec::getUncompressedLength( + const folly::IOBuf* data, + Optional uncompressedLength) const { + auto const compressedLength = data->computeChainDataLength(); + if (uncompressedLength == uint64_t(0) || compressedLength == 0) { + if (uncompressedLength.value_or(0) != 0 || compressedLength != 0) { + throw std::runtime_error("Invalid uncompressed length"); + } + return 0; + } + return doGetUncompressedLength(data, uncompressedLength); +} + +Optional Codec::doGetUncompressedLength( + const folly::IOBuf*, + Optional uncompressedLength) const { + return uncompressedLength; +} + +bool StreamCodec::needsDataLength() const { + return doNeedsDataLength(); +} + +bool StreamCodec::doNeedsDataLength() const { + return false; +} + +void StreamCodec::assertStateIs(State expected) const { + if (state_ != expected) { + throw std::logic_error(folly::to( + "Codec: state is ", state_, "; expected state ", expected)); + } +} + +void StreamCodec::resetStream(Optional uncompressedLength) { + state_ = State::RESET; + uncompressedLength_ = uncompressedLength; + doResetStream(); +} + +bool StreamCodec::compressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (state_ == State::RESET && input.empty()) { + if (flushOp == StreamCodec::FlushOp::NONE) { + return false; + } + if (flushOp == StreamCodec::FlushOp::END && + uncompressedLength().value_or(0) != 0) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + return true; + } + if (state_ == State::RESET && !input.empty() && + uncompressedLength() == uint64_t(0)) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + // Handle input state transitions + switch (flushOp) { + case StreamCodec::FlushOp::NONE: + if (state_ == State::RESET) { + state_ = State::COMPRESS; + } + assertStateIs(State::COMPRESS); + break; + case StreamCodec::FlushOp::FLUSH: + if (state_ == State::RESET || state_ == State::COMPRESS) { + state_ = State::COMPRESS_FLUSH; + } + assertStateIs(State::COMPRESS_FLUSH); + break; + case StreamCodec::FlushOp::END: + if (state_ == State::RESET || state_ == State::COMPRESS) { + state_ = State::COMPRESS_END; + } + assertStateIs(State::COMPRESS_END); + break; + } + bool const done = doCompressStream(input, output, flushOp); + // Handle output state transitions + if (done) { + if (state_ == State::COMPRESS_FLUSH) { + state_ = State::COMPRESS; + } else if (state_ == State::COMPRESS_END) { + state_ = State::END; + } + // Check internal invariants + DCHECK(input.empty()); + DCHECK(flushOp != StreamCodec::FlushOp::NONE); + } + return done; +} + +bool StreamCodec::uncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (state_ == State::RESET && input.empty()) { + if (uncompressedLength().value_or(0) == 0) { + return true; + } + return false; + } + // Handle input state transitions + if (state_ == State::RESET) { + state_ = State::UNCOMPRESS; + } + assertStateIs(State::UNCOMPRESS); + bool const done = doUncompressStream(input, output, flushOp); + // Handle output state transitions + if (done) { + state_ = State::END; + } + return done; +} + +static std::unique_ptr addOutputBuffer( + MutableByteRange& output, + uint64_t size) { + DCHECK(output.empty()); + auto buffer = IOBuf::create(size); + buffer->append(buffer->capacity()); + output = {buffer->writableData(), buffer->length()}; + return buffer; +} + +std::unique_ptr StreamCodec::doCompress(IOBuf const* data) { + uint64_t const uncompressedLength = data->computeChainDataLength(); + resetStream(uncompressedLength); + uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength); + + auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB + auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB + + MutableByteRange output; + auto buffer = addOutputBuffer( + output, + maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen + : kDefaultBufferLength); + + // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer + IOBuf const* current = data; + ByteRange input{current->data(), current->length()}; + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE; + for (;;) { + while (input.empty() && current->next() != data) { + current = current->next(); + input = {current->data(), current->length()}; + } + if (current->next() == data) { + // This is the last input buffer so end the stream + flushOp = StreamCodec::FlushOp::END; + } + if (output.empty()) { + buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength)); + } + bool const done = compressStream(input, output, flushOp); + if (done) { + DCHECK(input.empty()); + DCHECK(flushOp == StreamCodec::FlushOp::END); + DCHECK_EQ(current->next(), data); + break; + } + } + buffer->prev()->trimEnd(output.size()); + return buffer; +} + +static uint64_t computeBufferLength( + uint64_t const compressedLength, + uint64_t const blockSize) { + uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB + uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength); + return std::min(goodBufferSize, kMaxBufferLength); +} + +std::unique_ptr StreamCodec::doUncompress( + IOBuf const* data, + Optional uncompressedLength) { + auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB + auto constexpr kBlockSize = uint64_t(128) << 10; + auto const defaultBufferLength = + computeBufferLength(data->computeChainDataLength(), kBlockSize); + + uncompressedLength = getUncompressedLength(data, uncompressedLength); + resetStream(uncompressedLength); + + MutableByteRange output; + auto buffer = addOutputBuffer( + output, + (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength + ? *uncompressedLength + : defaultBufferLength)); + + // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer + IOBuf const* current = data; + ByteRange input{current->data(), current->length()}; + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE; + for (;;) { + while (input.empty() && current->next() != data) { + current = current->next(); + input = {current->data(), current->length()}; + } + if (current->next() == data) { + // Tell the uncompressor there is no more input (it may optimize) + flushOp = StreamCodec::FlushOp::END; + } + if (output.empty()) { + buffer->prependChain(addOutputBuffer(output, defaultBufferLength)); + } + bool const done = uncompressStream(input, output, flushOp); + if (done) { + break; + } + } + if (!input.empty()) { + throw std::runtime_error("Codec: Junk after end of data"); + } + + buffer->prev()->trimEnd(output.size()); + if (uncompressedLength && + *uncompressedLength != buffer->computeChainDataLength()) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + + return buffer; +} + namespace { /** @@ -111,14 +424,15 @@ class NoCompressionCodec final : public Codec { explicit NoCompressionCodec(int level, CodecType type); private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, - uint64_t uncompressedLength) override; + Optional uncompressedLength) override; }; std::unique_ptr NoCompressionCodec::create(int level, CodecType type) { - return make_unique(level, type); + return std::make_unique(level, type); } NoCompressionCodec::NoCompressionCodec(int level, CodecType type) @@ -136,6 +450,11 @@ NoCompressionCodec::NoCompressionCodec(int level, CodecType type) } } +uint64_t NoCompressionCodec::doMaxCompressedLength( + uint64_t uncompressedLength) const { + return uncompressedLength; +} + std::unique_ptr NoCompressionCodec::doCompress( const IOBuf* data) { return data->clone(); @@ -143,11 +462,11 @@ std::unique_ptr NoCompressionCodec::doCompress( std::unique_ptr NoCompressionCodec::doUncompress( const IOBuf* data, - uint64_t uncompressedLength) { - if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && - data->computeChainDataLength() != uncompressedLength) { - throw std::runtime_error(to( - "NoCompressionCodec: invalid uncompressed length")); + Optional uncompressedLength) { + if (uncompressedLength && + data->computeChainDataLength() != *uncompressedLength) { + throw std::runtime_error( + to("NoCompressionCodec: invalid uncompressed length")); } return data->clone(); } @@ -181,6 +500,39 @@ inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) { #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA +namespace { +/** + * Reads sizeof(T) bytes, and returns false if not enough bytes are available. + * Returns true if the first n bytes are equal to prefix when interpreted as + * a little endian T. + */ +template +typename std::enable_if::value, bool>::type +dataStartsWithLE(const IOBuf* data, T prefix, uint64_t n = sizeof(T)) { + DCHECK_GT(n, 0); + DCHECK_LE(n, sizeof(T)); + T value; + Cursor cursor{data}; + if (!cursor.tryReadLE(value)) { + return false; + } + const T mask = n == sizeof(T) ? T(-1) : (T(1) << (8 * n)) - 1; + return prefix == (value & mask); +} + +template +typename std::enable_if::value, std::string>::type +prefixToStringLE(T prefix, uint64_t n = sizeof(T)) { + DCHECK_GT(n, 0); + DCHECK_LE(n, sizeof(T)); + prefix = Endian::little(prefix); + std::string result; + result.resize(n); + memcpy(&result[0], &prefix, n); + return result; +} +} // namespace + #if FOLLY_HAVE_LIBLZ4 /** @@ -194,19 +546,20 @@ class LZ4Codec final : public Codec { private: bool doNeedsUncompressedLength() const override; uint64_t doMaxUncompressedLength() const override; + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; } std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, - uint64_t uncompressedLength) override; + Optional uncompressedLength) override; bool highCompression_; }; std::unique_ptr LZ4Codec::create(int level, CodecType type) { - return make_unique(level, type); + return std::make_unique(level, type); } LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) { @@ -243,31 +596,41 @@ uint64_t LZ4Codec::doMaxUncompressedLength() const { return LZ4_MAX_INPUT_SIZE; } +uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const { + return LZ4_compressBound(uncompressedLength) + + (encodeSize() ? kMaxVarintLength64 : 0); +} + std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { - std::unique_ptr clone; + IOBuf clone; if (data->isChained()) { // LZ4 doesn't support streaming, so we have to coalesce - clone = data->clone(); - clone->coalesce(); - data = clone.get(); + clone = data->cloneCoalescedAsValue(); + data = &clone; } - uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0; - auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length())); + auto out = IOBuf::create(maxCompressedLength(data->length())); if (encodeSize()) { encodeVarintToIOBuf(data->length(), out.get()); } int n; + auto input = reinterpret_cast(data->data()); + auto output = reinterpret_cast(out->writableTail()); + const auto inputLength = data->length(); +#if LZ4_VERSION_NUMBER >= 10700 + if (highCompression_) { + n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0); + } else { + n = LZ4_compress_default(input, output, inputLength, out->tailroom()); + } +#else if (highCompression_) { - n = LZ4_compressHC(reinterpret_cast(data->data()), - reinterpret_cast(out->writableTail()), - data->length()); + n = LZ4_compressHC(input, output, inputLength); } else { - n = LZ4_compress(reinterpret_cast(data->data()), - reinterpret_cast(out->writableTail()), - data->length()); + n = LZ4_compress(input, output, inputLength); } +#endif CHECK_GE(n, 0); CHECK_LE(n, out->capacity()); @@ -278,37 +641,35 @@ std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { std::unique_ptr LZ4Codec::doUncompress( const IOBuf* data, - uint64_t uncompressedLength) { - std::unique_ptr clone; + Optional uncompressedLength) { + IOBuf clone; if (data->isChained()) { // LZ4 doesn't support streaming, so we have to coalesce - clone = data->clone(); - clone->coalesce(); - data = clone.get(); + clone = data->cloneCoalescedAsValue(); + data = &clone; } folly::io::Cursor cursor(data); uint64_t actualUncompressedLength; if (encodeSize()) { actualUncompressedLength = decodeVarintFromCursor(cursor); - if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && - uncompressedLength != actualUncompressedLength) { + if (uncompressedLength && *uncompressedLength != actualUncompressedLength) { throw std::runtime_error("LZ4Codec: invalid uncompressed length"); } } else { - actualUncompressedLength = uncompressedLength; - if (actualUncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH || - actualUncompressedLength > maxUncompressedLength()) { - throw std::runtime_error("LZ4Codec: invalid uncompressed length"); - } + // Invariants + DCHECK(uncompressedLength.hasValue()); + DCHECK(*uncompressedLength <= maxUncompressedLength()); + actualUncompressedLength = *uncompressedLength; } - auto p = cursor.peek(); + auto sp = StringPiece{cursor.peekBytes()}; auto out = IOBuf::create(actualUncompressedLength); - int n = LZ4_decompress_safe(reinterpret_cast(p.first), - reinterpret_cast(out->writableTail()), - p.second, - actualUncompressedLength); + int n = LZ4_decompress_safe( + sp.data(), + reinterpret_cast(out->writableTail()), + sp.size(), + actualUncompressedLength); if (n < 0 || uint64_t(n) != actualUncompressedLength) { throw std::runtime_error(to( @@ -318,7 +679,187 @@ std::unique_ptr LZ4Codec::doUncompress( return out; } -#endif // FOLLY_HAVE_LIBLZ4 +#if LZ4_VERSION_NUMBER >= 10301 + +class LZ4FrameCodec final : public Codec { + public: + static std::unique_ptr create(int level, CodecType type); + explicit LZ4FrameCodec(int level, CodecType type); + ~LZ4FrameCodec() override; + + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, Optional uncompressedLength) + const override; + + private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + + std::unique_ptr doCompress(const IOBuf* data) override; + std::unique_ptr doUncompress( + const IOBuf* data, + Optional uncompressedLength) override; + + // Reset the dctx_ if it is dirty or null. + void resetDCtx(); + + int level_; + LZ4F_decompressionContext_t dctx_{nullptr}; + bool dirty_{false}; +}; + +/* static */ std::unique_ptr LZ4FrameCodec::create( + int level, + CodecType type) { + return std::make_unique(level, type); +} + +static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204; + +std::vector LZ4FrameCodec::validPrefixes() const { + return {prefixToStringLE(kLZ4FrameMagicLE)}; +} + +bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional) const { + return dataStartsWithLE(data, kLZ4FrameMagicLE); +} + +uint64_t LZ4FrameCodec::doMaxCompressedLength( + uint64_t uncompressedLength) const { + LZ4F_preferences_t prefs{}; + prefs.compressionLevel = level_; + prefs.frameInfo.contentSize = uncompressedLength; + return LZ4F_compressFrameBound(uncompressedLength, &prefs); +} + +static size_t lz4FrameThrowOnError(size_t code) { + if (LZ4F_isError(code)) { + throw std::runtime_error( + to("LZ4Frame error: ", LZ4F_getErrorName(code))); + } + return code; +} + +void LZ4FrameCodec::resetDCtx() { + if (dctx_ && !dirty_) { + return; + } + if (dctx_) { + LZ4F_freeDecompressionContext(dctx_); + } + lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100)); + dirty_ = false; +} + +LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::LZ4_FRAME); + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + case COMPRESSION_LEVEL_DEFAULT: + level_ = 0; + break; + case COMPRESSION_LEVEL_BEST: + level_ = 16; + break; + default: + level_ = level; + break; + } +} + +LZ4FrameCodec::~LZ4FrameCodec() { + if (dctx_) { + LZ4F_freeDecompressionContext(dctx_); + } +} + +std::unique_ptr LZ4FrameCodec::doCompress(const IOBuf* data) { + // LZ4 Frame compression doesn't support streaming so we have to coalesce + IOBuf clone; + if (data->isChained()) { + clone = data->cloneCoalescedAsValue(); + data = &clone; + } + // Set preferences + const auto uncompressedLength = data->length(); + LZ4F_preferences_t prefs{}; + prefs.compressionLevel = level_; + prefs.frameInfo.contentSize = uncompressedLength; + // Compress + auto buf = IOBuf::create(maxCompressedLength(uncompressedLength)); + const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame( + buf->writableTail(), + buf->tailroom(), + data->data(), + data->length(), + &prefs)); + buf->append(written); + return buf; +} + +std::unique_ptr LZ4FrameCodec::doUncompress( + const IOBuf* data, + Optional uncompressedLength) { + // Reset the dctx if any errors have occurred + resetDCtx(); + // Coalesce the data + ByteRange in = *data->begin(); + IOBuf clone; + if (data->isChained()) { + clone = data->cloneCoalescedAsValue(); + in = clone.coalesce(); + } + data = nullptr; + // Select decompression options + LZ4F_decompressOptions_t options; + options.stableDst = 1; + // Select blockSize and growthSize for the IOBufQueue + IOBufQueue queue(IOBufQueue::cacheChainLength()); + auto blockSize = uint64_t{64} << 10; + auto growthSize = uint64_t{4} << 20; + if (uncompressedLength) { + // Allocate uncompressedLength in one chunk (up to 64 MB) + const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20); + queue.preallocate(allocateSize, allocateSize); + blockSize = std::min(*uncompressedLength, blockSize); + growthSize = std::min(*uncompressedLength, growthSize); + } else { + // Reduce growthSize for small data + const auto guessUncompressedLen = + 4 * std::max(blockSize, in.size()); + growthSize = std::min(guessUncompressedLen, growthSize); + } + // Once LZ4_decompress() is called, the dctx_ cannot be reused until it + // returns 0 + dirty_ = true; + // Decompress until the frame is over + size_t code = 0; + do { + // Allocate enough space to decompress at least a block + void* out; + size_t outSize; + std::tie(out, outSize) = queue.preallocate(blockSize, growthSize); + // Decompress + size_t inSize = in.size(); + code = lz4FrameThrowOnError( + LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options)); + if (in.empty() && outSize == 0 && code != 0) { + // We passed no input, no output was produced, and the frame isn't over + // No more forward progress is possible + throw std::runtime_error("LZ4Frame error: Incomplete frame"); + } + in.uncheckedAdvance(inSize); + queue.postallocate(outSize); + } while (code != 0); + // At this point the decompression context can be reused + dirty_ = false; + if (uncompressedLength && queue.chainLength() != *uncompressedLength) { + throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength"); + } + return queue.move(); +} + +#endif // LZ4_VERSION_NUMBER >= 10301 +#endif // FOLLY_HAVE_LIBLZ4 #if FOLLY_HAVE_LIBSNAPPY @@ -350,9 +891,9 @@ size_t IOBufSnappySource::Available() const { } const char* IOBufSnappySource::Peek(size_t* len) { - auto p = cursor_.peek(); - *len = p.second; - return reinterpret_cast(p.first); + auto sp = StringPiece{cursor_.peekBytes()}; + *len = sp.size(); + return sp.data(); } void IOBufSnappySource::Skip(size_t n) { @@ -368,14 +909,15 @@ class SnappyCodec final : public Codec { private: uint64_t doMaxUncompressedLength() const override; + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, - uint64_t uncompressedLength) override; + Optional uncompressedLength) override; }; std::unique_ptr SnappyCodec::create(int level, CodecType type) { - return make_unique(level, type); + return std::make_unique(level, type); } SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) { @@ -397,10 +939,13 @@ uint64_t SnappyCodec::doMaxUncompressedLength() const { return std::numeric_limits::max(); } +uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const { + return snappy::MaxCompressedLength(uncompressedLength); +} + std::unique_ptr SnappyCodec::doCompress(const IOBuf* data) { IOBufSnappySource source(data); - auto out = - IOBuf::create(snappy::MaxCompressedLength(source.Available())); + auto out = IOBuf::create(maxCompressedLength(source.Available())); snappy::UncheckedByteArraySink sink(reinterpret_cast( out->writableTail())); @@ -412,8 +957,9 @@ std::unique_ptr SnappyCodec::doCompress(const IOBuf* data) { return out; } -std::unique_ptr SnappyCodec::doUncompress(const IOBuf* data, - uint64_t uncompressedLength) { +std::unique_ptr SnappyCodec::doUncompress( + const IOBuf* data, + Optional uncompressedLength) { uint32_t actualUncompressedLength = 0; { @@ -421,8 +967,7 @@ std::unique_ptr SnappyCodec::doUncompress(const IOBuf* data, if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) { throw std::runtime_error("snappy::GetUncompressedLength failed"); } - if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && - uncompressedLength != actualUncompressedLength) { + if (uncompressedLength && *uncompressedLength != actualUncompressedLength) { throw std::runtime_error("snappy: invalid uncompressed length"); } } @@ -447,241 +992,295 @@ std::unique_ptr SnappyCodec::doUncompress(const IOBuf* data, /** * Zlib codec */ -class ZlibCodec final : public Codec { +class ZlibStreamCodec final : public StreamCodec { public: - static std::unique_ptr create(int level, CodecType type); - explicit ZlibCodec(int level, CodecType type); - - private: - std::unique_ptr doCompress(const IOBuf* data) override; - std::unique_ptr doUncompress( - const IOBuf* data, - uint64_t uncompressedLength) override; + static std::unique_ptr createCodec(int level, CodecType type); + static std::unique_ptr createStream(int level, CodecType type); + explicit ZlibStreamCodec(int level, CodecType type); + ~ZlibStreamCodec() override; - std::unique_ptr addOutputBuffer(z_stream* stream, uint32_t length); - bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength); + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, Optional uncompressedLength) + const override; + private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + + void doResetStream() override; + bool doCompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flush) override; + bool doUncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flush) override; + + void resetDeflateStream(); + void resetInflateStream(); + + Optional deflateStream_{}; + Optional inflateStream_{}; int level_; + bool needReset_{true}; }; -std::unique_ptr ZlibCodec::create(int level, CodecType type) { - return make_unique(level, type); +static constexpr uint16_t kGZIPMagicLE = 0x8B1F; + +std::vector ZlibStreamCodec::validPrefixes() const { + if (type() == CodecType::ZLIB) { + // Zlib streams start with a 2 byte header. + // + // 0 1 + // +---+---+ + // |CMF|FLG| + // +---+---+ + // + // We won't restrict the values of any sub-fields except as described below. + // + // The lowest 4 bits of CMF is the compression method (CM). + // CM == 0x8 is the deflate compression method, which is currently the only + // supported compression method, so any valid prefix must have CM == 0x8. + // + // The lowest 5 bits of FLG is FCHECK. + // FCHECK must be such that the two header bytes are a multiple of 31 when + // interpreted as a big endian 16-bit number. + std::vector result; + // 16 values for the first byte, 8 values for the second byte. + // There are also 4 combinations where both 0x00 and 0x1F work as FCHECK. + result.reserve(132); + // Select all values for the CMF byte that use the deflate algorithm 0x8. + for (uint32_t first = 0x0800; first <= 0xF800; first += 0x1000) { + // Select all values for the FLG, but leave FCHECK as 0 since it's fixed. + for (uint32_t second = 0x00; second <= 0xE0; second += 0x20) { + uint16_t prefix = first | second; + // Compute FCHECK. + prefix += 31 - (prefix % 31); + result.push_back(prefixToStringLE(Endian::big(prefix))); + // zlib won't produce this, but it is a valid prefix. + if ((prefix & 0x1F) == 31) { + prefix -= 31; + result.push_back(prefixToStringLE(Endian::big(prefix))); + } + } + } + return result; + } else { + // The gzip frame starts with 2 magic bytes. + return {prefixToStringLE(kGZIPMagicLE)}; + } +} + +bool ZlibStreamCodec::canUncompress(const IOBuf* data, Optional) + const { + if (type() == CodecType::ZLIB) { + uint16_t value; + Cursor cursor{data}; + if (!cursor.tryReadBE(value)) { + return false; + } + // zlib compressed if using deflate and is a multiple of 31. + return (value & 0x0F00) == 0x0800 && value % 31 == 0; + } else { + return dataStartsWithLE(data, kGZIPMagicLE); + } +} + +uint64_t ZlibStreamCodec::doMaxCompressedLength( + uint64_t uncompressedLength) const { + return deflateBound(nullptr, uncompressedLength); +} + +std::unique_ptr ZlibStreamCodec::createCodec(int level, CodecType type) { + return std::make_unique(level, type); +} + +std::unique_ptr ZlibStreamCodec::createStream( + int level, + CodecType type) { + return std::make_unique(level, type); } -ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) { - DCHECK(type == CodecType::ZLIB); +ZlibStreamCodec::ZlibStreamCodec(int level, CodecType type) + : StreamCodec(type) { + DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP); switch (level) { - case COMPRESSION_LEVEL_FASTEST: - level = 1; - break; - case COMPRESSION_LEVEL_DEFAULT: - level = Z_DEFAULT_COMPRESSION; - break; - case COMPRESSION_LEVEL_BEST: - level = 9; - break; + case COMPRESSION_LEVEL_FASTEST: + level = 1; + break; + case COMPRESSION_LEVEL_DEFAULT: + level = Z_DEFAULT_COMPRESSION; + break; + case COMPRESSION_LEVEL_BEST: + level = 9; + break; } if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) { - throw std::invalid_argument(to( - "ZlibCodec: invalid level: ", level)); + throw std::invalid_argument( + to("ZlibStreamCodec: invalid level: ", level)); } level_ = level; } -std::unique_ptr ZlibCodec::addOutputBuffer(z_stream* stream, - uint32_t length) { - CHECK_EQ(stream->avail_out, 0); - - auto buf = IOBuf::create(length); - buf->append(length); - - stream->next_out = buf->writableData(); - stream->avail_out = buf->length(); +ZlibStreamCodec::~ZlibStreamCodec() { + if (deflateStream_) { + deflateEnd(deflateStream_.get_pointer()); + deflateStream_.clear(); + } + if (inflateStream_) { + inflateEnd(inflateStream_.get_pointer()); + inflateStream_.clear(); + } +} - return buf; +void ZlibStreamCodec::doResetStream() { + needReset_ = true; } -bool ZlibCodec::doInflate(z_stream* stream, - IOBuf* head, - uint32_t bufferLength) { - if (stream->avail_out == 0) { - head->prependChain(addOutputBuffer(stream, bufferLength)); +void ZlibStreamCodec::resetDeflateStream() { + if (deflateStream_) { + int const rc = deflateReset(deflateStream_.get_pointer()); + if (rc != Z_OK) { + deflateStream_.clear(); + throw std::runtime_error( + to("ZlibStreamCodec: deflateReset error: ", rc)); + } + return; } - - int rc = inflate(stream, Z_NO_FLUSH); - - switch (rc) { - case Z_OK: - break; - case Z_STREAM_END: - return true; - case Z_BUF_ERROR: - case Z_NEED_DICT: - case Z_DATA_ERROR: - case Z_MEM_ERROR: - throw std::runtime_error(to( - "ZlibCodec: inflate error: ", rc, ": ", stream->msg)); - default: - CHECK(false) << rc << ": " << stream->msg; + deflateStream_ = z_stream{}; + // Using deflateInit2() to support gzip. "The windowBits parameter is the + // base two logarithm of the maximum window size (...) The default value is + // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer + // around the compressed data instead of a zlib wrapper. The gzip header + // will have no file name, no extra data, no comment, no modification time + // (set to zero), no header crc, and the operating system will be set to 255 + // (unknown)." + int const windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0); + // All other parameters (method, memLevel, strategy) get default values from + // the zlib manual. + int const rc = deflateInit2( + deflateStream_.get_pointer(), + level_, + Z_DEFLATED, + windowBits, + /* memLevel */ 8, + Z_DEFAULT_STRATEGY); + if (rc != Z_OK) { + deflateStream_.clear(); + throw std::runtime_error( + to("ZlibStreamCodec: deflateInit error: ", rc)); } - - return false; } -std::unique_ptr ZlibCodec::doCompress(const IOBuf* data) { - z_stream stream; - stream.zalloc = nullptr; - stream.zfree = nullptr; - stream.opaque = nullptr; - - int rc = deflateInit(&stream, level_); +void ZlibStreamCodec::resetInflateStream() { + if (inflateStream_) { + int const rc = inflateReset(inflateStream_.get_pointer()); + if (rc != Z_OK) { + inflateStream_.clear(); + throw std::runtime_error( + to("ZlibStreamCodec: inflateReset error: ", rc)); + } + return; + } + inflateStream_ = z_stream{}; + // "The windowBits parameter is the base two logarithm of the maximum window + // size (...) The default value is 15 (...) add 16 to decode only the gzip + // format (the zlib format will return a Z_DATA_ERROR)." + int const windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0); + int const rc = inflateInit2(inflateStream_.get_pointer(), windowBits); if (rc != Z_OK) { - throw std::runtime_error(to( - "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg)); + inflateStream_.clear(); + throw std::runtime_error( + to("ZlibStreamCodec: inflateInit error: ", rc)); } +} - stream.next_in = stream.next_out = nullptr; - stream.avail_in = stream.avail_out = 0; - stream.total_in = stream.total_out = 0; +static int zlibTranslateFlush(StreamCodec::FlushOp flush) { + switch (flush) { + case StreamCodec::FlushOp::NONE: + return Z_NO_FLUSH; + case StreamCodec::FlushOp::FLUSH: + return Z_SYNC_FLUSH; + case StreamCodec::FlushOp::END: + return Z_FINISH; + default: + throw std::invalid_argument("ZlibStreamCodec: Invalid flush"); + } +} - bool success = false; +static int zlibThrowOnError(int rc) { + switch (rc) { + case Z_OK: + case Z_BUF_ERROR: + case Z_STREAM_END: + return rc; + default: + throw std::runtime_error(to("ZlibStreamCodec: error: ", rc)); + } +} +bool ZlibStreamCodec::doCompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flush) { + if (needReset_) { + resetDeflateStream(); + needReset_ = false; + } + DCHECK(deflateStream_.hasValue()); + // zlib will return Z_STREAM_ERROR if output.data() is null. + if (output.data() == nullptr) { + return false; + } + deflateStream_->next_in = const_cast(input.data()); + deflateStream_->avail_in = input.size(); + deflateStream_->next_out = output.data(); + deflateStream_->avail_out = output.size(); SCOPE_EXIT { - int rc = deflateEnd(&stream); - // If we're here because of an exception, it's okay if some data - // got dropped. - CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR)) - << rc << ": " << stream.msg; + input.uncheckedAdvance(input.size() - deflateStream_->avail_in); + output.uncheckedAdvance(output.size() - deflateStream_->avail_out); }; + int const rc = zlibThrowOnError( + deflate(deflateStream_.get_pointer(), zlibTranslateFlush(flush))); + switch (flush) { + case StreamCodec::FlushOp::NONE: + return false; + case StreamCodec::FlushOp::FLUSH: + return deflateStream_->avail_in == 0 && deflateStream_->avail_out != 0; + case StreamCodec::FlushOp::END: + return rc == Z_STREAM_END; + default: + throw std::invalid_argument("ZlibStreamCodec: Invalid flush"); + } +} - uint64_t uncompressedLength = data->computeChainDataLength(); - uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength); - - // Max 64MiB in one go - constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB - constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB - - auto out = addOutputBuffer( - &stream, - (maxCompressedLength <= maxSingleStepLength ? - maxCompressedLength : - defaultBufferLength)); - - for (auto& range : *data) { - uint64_t remaining = range.size(); - uint64_t written = 0; - while (remaining) { - uint32_t step = (remaining > maxSingleStepLength ? - maxSingleStepLength : remaining); - stream.next_in = const_cast(range.data() + written); - stream.avail_in = step; - remaining -= step; - written += step; - - while (stream.avail_in != 0) { - if (stream.avail_out == 0) { - out->prependChain(addOutputBuffer(&stream, defaultBufferLength)); - } - - rc = deflate(&stream, Z_NO_FLUSH); - - CHECK_EQ(rc, Z_OK) << stream.msg; - } - } +bool ZlibStreamCodec::doUncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flush) { + if (needReset_) { + resetInflateStream(); + needReset_ = false; } - - do { - if (stream.avail_out == 0) { - out->prependChain(addOutputBuffer(&stream, defaultBufferLength)); - } - - rc = deflate(&stream, Z_FINISH); - } while (rc == Z_OK); - - CHECK_EQ(rc, Z_STREAM_END) << stream.msg; - - out->prev()->trimEnd(stream.avail_out); - - success = true; // we survived - - return out; -} - -std::unique_ptr ZlibCodec::doUncompress(const IOBuf* data, - uint64_t uncompressedLength) { - z_stream stream; - stream.zalloc = nullptr; - stream.zfree = nullptr; - stream.opaque = nullptr; - - int rc = inflateInit(&stream); - if (rc != Z_OK) { - throw std::runtime_error(to( - "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg)); + DCHECK(inflateStream_.hasValue()); + // zlib will return Z_STREAM_ERROR if output.data() is null. + if (output.data() == nullptr) { + return false; } - - stream.next_in = stream.next_out = nullptr; - stream.avail_in = stream.avail_out = 0; - stream.total_in = stream.total_out = 0; - - bool success = false; - + inflateStream_->next_in = const_cast(input.data()); + inflateStream_->avail_in = input.size(); + inflateStream_->next_out = output.data(); + inflateStream_->avail_out = output.size(); SCOPE_EXIT { - int rc = inflateEnd(&stream); - // If we're here because of an exception, it's okay if some data - // got dropped. - CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR)) - << rc << ": " << stream.msg; + input.advance(input.size() - inflateStream_->avail_in); + output.advance(output.size() - inflateStream_->avail_out); }; - - // Max 64MiB in one go - constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB - constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB - - auto out = addOutputBuffer( - &stream, - ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && - uncompressedLength <= maxSingleStepLength) ? - uncompressedLength : - defaultBufferLength)); - - bool streamEnd = false; - for (auto& range : *data) { - if (range.empty()) { - continue; - } - - stream.next_in = const_cast(range.data()); - stream.avail_in = range.size(); - - while (stream.avail_in != 0) { - if (streamEnd) { - throw std::runtime_error(to( - "ZlibCodec: junk after end of data")); - } - - streamEnd = doInflate(&stream, out.get(), defaultBufferLength); - } - } - - while (!streamEnd) { - streamEnd = doInflate(&stream, out.get(), defaultBufferLength); - } - - out->prev()->trimEnd(stream.avail_out); - - if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && - uncompressedLength != stream.total_out) { - throw std::runtime_error(to( - "ZlibCodec: invalid uncompressed length")); - } - - success = true; // we survived - - return out; + int const rc = zlibThrowOnError( + inflate(inflateStream_.get_pointer(), zlibTranslateFlush(flush))); + return rc == Z_STREAM_END; } -#endif // FOLLY_HAVE_LIBZ +#endif // FOLLY_HAVE_LIBZ #if FOLLY_HAVE_LIBLZMA @@ -693,16 +1292,21 @@ class LZMA2Codec final : public Codec { static std::unique_ptr create(int level, CodecType type); explicit LZMA2Codec(int level, CodecType type); + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, Optional uncompressedLength) + const override; + private: bool doNeedsUncompressedLength() const override; uint64_t doMaxUncompressedLength() const override; + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; } std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, - uint64_t uncompressedLength) override; + Optional uncompressedLength) override; std::unique_ptr addOutputBuffer(lzma_stream* stream, size_t length); bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength); @@ -710,8 +1314,27 @@ class LZMA2Codec final : public Codec { int level_; }; +static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD; +static constexpr unsigned kLZMA2MagicBytes = 6; + +std::vector LZMA2Codec::validPrefixes() const { + if (type() == CodecType::LZMA2_VARINT_SIZE) { + return {}; + } + return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)}; +} + +bool LZMA2Codec::canUncompress(const IOBuf* data, Optional) const { + if (type() == CodecType::LZMA2_VARINT_SIZE) { + return false; + } + // Returns false for all inputs less than 8 bytes. + // This is okay, because no valid LZMA2 streams are less than 8 bytes. + return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes); +} + std::unique_ptr LZMA2Codec::create(int level, CodecType type) { - return make_unique(level, type); + return std::make_unique(level, type); } LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) { @@ -735,7 +1358,7 @@ LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) { } bool LZMA2Codec::doNeedsUncompressedLength() const { - return !encodeSize(); + return false; } uint64_t LZMA2Codec::doMaxUncompressedLength() const { @@ -743,6 +1366,11 @@ uint64_t LZMA2Codec::doMaxUncompressedLength() const { return uint64_t(1) << 63; } +uint64_t LZMA2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const { + return lzma_stream_buffer_bound(uncompressedLength) + + (encodeSize() ? kMaxVarintLength64 : 0); +} + std::unique_ptr LZMA2Codec::addOutputBuffer( lzma_stream* stream, size_t length) { @@ -750,7 +1378,7 @@ std::unique_ptr LZMA2Codec::addOutputBuffer( CHECK_EQ(stream->avail_out, 0); auto buf = IOBuf::create(length); - buf->append(length); + buf->append(buf->capacity()); stream->next_out = buf->writableData(); stream->avail_out = buf->length(); @@ -852,8 +1480,9 @@ bool LZMA2Codec::doInflate(lzma_stream* stream, return false; } -std::unique_ptr LZMA2Codec::doUncompress(const IOBuf* data, - uint64_t uncompressedLength) { +std::unique_ptr LZMA2Codec::doUncompress( + const IOBuf* data, + Optional uncompressedLength) { lzma_ret rc; lzma_stream stream = LZMA_STREAM_INIT; @@ -866,33 +1495,29 @@ std::unique_ptr LZMA2Codec::doUncompress(const IOBuf* data, SCOPE_EXIT { lzma_end(&stream); }; // Max 64MiB in one go - constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB - constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB + constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB + constexpr uint32_t defaultBufferLength = uint32_t(256) << 10; // 256 KiB folly::io::Cursor cursor(data); - uint64_t actualUncompressedLength; if (encodeSize()) { - actualUncompressedLength = decodeVarintFromCursor(cursor); - if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && - uncompressedLength != actualUncompressedLength) { + const uint64_t actualUncompressedLength = decodeVarintFromCursor(cursor); + if (uncompressedLength && *uncompressedLength != actualUncompressedLength) { throw std::runtime_error("LZMA2Codec: invalid uncompressed length"); } - } else { - actualUncompressedLength = uncompressedLength; - DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH); + uncompressedLength = actualUncompressedLength; } auto out = addOutputBuffer( &stream, - (actualUncompressedLength <= maxSingleStepLength ? - actualUncompressedLength : - defaultBufferLength)); + ((uncompressedLength && *uncompressedLength <= maxSingleStepLength) + ? *uncompressedLength + : defaultBufferLength)); bool streamEnd = false; - auto buf = cursor.peek(); - while (buf.second != 0) { - stream.next_in = const_cast(buf.first); - stream.avail_in = buf.second; + auto buf = cursor.peekBytes(); + while (!buf.empty()) { + stream.next_in = const_cast(buf.data()); + stream.avail_in = buf.size(); while (stream.avail_in != 0) { if (streamEnd) { @@ -903,8 +1528,8 @@ std::unique_ptr LZMA2Codec::doUncompress(const IOBuf* data, streamEnd = doInflate(&stream, out.get(), defaultBufferLength); } - cursor.skip(buf.second); - buf = cursor.peek(); + cursor.skip(buf.size()); + buf = cursor.peekBytes(); } while (!streamEnd) { @@ -913,9 +1538,9 @@ std::unique_ptr LZMA2Codec::doUncompress(const IOBuf* data, out->prev()->trimEnd(stream.avail_out); - if (actualUncompressedLength != stream.total_out) { - throw std::runtime_error(to( - "LZMA2Codec: invalid uncompressed length")); + if (uncompressedLength && *uncompressedLength != stream.total_out) { + throw std::runtime_error( + to("LZMA2Codec: invalid uncompressed length")); } return out; @@ -925,143 +1550,756 @@ std::unique_ptr LZMA2Codec::doUncompress(const IOBuf* data, #ifdef FOLLY_HAVE_LIBZSTD +namespace { +void zstdFreeCStream(ZSTD_CStream* zcs) { + ZSTD_freeCStream(zcs); +} + +void zstdFreeDStream(ZSTD_DStream* zds) { + ZSTD_freeDStream(zds); +} +} + /** - * ZSTD_BETA compression + * ZSTD compression */ -class ZSTDCodec final : public Codec { +class ZSTDStreamCodec final : public StreamCodec { public: - static std::unique_ptr create(int level, CodecType); - explicit ZSTDCodec(int level, CodecType type); + static std::unique_ptr createCodec(int level, CodecType); + static std::unique_ptr createStream(int level, CodecType); + explicit ZSTDStreamCodec(int level, CodecType type); + + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, Optional uncompressedLength) + const override; private: bool doNeedsUncompressedLength() const override; - std::unique_ptr doCompress(const IOBuf* data) override; - std::unique_ptr doUncompress( - const IOBuf* data, - uint64_t uncompressedLength) override; + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + Optional doGetUncompressedLength( + IOBuf const* data, + Optional uncompressedLength) const override; + + void doResetStream() override; + bool doCompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) override; + bool doUncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) override; + + void resetCStream(); + void resetDStream(); + + bool tryBlockCompress(ByteRange& input, MutableByteRange& output) const; + bool tryBlockUncompress(ByteRange& input, MutableByteRange& output) const; + + int level_; + bool needReset_{true}; + std::unique_ptr< + ZSTD_CStream, + folly::static_function_deleter> + cstream_{nullptr}; + std::unique_ptr< + ZSTD_DStream, + folly::static_function_deleter> + dstream_{nullptr}; }; -std::unique_ptr ZSTDCodec::create(int level, CodecType type) { - return make_unique(level, type); +static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528; + +std::vector ZSTDStreamCodec::validPrefixes() const { + return {prefixToStringLE(kZSTDMagicLE)}; +} + +bool ZSTDStreamCodec::canUncompress(const IOBuf* data, Optional) + const { + return dataStartsWithLE(data, kZSTDMagicLE); +} + +std::unique_ptr ZSTDStreamCodec::createCodec(int level, CodecType type) { + return make_unique(level, type); +} + +std::unique_ptr ZSTDStreamCodec::createStream( + int level, + CodecType type) { + return make_unique(level, type); } -ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) { - DCHECK(type == CodecType::ZSTD_BETA); +ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type) + : StreamCodec(type) { + DCHECK(type == CodecType::ZSTD); + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + level = 1; + break; + case COMPRESSION_LEVEL_DEFAULT: + level = 1; + break; + case COMPRESSION_LEVEL_BEST: + level = 19; + break; + } + if (level < 1 || level > ZSTD_maxCLevel()) { + throw std::invalid_argument( + to("ZSTD: invalid level: ", level)); + } + level_ = level; } -bool ZSTDCodec::doNeedsUncompressedLength() const { +bool ZSTDStreamCodec::doNeedsUncompressedLength() const { + return false; +} + +uint64_t ZSTDStreamCodec::doMaxCompressedLength( + uint64_t uncompressedLength) const { + return ZSTD_compressBound(uncompressedLength); +} + +void zstdThrowIfError(size_t rc) { + if (!ZSTD_isError(rc)) { + return; + } + throw std::runtime_error( + to("ZSTD returned an error: ", ZSTD_getErrorName(rc))); +} + +Optional ZSTDStreamCodec::doGetUncompressedLength( + IOBuf const* data, + Optional uncompressedLength) const { + // Read decompressed size from frame if available in first IOBuf. + auto const decompressedSize = + ZSTD_getDecompressedSize(data->data(), data->length()); + if (decompressedSize != 0) { + if (uncompressedLength && *uncompressedLength != decompressedSize) { + throw std::runtime_error("ZSTD: invalid uncompressed length"); + } + uncompressedLength = decompressedSize; + } + return uncompressedLength; +} + +void ZSTDStreamCodec::doResetStream() { + needReset_ = true; +} + +bool ZSTDStreamCodec::tryBlockCompress( + ByteRange& input, + MutableByteRange& output) const { + DCHECK(needReset_); + // We need to know that we have enough output space to use block compression + if (output.size() < ZSTD_compressBound(input.size())) { + return false; + } + size_t const length = ZSTD_compress( + output.data(), output.size(), input.data(), input.size(), level_); + zstdThrowIfError(length); + input.uncheckedAdvance(input.size()); + output.uncheckedAdvance(length); return true; } -std::unique_ptr ZSTDCodec::doCompress(const IOBuf* data) { - size_t rc; - size_t maxCompressedLength = ZSTD_compressBound(data->length()); - auto out = IOBuf::createCombined(maxCompressedLength); +void ZSTDStreamCodec::resetCStream() { + if (!cstream_) { + cstream_.reset(ZSTD_createCStream()); + if (!cstream_) { + throw std::bad_alloc{}; + } + } + // Advanced API usage works for all supported versions of zstd. + // Required to set contentSizeFlag. + auto params = ZSTD_getParams(level_, uncompressedLength().value_or(0), 0); + params.fParams.contentSizeFlag = uncompressedLength().hasValue(); + zstdThrowIfError(ZSTD_initCStream_advanced( + cstream_.get(), nullptr, 0, params, uncompressedLength().value_or(0))); +} - CHECK_EQ(out->length(), 0); +bool ZSTDStreamCodec::doCompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (needReset_) { + // If we are given all the input in one chunk try to use block compression + if (flushOp == StreamCodec::FlushOp::END && + tryBlockCompress(input, output)) { + return true; + } + resetCStream(); + needReset_ = false; + } + ZSTD_inBuffer in = {input.data(), input.size(), 0}; + ZSTD_outBuffer out = {output.data(), output.size(), 0}; + SCOPE_EXIT { + input.uncheckedAdvance(in.pos); + output.uncheckedAdvance(out.pos); + }; + if (flushOp == StreamCodec::FlushOp::NONE || !input.empty()) { + zstdThrowIfError(ZSTD_compressStream(cstream_.get(), &out, &in)); + } + if (in.pos == in.size && flushOp != StreamCodec::FlushOp::NONE) { + size_t rc; + switch (flushOp) { + case StreamCodec::FlushOp::FLUSH: + rc = ZSTD_flushStream(cstream_.get(), &out); + break; + case StreamCodec::FlushOp::END: + rc = ZSTD_endStream(cstream_.get(), &out); + break; + default: + throw std::invalid_argument("ZSTD: invalid FlushOp"); + } + zstdThrowIfError(rc); + if (rc == 0) { + return true; + } + } + return false; +} - rc = ZSTD_compress( - out->writableTail(), out->capacity(), data->data(), data->length()); +bool ZSTDStreamCodec::tryBlockUncompress( + ByteRange& input, + MutableByteRange& output) const { + DCHECK(needReset_); +#if ZSTD_VERSION_NUMBER < 10104 + // We require ZSTD_findFrameCompressedSize() to perform this optimization. + return false; +#else + // We need to know the uncompressed length and have enough output space. + if (!uncompressedLength() || output.size() < *uncompressedLength()) { + return false; + } + size_t const compressedLength = + ZSTD_findFrameCompressedSize(input.data(), input.size()); + zstdThrowIfError(compressedLength); + size_t const length = ZSTD_decompress( + output.data(), *uncompressedLength(), input.data(), compressedLength); + zstdThrowIfError(length); + DCHECK_EQ(length, *uncompressedLength()); + input.uncheckedAdvance(compressedLength); + output.uncheckedAdvance(length); + return true; +#endif +} - if (ZSTD_isError(rc)) { - throw std::runtime_error(to( - "ZSTD compression returned an error: ", - ZSTD_getErrorName(rc))); +void ZSTDStreamCodec::resetDStream() { + if (!dstream_) { + dstream_.reset(ZSTD_createDStream()); + if (!dstream_) { + throw std::bad_alloc{}; + } } + zstdThrowIfError(ZSTD_initDStream(dstream_.get())); +} - out->append(rc); - CHECK_EQ(out->length(), rc); +bool ZSTDStreamCodec::doUncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (needReset_) { + // If we are given all the input in one chunk try to use block uncompression + if (flushOp == StreamCodec::FlushOp::END && + tryBlockUncompress(input, output)) { + return true; + } + resetDStream(); + needReset_ = false; + } + ZSTD_inBuffer in = {input.data(), input.size(), 0}; + ZSTD_outBuffer out = {output.data(), output.size(), 0}; + SCOPE_EXIT { + input.uncheckedAdvance(in.pos); + output.uncheckedAdvance(out.pos); + }; + size_t const rc = ZSTD_decompressStream(dstream_.get(), &out, &in); + zstdThrowIfError(rc); + return rc == 0; +} + +#endif // FOLLY_HAVE_LIBZSTD + +#if FOLLY_HAVE_LIBBZ2 + +class Bzip2Codec final : public Codec { + public: + static std::unique_ptr create(int level, CodecType type); + explicit Bzip2Codec(int level, CodecType type); + + std::vector validPrefixes() const override; + bool canUncompress(IOBuf const* data, Optional uncompressedLength) + const override; + + private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + std::unique_ptr doCompress(IOBuf const* data) override; + std::unique_ptr doUncompress( + IOBuf const* data, + Optional uncompressedLength) override; + + int level_; +}; + +/* static */ std::unique_ptr Bzip2Codec::create( + int level, + CodecType type) { + return std::make_unique(level, type); +} + +Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::BZIP2); + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + level = 1; + break; + case COMPRESSION_LEVEL_DEFAULT: + level = 9; + break; + case COMPRESSION_LEVEL_BEST: + level = 9; + break; + } + if (level < 1 || level > 9) { + throw std::invalid_argument( + to("Bzip2: invalid level: ", level)); + } + level_ = level; +} + +static uint32_t constexpr kBzip2MagicLE = 0x685a42; +static uint64_t constexpr kBzip2MagicBytes = 3; + +std::vector Bzip2Codec::validPrefixes() const { + return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)}; +} + +bool Bzip2Codec::canUncompress(IOBuf const* data, Optional) const { + return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes); +} + +uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const { + // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress + // To guarantee that the compressed data will fit in its buffer, allocate an + // output buffer of size 1% larger than the uncompressed data, plus six + // hundred extra bytes. + return uncompressedLength + uncompressedLength / 100 + 600; +} + +static bz_stream createBzStream() { + bz_stream stream; + stream.bzalloc = nullptr; + stream.bzfree = nullptr; + stream.opaque = nullptr; + stream.next_in = stream.next_out = nullptr; + stream.avail_in = stream.avail_out = 0; + return stream; +} + +// Throws on error condition, otherwise returns the code. +static int bzCheck(int const rc) { + switch (rc) { + case BZ_OK: + case BZ_RUN_OK: + case BZ_FLUSH_OK: + case BZ_FINISH_OK: + case BZ_STREAM_END: + return rc; + default: + throw std::runtime_error(to("Bzip2 error: ", rc)); + } +} + +static std::unique_ptr addOutputBuffer( + bz_stream* stream, + uint64_t const bufferLength) { + DCHECK_LE(bufferLength, std::numeric_limits::max()); + DCHECK_EQ(stream->avail_out, 0); + + auto buf = IOBuf::create(bufferLength); + buf->append(buf->capacity()); + + stream->next_out = reinterpret_cast(buf->writableData()); + stream->avail_out = buf->length(); + + return buf; +} + +std::unique_ptr Bzip2Codec::doCompress(IOBuf const* data) { + bz_stream stream = createBzStream(); + bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0)); + SCOPE_EXIT { + bzCheck(BZ2_bzCompressEnd(&stream)); + }; + + uint64_t const uncompressedLength = data->computeChainDataLength(); + uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength); + uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB + uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20; + + auto out = addOutputBuffer( + &stream, + maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen + : kDefaultBufferLength); + + for (auto range : *data) { + while (!range.empty()) { + auto const inSize = std::min(range.size(), kMaxSingleStepLength); + stream.next_in = + const_cast(reinterpret_cast(range.data())); + stream.avail_in = inSize; + + if (stream.avail_out == 0) { + out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); + } + + bzCheck(BZ2_bzCompress(&stream, BZ_RUN)); + range.uncheckedAdvance(inSize - stream.avail_in); + } + } + do { + if (stream.avail_out == 0) { + out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); + } + } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END); + + out->prev()->trimEnd(stream.avail_out); return out; } -std::unique_ptr ZSTDCodec::doUncompress(const IOBuf* data, - uint64_t uncompressedLength) { - size_t rc; - auto out = IOBuf::createCombined(uncompressedLength); +std::unique_ptr Bzip2Codec::doUncompress( + const IOBuf* data, + Optional uncompressedLength) { + bz_stream stream = createBzStream(); + bzCheck(BZ2_bzDecompressInit(&stream, 0, 0)); + SCOPE_EXIT { + bzCheck(BZ2_bzDecompressEnd(&stream)); + }; - CHECK_GE(out->capacity(), uncompressedLength); - CHECK_EQ(out->length(), 0); + uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB + uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB + uint64_t const kDefaultBufferLength = + computeBufferLength(data->computeChainDataLength(), kBlockSize); - rc = ZSTD_decompress( - out->writableTail(), out->capacity(), data->data(), data->length()); + auto out = addOutputBuffer( + &stream, + ((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength) + ? *uncompressedLength + : kDefaultBufferLength)); + + int rc = BZ_OK; + for (auto range : *data) { + while (!range.empty()) { + auto const inSize = std::min(range.size(), kMaxSingleStepLength); + stream.next_in = + const_cast(reinterpret_cast(range.data())); + stream.avail_in = inSize; - if (ZSTD_isError(rc)) { - throw std::runtime_error(to( - "ZSTD decompression returned an error: ", - ZSTD_getErrorName(rc))); + if (stream.avail_out == 0) { + out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); + } + + rc = bzCheck(BZ2_bzDecompress(&stream)); + range.uncheckedAdvance(inSize - stream.avail_in); + } + } + while (rc != BZ_STREAM_END) { + if (stream.avail_out == 0) { + out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); + } + + rc = bzCheck(BZ2_bzDecompress(&stream)); } - out->append(rc); - CHECK_EQ(out->length(), rc); + out->prev()->trimEnd(stream.avail_out); + + uint64_t const totalOut = + (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32; + if (uncompressedLength && uncompressedLength != totalOut) { + throw std::runtime_error("Bzip2 error: Invalid uncompressed length"); + } return out; } -#endif // FOLLY_HAVE_LIBZSTD +#endif // FOLLY_HAVE_LIBBZ2 -} // namespace +/** + * Automatic decompression + */ +class AutomaticCodec final : public Codec { + public: + static std::unique_ptr create( + std::vector> customCodecs); + explicit AutomaticCodec(std::vector> customCodecs); -std::unique_ptr getCodec(CodecType type, int level) { - typedef std::unique_ptr (*CodecFactory)(int, CodecType); + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, Optional uncompressedLength) + const override; + + private: + bool doNeedsUncompressedLength() const override; + uint64_t doMaxUncompressedLength() const override; + + uint64_t doMaxCompressedLength(uint64_t) const override { + throw std::runtime_error( + "AutomaticCodec error: maxCompressedLength() not supported."); + } + std::unique_ptr doCompress(const IOBuf*) override { + throw std::runtime_error("AutomaticCodec error: compress() not supported."); + } + std::unique_ptr doUncompress( + const IOBuf* data, + Optional uncompressedLength) override; + + void addCodecIfSupported(CodecType type); + + // Throws iff the codecs aren't compatible (very slow) + void checkCompatibleCodecs() const; + + std::vector> codecs_; + bool needsUncompressedLength_; + uint64_t maxUncompressedLength_; +}; + +std::vector AutomaticCodec::validPrefixes() const { + std::unordered_set prefixes; + for (const auto& codec : codecs_) { + const auto codecPrefixes = codec->validPrefixes(); + prefixes.insert(codecPrefixes.begin(), codecPrefixes.end()); + } + return std::vector{prefixes.begin(), prefixes.end()}; +} + +bool AutomaticCodec::canUncompress( + const IOBuf* data, + Optional uncompressedLength) const { + return std::any_of( + codecs_.begin(), + codecs_.end(), + [data, uncompressedLength](std::unique_ptr const& codec) { + return codec->canUncompress(data, uncompressedLength); + }); +} + +void AutomaticCodec::addCodecIfSupported(CodecType type) { + const bool present = std::any_of( + codecs_.begin(), + codecs_.end(), + [&type](std::unique_ptr const& codec) { + return codec->type() == type; + }); + if (hasCodec(type) && !present) { + codecs_.push_back(getCodec(type)); + } +} + +/* static */ std::unique_ptr AutomaticCodec::create( + std::vector> customCodecs) { + return std::make_unique(std::move(customCodecs)); +} + +AutomaticCodec::AutomaticCodec(std::vector> customCodecs) + : Codec(CodecType::USER_DEFINED), codecs_(std::move(customCodecs)) { + // Fastest -> slowest + addCodecIfSupported(CodecType::LZ4_FRAME); + addCodecIfSupported(CodecType::ZSTD); + addCodecIfSupported(CodecType::ZLIB); + addCodecIfSupported(CodecType::GZIP); + addCodecIfSupported(CodecType::LZMA2); + addCodecIfSupported(CodecType::BZIP2); + if (kIsDebug) { + checkCompatibleCodecs(); + } + // Check that none of the codes are are null + DCHECK(std::none_of( + codecs_.begin(), codecs_.end(), [](std::unique_ptr const& codec) { + return codec == nullptr; + })); + + needsUncompressedLength_ = std::any_of( + codecs_.begin(), codecs_.end(), [](std::unique_ptr const& codec) { + return codec->needsUncompressedLength(); + }); + + const auto it = std::max_element( + codecs_.begin(), + codecs_.end(), + [](std::unique_ptr const& lhs, std::unique_ptr const& rhs) { + return lhs->maxUncompressedLength() < rhs->maxUncompressedLength(); + }); + DCHECK(it != codecs_.end()); + maxUncompressedLength_ = (*it)->maxUncompressedLength(); +} + +void AutomaticCodec::checkCompatibleCodecs() const { + // Keep track of all the possible headers. + std::unordered_set headers; + // The empty header is not allowed. + headers.insert(""); + // Step 1: + // Construct a set of headers and check that none of the headers occur twice. + // Eliminate edge cases. + for (auto&& codec : codecs_) { + const auto codecHeaders = codec->validPrefixes(); + // Codecs without any valid headers are not allowed. + if (codecHeaders.empty()) { + throw std::invalid_argument{ + "AutomaticCodec: validPrefixes() must not be empty."}; + } + // Insert all the headers for the current codec. + const size_t beforeSize = headers.size(); + headers.insert(codecHeaders.begin(), codecHeaders.end()); + // Codecs are not compatible if any header occurred twice. + if (beforeSize + codecHeaders.size() != headers.size()) { + throw std::invalid_argument{ + "AutomaticCodec: Two valid prefixes collide."}; + } + } + // Step 2: + // Check if any strict non-empty prefix of any header is a header. + for (const auto& header : headers) { + for (size_t i = 1; i < header.size(); ++i) { + if (headers.count(header.substr(0, i))) { + throw std::invalid_argument{ + "AutomaticCodec: One valid prefix is a prefix of another valid " + "prefix."}; + } + } + } +} + +bool AutomaticCodec::doNeedsUncompressedLength() const { + return needsUncompressedLength_; +} + +uint64_t AutomaticCodec::doMaxUncompressedLength() const { + return maxUncompressedLength_; +} + +std::unique_ptr AutomaticCodec::doUncompress( + const IOBuf* data, + Optional uncompressedLength) { + for (auto&& codec : codecs_) { + if (codec->canUncompress(data, uncompressedLength)) { + return codec->uncompress(data, uncompressedLength); + } + } + throw std::runtime_error("AutomaticCodec error: Unknown compressed data"); +} - static CodecFactory codecFactories[ - static_cast(CodecType::NUM_CODEC_TYPES)] = { - nullptr, // USER_DEFINED - NoCompressionCodec::create, +using CodecFactory = std::unique_ptr (*)(int, CodecType); +using StreamCodecFactory = std::unique_ptr (*)(int, CodecType); +struct Factory { + CodecFactory codec; + StreamCodecFactory stream; +}; + +constexpr Factory + codecFactories[static_cast(CodecType::NUM_CODEC_TYPES)] = { + {}, // USER_DEFINED + {NoCompressionCodec::create, nullptr}, #if FOLLY_HAVE_LIBLZ4 - LZ4Codec::create, + {LZ4Codec::create, nullptr}, #else - nullptr, + {}, #endif #if FOLLY_HAVE_LIBSNAPPY - SnappyCodec::create, + {SnappyCodec::create, nullptr}, #else - nullptr, + {}, #endif #if FOLLY_HAVE_LIBZ - ZlibCodec::create, + {ZlibStreamCodec::createCodec, ZlibStreamCodec::createStream}, #else - nullptr, + {}, #endif #if FOLLY_HAVE_LIBLZ4 - LZ4Codec::create, + {LZ4Codec::create, nullptr}, #else - nullptr, + {}, #endif #if FOLLY_HAVE_LIBLZMA - LZMA2Codec::create, - LZMA2Codec::create, + {LZMA2Codec::create, nullptr}, + {LZMA2Codec::create, nullptr}, #else - nullptr, - nullptr, + {}, + {}, #endif #if FOLLY_HAVE_LIBZSTD - ZSTDCodec::create, + {ZSTDStreamCodec::createCodec, ZSTDStreamCodec::createStream}, #else - nullptr, + {}, +#endif + +#if FOLLY_HAVE_LIBZ + {ZlibStreamCodec::createCodec, ZlibStreamCodec::createStream}, +#else + {}, +#endif + +#if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301) + {LZ4FrameCodec::create, nullptr}, +#else + {}, #endif - }; - size_t idx = static_cast(type); +#if FOLLY_HAVE_LIBBZ2 + {Bzip2Codec::create, nullptr}, +#else + {}, +#endif +}; + +Factory const& getFactory(CodecType type) { + size_t const idx = static_cast(type); if (idx >= static_cast(CodecType::NUM_CODEC_TYPES)) { - throw std::invalid_argument(to( - "Compression type ", idx, " not supported")); + throw std::invalid_argument( + to("Compression type ", idx, " invalid")); + } + return codecFactories[idx]; +} +} // namespace + +bool hasCodec(CodecType type) { + return getFactory(type).codec != nullptr; +} + +std::unique_ptr getCodec(CodecType type, int level) { + auto const factory = getFactory(type).codec; + if (!factory) { + throw std::invalid_argument( + to("Compression type ", type, " not supported")); } - auto factory = codecFactories[idx]; + auto codec = (*factory)(level, type); + DCHECK(codec->type() == type); + return codec; +} + +bool hasStreamCodec(CodecType type) { + return getFactory(type).stream != nullptr; +} + +std::unique_ptr getStreamCodec(CodecType type, int level) { + auto const factory = getFactory(type).stream; if (!factory) { - throw std::invalid_argument(to( - "Compression type ", idx, " not supported")); + throw std::invalid_argument( + to("Compression type ", type, " not supported")); } auto codec = (*factory)(level, type); - DCHECK_EQ(static_cast(codec->type()), idx); + DCHECK(codec->type() == type); return codec; } +std::unique_ptr getAutoUncompressionCodec( + std::vector> customCodecs) { + return AutomaticCodec::create(std::move(customCodecs)); +} }} // namespaces